class CloudI::API

Constants

ASYNC

Unbuffered output is available with $stderr.puts '…'

MESSAGE_INIT
MESSAGE_KEEPALIVE
MESSAGE_RECV_ASYNC
MESSAGE_REINIT
MESSAGE_RETURNS_ASYNC
MESSAGE_RETURN_ASYNC
MESSAGE_RETURN_SYNC
MESSAGE_SEND_ASYNC
MESSAGE_SEND_SYNC
MESSAGE_SUBSCRIBE_COUNT
MESSAGE_TERM
NULL
SYNC

Attributes

prefix[R]
priority_default[R]
process_count[R]
process_count_max[R]
process_count_min[R]
process_index[R]
timeout_async[R]
timeout_initialize[R]
timeout_sync[R]
timeout_terminate[R]

Public Class Methods

assert() { || ... } click to toggle source
# File lib/cloudi.rb, line 699
# Evaluates the given block and raises AssertionError when the block's
# result is falsy (nil or false). Returns nil otherwise.
def self.assert
    return if yield # if $DEBUG
    raise AssertionError
end
info_key_value_new(pairs, response = true) click to toggle source
# File lib/cloudi.rb, line 695
# Public entry point that delegates to text_pairs_new with the same
# arguments; +response+ defaults to true.
def self.info_key_value_new(pairs, response = true)
    text_pairs_new(pairs, response)
end
info_key_value_parse(info) click to toggle source
# File lib/cloudi.rb, line 691
# Public entry point that delegates to text_pairs_parse and returns
# its result.
def self.info_key_value_parse(info)
    text_pairs_parse(info)
end
new(thread_index) click to toggle source
# File lib/cloudi.rb, line 44
# Sets up the API instance for the thread with index +thread_index+.
#
# Reads CLOUDI_API_INIT_PROTOCOL and CLOUDI_API_INIT_BUFFER_SIZE through
# API.getenv, wraps file descriptor (thread_index + 3) in an IO object,
# sends the :init term and then blocks in poll_request(nil, false).
#
# Raises InvalidInputException when the protocol is not 'tcp', 'udp' or
# 'local' (after printing a message on $stderr).
def initialize(thread_index)
    protocol = API.getenv('CLOUDI_API_INIT_PROTOCOL')
    buffer_size_str = API.getenv('CLOUDI_API_INIT_BUFFER_SIZE')
    # only 'udp' runs without the 4-byte size header (see send/recv)
    case protocol
    when 'tcp', 'local'
        @use_header = true
    when 'udp'
        @use_header = false
    else
        $stderr.puts 'CloudI service execution must occur in CloudI'
        raise InvalidInputException
    end
    # the descriptor is wrapped without taking ownership (autoclose: false)
    @s = IO.for_fd(thread_index + 3, File::RDWR, autoclose: false)
    @s.sync = true
    @initialization_complete = false
    @terminate = false
    @size = buffer_size_str.to_i
    @callbacks = Hash.new
    @timeout_terminate = 10 # TIMEOUT_TERMINATE_MIN
    init_message = Erlang.term_to_binary(:init)
    send(init_message)
    poll_request(nil, false)
end
thread_count() click to toggle source
# File lib/cloudi.rb, line 82
# Returns the value of the CLOUDI_API_INIT_THREAD_COUNT environment
# variable (read through getenv) converted with String#to_i.
def self.thread_count
    getenv('CLOUDI_API_INIT_THREAD_COUNT').to_i
end

Private Class Methods

getenv(key) click to toggle source
# File lib/cloudi.rb, line 765
# Returns the environment variable +key+.
# Raises InvalidInputException when the variable is not set.
def self.getenv(key)
    value = ENV[key]
    raise InvalidInputException unless value
    value
end
text_pairs_new(pairs, response) click to toggle source
# File lib/cloudi.rb, line 672
# Encodes +pairs+ as a flat text of "key\0value\0" entries.
#
# A String value produces one entry; any other value is iterated with
# #each and produces one entry per element, all under the same key.
# When +response+ is truthy and nothing was written, "\0" is returned
# instead of the empty string.
def self.text_pairs_new(pairs, response)
    buffer = StringIO.new
    pairs.each do |key, values|
        entries = values.kind_of?(String) ? [values] : values
        entries.each do |value|
            buffer.write "#{key}\0#{value}\0"
        end
    end
    encoded = buffer.string
    return "\0" if response && encoded == ''
    encoded
end
text_pairs_parse(text) click to toggle source
# File lib/cloudi.rb, line 656
# Decodes a NULL-separated "key\0value\0..." text into a Hash.
#
# A key that occurs once maps to its value; a key that occurs several
# times maps to an Array of its values in order of appearance. A final
# key without a following segment maps to nil.
def self.text_pairs_parse(text)
    segments = text.split(NULL.chr)
    segments.each_slice(2).each_with_object({}) do |(key, value), pairs|
        existing = pairs[key]
        if existing.nil?
            pairs[key] = value
        elsif existing.kind_of?(Array)
            existing << value
        else
            pairs[key] = [existing, value]
        end
    end
end

Public Instance Methods

forward_(request_type, name, request_info, request, timeout, priority, trans_id, pid) click to toggle source
# File lib/cloudi.rb, line 168
# Forwards a request using the variant that matches +request_type+:
# ASYNC dispatches to forward_async, SYNC dispatches to forward_sync.
# All remaining arguments are passed through unchanged.
#
# Raises InvalidInputException when +request_type+ is neither ASYNC nor
# SYNC, instead of silently doing nothing and returning nil.
def forward_(request_type, name, request_info, request,
             timeout, priority, trans_id, pid)
    case request_type
    when ASYNC
        forward_async(name, request_info, request,
                      timeout, priority, trans_id, pid)
    when SYNC
        forward_sync(name, request_info, request,
                     timeout, priority, trans_id, pid)
    else
        # an unknown request type would otherwise drop the request
        raise InvalidInputException
    end
end
forward_async(name, request_info, request, timeout, priority, trans_id, pid) click to toggle source
# File lib/cloudi.rb, line 180
# Sends a :forward_async term built from the arguments (request_info,
# request and trans_id wrapped in OtpErlangBinary) and then always
# raises ForwardAsyncException.
def forward_async(name, request_info, request,
                  timeout, priority, trans_id, pid)
    message = [:forward_async, name,
               OtpErlangBinary.new(request_info),
               OtpErlangBinary.new(request),
               timeout, priority,
               OtpErlangBinary.new(trans_id), pid]
    send(Erlang.term_to_binary(message))
    raise ForwardAsyncException
end
forward_sync(name, request_info, request, timeout, priority, trans_id, pid) click to toggle source
# File lib/cloudi.rb, line 190
# Sends a :forward_sync term built from the arguments (request_info,
# request and trans_id wrapped in OtpErlangBinary) and then always
# raises ForwardSyncException.
def forward_sync(name, request_info, request,
                 timeout, priority, trans_id, pid)
    message = [:forward_sync, name,
               OtpErlangBinary.new(request_info),
               OtpErlangBinary.new(request),
               timeout, priority,
               OtpErlangBinary.new(trans_id), pid]
    send(Erlang.term_to_binary(message))
    raise ForwardSyncException
end
mcast_async(name, request, timeout=nil, request_info=nil, priority=nil) click to toggle source
# File lib/cloudi.rb, line 150
# Sends a :mcast_async term for +name+ and returns the result of
# poll_request(nil, false).
#
# nil arguments fall back to defaults: +timeout+ to @timeout_async,
# +request_info+ to '' and +priority+ to @priority_default.
def mcast_async(name, request,
                timeout=nil, request_info=nil, priority=nil)
    timeout = @timeout_async if timeout.nil?
    request_info = '' if request_info.nil?
    priority = @priority_default if priority.nil?
    message = [:mcast_async, name,
               OtpErlangBinary.new(request_info),
               OtpErlangBinary.new(request),
               timeout, priority]
    send(Erlang.term_to_binary(message))
    poll_request(nil, false)
end
poll(timeout=nil) click to toggle source
# File lib/cloudi.rb, line 642
# Runs poll_request in external mode and returns its result.
# A nil +timeout+ is replaced by -1 before the call.
def poll(timeout=nil)
    effective_timeout = timeout.nil? ? -1 : timeout
    poll_request(effective_timeout, true)
end
recv_async(timeout=nil, trans_id=nil, consume=true) click to toggle source
# File lib/cloudi.rb, line 232
# Sends a :recv_async term and returns the result of
# poll_request(nil, false).
#
# A nil +timeout+ falls back to @timeout_sync; a nil +trans_id+ is
# replaced by 16 zero bytes.
def recv_async(timeout=nil, trans_id=nil, consume=true)
    timeout = @timeout_sync if timeout.nil?
    trans_id = 0.chr * 16 if trans_id.nil?
    message = [:recv_async, timeout,
               OtpErlangBinary.new(trans_id),
               consume]
    send(Erlang.term_to_binary(message))
    poll_request(nil, false)
end
return_(request_type, name, pattern, response_info, response, timeout, trans_id, pid) click to toggle source
# File lib/cloudi.rb, line 200
# Returns a response using the variant that matches +request_type+:
# ASYNC dispatches to return_async, SYNC dispatches to return_sync.
# All remaining arguments are passed through unchanged.
#
# Raises InvalidInputException when +request_type+ is neither ASYNC nor
# SYNC, instead of silently doing nothing and returning nil.
def return_(request_type, name, pattern, response_info, response,
            timeout, trans_id, pid)
    case request_type
    when ASYNC
        return_async(name, pattern, response_info, response,
                     timeout, trans_id, pid)
    when SYNC
        return_sync(name, pattern, response_info, response,
                    timeout, trans_id, pid)
    else
        # an unknown request type would otherwise drop the response
        raise InvalidInputException
    end
end
return_async(name, pattern, response_info, response, timeout, trans_id, pid) click to toggle source
# File lib/cloudi.rb, line 212
# Sends a :return_async term built from the arguments (response_info,
# response and trans_id wrapped in OtpErlangBinary) and then always
# raises ReturnAsyncException.
def return_async(name, pattern, response_info, response,
                 timeout, trans_id, pid)
    message = [:return_async, name, pattern,
               OtpErlangBinary.new(response_info),
               OtpErlangBinary.new(response),
               timeout,
               OtpErlangBinary.new(trans_id), pid]
    send(Erlang.term_to_binary(message))
    raise ReturnAsyncException
end
return_sync(name, pattern, response_info, response, timeout, trans_id, pid) click to toggle source
# File lib/cloudi.rb, line 222
# Sends a :return_sync term built from the arguments (response_info,
# response and trans_id wrapped in OtpErlangBinary) and then always
# raises ReturnSyncException.
def return_sync(name, pattern, response_info, response,
                timeout, trans_id, pid)
    message = [:return_sync, name, pattern,
               OtpErlangBinary.new(response_info),
               OtpErlangBinary.new(response),
               timeout,
               OtpErlangBinary.new(trans_id), pid]
    send(Erlang.term_to_binary(message))
    raise ReturnSyncException
end
send_async(name, request, timeout=nil, request_info=nil, priority=nil) click to toggle source
# File lib/cloudi.rb, line 114
# Sends a :send_async term for +name+ and returns the result of
# poll_request(nil, false).
#
# nil arguments fall back to defaults: +timeout+ to @timeout_async,
# +request_info+ to '' and +priority+ to @priority_default.
def send_async(name, request,
               timeout=nil, request_info=nil, priority=nil)
    timeout = @timeout_async if timeout.nil?
    request_info = '' if request_info.nil?
    priority = @priority_default if priority.nil?
    message = [:send_async, name,
               OtpErlangBinary.new(request_info),
               OtpErlangBinary.new(request),
               timeout, priority]
    send(Erlang.term_to_binary(message))
    poll_request(nil, false)
end
send_sync(name, request, timeout=nil, request_info=nil, priority=nil) click to toggle source
# File lib/cloudi.rb, line 132
# Sends a :send_sync term for +name+ and returns the result of
# poll_request(nil, false).
#
# nil arguments fall back to defaults: +timeout+ to @timeout_sync,
# +request_info+ to '' and +priority+ to @priority_default.
def send_sync(name, request,
              timeout=nil, request_info=nil, priority=nil)
    timeout = @timeout_sync if timeout.nil?
    request_info = '' if request_info.nil?
    priority = @priority_default if priority.nil?
    message = [:send_sync, name,
               OtpErlangBinary.new(request_info),
               OtpErlangBinary.new(request),
               timeout, priority]
    send(Erlang.term_to_binary(message))
    poll_request(nil, false)
end
shutdown(reason=nil) click to toggle source
# File lib/cloudi.rb, line 649
# Sends a [:shutdown, reason] term; a nil +reason+ is sent as ''.
def shutdown(reason=nil)
    reason = '' if reason.nil?
    message = Erlang.term_to_binary([:shutdown, reason])
    send(message)
end
subscribe(pattern, function) click to toggle source
# File lib/cloudi.rb, line 87
# Registers +function+ for +pattern+ and sends a [:subscribe, pattern]
# term.
#
# Callbacks are stored in @callbacks under @prefix + pattern; repeated
# subscriptions to the same pattern append to that key's list.
def subscribe(pattern, function)
    key = @prefix + pattern
    registered = @callbacks.fetch(key, nil)
    # create the list on first subscription, then append in both cases
    @callbacks[key] = registered = [] if registered.nil?
    registered.push(function)
    send(Erlang.term_to_binary([:subscribe, pattern]))
end
subscribe_count(pattern) click to toggle source
# File lib/cloudi.rb, line 98
# Sends a [:subscribe_count, pattern] term and returns the result of
# poll_request(nil, false).
def subscribe_count(pattern)
    message = Erlang.term_to_binary([:subscribe_count, pattern])
    send(message)
    poll_request(nil, false)
end
unsubscribe(pattern) click to toggle source
# File lib/cloudi.rb, line 103
# Removes the oldest callback registered for +pattern+ and sends an
# [:unsubscribe, pattern] term.
#
# The key @prefix + pattern must already exist in @callbacks (checked
# with API.assert); the key is deleted once its list becomes empty.
def unsubscribe(pattern)
    key = @prefix + pattern
    registered = @callbacks.fetch(key, nil)
    API.assert{!registered.nil?}
    registered.shift
    @callbacks.delete(key) if registered.empty?
    send(Erlang.term_to_binary([:unsubscribe, pattern]))
end

Private Instance Methods

callback(command, name, pattern, request_info, request, timeout, priority, trans_id, pid) click to toggle source
# File lib/cloudi.rb, line 250
# Invokes the service callback registered for +pattern+ and sends its
# result back with return_async or return_sync, depending on +command+.
#
# command:: MESSAGE_SEND_ASYNC or MESSAGE_SEND_SYNC; any other value
#           raises MessageDecodingException.
# The remaining arguments are passed to the callback function and, for
# name/pattern/timeout/trans_id/pid, on to return_async/return_sync.
#
# The callback may return either a response, or a two-element Array of
# [response_info, response]; any non-String part is replaced by ''.
# Returns early (without sending a response here) when the callback
# raised one of the Return*/Forward* exceptions.
def callback(command, name, pattern, request_info, request,
             timeout, priority, trans_id, pid)
    function_queue = @callbacks.fetch(pattern, nil)
    if function_queue.nil?
        # no subscription for this pattern: answer with null_response
        function = method(:null_response)
    else
        # rotate the queue: take the front function and re-append it
        function = function_queue.shift
        function_queue.push(function)
    end
    return_null_response = false
    case command
    when MESSAGE_SEND_ASYNC
        begin
            response = function.call(ASYNC, name, pattern,
                                     request_info, request,
                                     timeout, priority, trans_id, pid)
            if response.kind_of?(Array)
                API.assert{response.length == 2}
                response_info = response[0]
                response = response[1]
                if not response_info.kind_of?(String)
                    response_info = ''
                end
            else
                response_info = ''
            end
            if not response.kind_of?(String)
                response = ''
            end
        rescue MessageDecodingException => e
            # mark for termination but still send an empty response
            @terminate = true
            return_null_response = true
        rescue TerminateException => e
            return_null_response = true
        rescue ReturnAsyncException
            # raised by return_async after it has sent the response
            return
        rescue ReturnSyncException => e
            # sync return used while handling an async request
            @terminate = true
            $stderr.puts e.message
            $stderr.puts e.backtrace
            return
        rescue ForwardAsyncException
            # raised by forward_async after it has sent the request on
            return
        rescue ForwardSyncException => e
            # sync forward used while handling an async request
            @terminate = true
            $stderr.puts e.message
            $stderr.puts e.backtrace
            return
        rescue StandardError => e
            # any other error is logged and answered with an empty response
            return_null_response = true
            $stderr.puts e.message
            $stderr.puts e.backtrace
        rescue SystemExit => e
            $stderr.puts e.message
            $stderr.puts e.backtrace
            raise
        # NOTE(review): a bare rescue handles StandardError, which is
        # already rescued above, so this clause looks unreachable -- confirm
        rescue
            $stderr.puts $!.message
            $stderr.puts $!.backtrace
            exit(1)
        end
        if return_null_response
            response_info = ''
            response = ''
        end
        begin
            return_async(name, pattern, response_info, response,
                         timeout, trans_id, pid)
        rescue ReturnAsyncException
            # return_async always raises after sending; nothing to do here
        end
    when MESSAGE_SEND_SYNC
        begin
            response = function.call(SYNC, name, pattern,
                                     request_info, request,
                                     timeout, priority, trans_id, pid)
            if response.kind_of?(Array)
                API.assert{response.length == 2}
                response_info = response[0]
                response = response[1]
                if not response_info.kind_of?(String)
                    response_info = ''
                end
            else
                response_info = ''
            end
            if not response.kind_of?(String)
                response = ''
            end
        rescue MessageDecodingException => e
            # mark for termination but still send an empty response
            @terminate = true
            return_null_response = true
        rescue TerminateException => e
            return_null_response = true
        rescue ReturnSyncException
            # raised by return_sync after it has sent the response
            return
        rescue ReturnAsyncException => e
            # async return used while handling a sync request
            @terminate = true
            $stderr.puts e.message
            $stderr.puts e.backtrace
            return
        rescue ForwardSyncException
            # raised by forward_sync after it has sent the request on
            return
        rescue ForwardAsyncException => e
            # async forward used while handling a sync request
            @terminate = true
            $stderr.puts e.message
            $stderr.puts e.backtrace
            return
        rescue StandardError => e
            # any other error is logged and answered with an empty response
            return_null_response = true
            $stderr.puts e.message
            $stderr.puts e.backtrace
        rescue SystemExit => e
            $stderr.puts e.message
            $stderr.puts e.backtrace
            raise
        # NOTE(review): a bare rescue handles StandardError, which is
        # already rescued above, so this clause looks unreachable -- confirm
        rescue
            $stderr.puts $!.message
            $stderr.puts $!.backtrace
            exit(1)
        end
        if return_null_response
            response_info = ''
            response = ''
        end
        begin
            return_sync(name, pattern, response_info, response,
                        timeout, trans_id, pid)
        rescue ReturnSyncException
            # return_sync always raises after sending; nothing to do here
        end
    else
        raise MessageDecodingException
    end
end
handle_events(external, data, data_size, i, command=nil) click to toggle source
# File lib/cloudi.rb, line 384
# Processes the event messages (MESSAGE_TERM, MESSAGE_REINIT,
# MESSAGE_KEEPALIVE) stored in +data+ from byte offset +i+ onwards.
#
# external::  true makes MESSAGE_TERM return false; otherwise
#             MESSAGE_TERM raises TerminateException.
# data_size:: number of bytes of +data+ to consume.
# command::   an already decoded command for offset +i+; when nil the
#             command is read from +data+ as a native 32-bit integer.
#
# Returns true once exactly +data_size+ bytes were consumed.
# Raises MessageDecodingException for an unknown command or when the
# offset runs past +data_size+.
def handle_events(external, data, data_size, i, command=nil)
    if command.nil?
        raise MessageDecodingException if i > data_size
        command = data[i, 4].unpack('L')[0]
    end
    j = 4
    loop do
        case command
        when MESSAGE_TERM
            @terminate = true
            return false if external
            raise TerminateException.new(@timeout_terminate)
        when MESSAGE_REINIT
            i += j
            j = 4 + 4 + 4 + 1
            reinit = data[i, j].unpack("LLLc")
            @process_count, @timeout_async, @timeout_sync, @priority_default = reinit
            i += j
        when MESSAGE_KEEPALIVE
            send(Erlang.term_to_binary(:keepalive))
            i += j
        else
            raise MessageDecodingException
        end
        raise MessageDecodingException if i > data_size
        return true if i == data_size
        # more events follow: decode the next command
        j = 4
        command = data[i, j].unpack('L')[0]
    end
end
null_response(request_type, name, pattern, request_info, request, timeout, priority, trans_id, pid) click to toggle source
# File lib/cloudi.rb, line 245
# Default callback used when no function is subscribed for a pattern:
# ignores every argument and answers with an empty response.
def null_response(request_type, name, pattern, request_info, request,
                  timeout, priority, trans_id, pid)
    ''
end
poll_request(timeout, external) click to toggle source
# File lib/cloudi.rb, line 427
# Waits for incoming data on @s, decodes the received message(s) and
# either returns the decoded reply or dispatches requests to callback.
#
# timeout::  nil or negative waits without limit; 0 does not wait;
#            a positive value is scaled by 0.001 before being passed to
#            IO.select, i.e. it is treated as milliseconds.
# external:: true when called through poll; false when called
#            internally while waiting for a reply to a sent message.
#
# Return value, by situation:
# * false -- already terminating (external only), @s reported in the
#   error set of IO.select, an empty read, or termination requested
# * true -- the timeout elapsed
# * nil -- after MESSAGE_INIT was processed
# * [response_info, response, trans_id] -- MESSAGE_RECV_ASYNC and
#   MESSAGE_RETURN_SYNC
# * trans_id -- MESSAGE_RETURN_ASYNC
# * Array of trans_ids -- MESSAGE_RETURNS_ASYNC
# * count -- MESSAGE_SUBSCRIBE_COUNT
#
# Raises TerminateException when already terminating and not external,
# and MessageDecodingException for an unknown command or when an offset
# runs past the received data.
def poll_request(timeout, external)
    if @terminate
        if external
            return false
        else
            raise TerminateException.new(@timeout_terminate)
        end
    elsif external and not @initialization_complete
        # the first external poll announces that initialization is done
        send(Erlang.term_to_binary(:polling))
        @initialization_complete = true
    end

    # poll_timer is only set for a positive timeout, so that the
    # remaining time can be recomputed after each handled message
    poll_timer = nil
    if timeout.nil? or timeout < 0
        timeout_value = nil
    elsif timeout == 0
        timeout_value = 0
    elsif timeout > 0
        poll_timer = Time.now
        timeout_value = timeout * 0.001
    end
    result = IO.select([@s], nil, [@s], timeout_value)
    if result.nil?
        return true
    end
    if result[2].length > 0
        return false
    end

    data = recv('')
    data_size = data.bytesize
    if data_size == 0
        return false
    end
    # i is the current byte offset into data, j the size of the field
    # that is decoded next
    i = 0; j = 4

    loop do
        # every message starts with a native 32-bit command
        command = data[i, j].unpack('L')[0]
        case command
        when MESSAGE_INIT
            i += j; j = 4 + 4 + 4 + 4 + 4
            tmp = data[i, j].unpack('LLLLL')
            @process_index = tmp[0]
            @process_count = tmp[1]
            @process_count_max = tmp[2]
            @process_count_min = tmp[3]
            prefix_size = tmp[4]
            i += j; j = prefix_size + 4 + 4 + 4 + 4 + 1
            tmp = data[i, j].unpack("Z#{prefix_size}LLLLc")
            @prefix = tmp[0]
            @timeout_initialize = tmp[1]
            @timeout_async = tmp[2]
            @timeout_sync = tmp[3]
            @timeout_terminate = tmp[4]
            @priority_default = tmp[5]
            i += j
            if i != data_size
                # trailing bytes are event messages
                API.assert{external == false}
                handle_events(external, data, data_size, i)
            end
            return
        when MESSAGE_SEND_ASYNC, MESSAGE_SEND_SYNC
            # each size field precedes the data it describes; the 'x'
            # directives skip one byte after request_info and request
            i += j; j = 4
            name_size = data[i, j].unpack('L')[0]
            i += j; j = name_size + 4
            tmp = data[i, j].unpack("Z#{name_size}L")
            name = tmp[0]
            pattern_size = tmp[1]
            i += j; j = pattern_size + 4
            tmp = data[i, j].unpack("Z#{pattern_size}L")
            pattern = tmp[0]
            request_info_size = tmp[1]
            i += j; j = request_info_size + 1 + 4
            tmp = data[i, j].unpack("a#{request_info_size}xL")
            request_info = tmp[0]
            request_size = tmp[1]
            i += j; j = request_size + 1 + 4 + 1 + 16 + 4
            tmp = data[i, j].unpack("a#{request_size}xLca16L")
            request = tmp[0]
            request_timeout = tmp[1]
            priority = tmp[2]
            trans_id = tmp[3]
            pid_size = tmp[4]
            i += j; j = pid_size
            pid = data[i, j]
            i += j
            if i != data_size
                # trailing bytes are event messages
                API.assert{external == true}
                if not handle_events(external, data, data_size, i)
                    return false
                end
            end
            data.clear()
            callback(command, name, pattern, request_info, request,
                     request_timeout, priority, trans_id,
                     Erlang.binary_to_term(pid))
            if @terminate
                return false
            end
        when MESSAGE_RECV_ASYNC, MESSAGE_RETURN_SYNC
            i += j; j = 4
            response_info_size = data[i, j].unpack('L')[0]
            i += j; j = response_info_size + 1 + 4
            tmp = data[i, j].unpack("a#{response_info_size}xL")
            response_info = tmp[0]
            response_size = tmp[1]
            i += j; j = response_size + 1 + 16
            tmp = data[i, j].unpack("a#{response_size}xa16")
            response = tmp[0]
            trans_id = tmp[1]
            i += j
            if i != data_size
                API.assert{external == false}
                handle_events(external, data, data_size, i)
            end
            return [response_info, response, trans_id]
        when MESSAGE_RETURN_ASYNC
            i += j; j = 16
            trans_id = data[i, j]
            i += j
            if i != data_size
                API.assert{external == false}
                handle_events(external, data, data_size, i)
            end
            return trans_id
        when MESSAGE_RETURNS_ASYNC
            i += j; j = 4
            trans_id_count = data[i, j].unpack('L')[0]
            i += j; j = 16 * trans_id_count
            trans_ids = data[i, j].unpack('a16' * trans_id_count)
            i += j
            if i != data_size
                API.assert{external == false}
                handle_events(external, data, data_size, i)
            end
            return trans_ids
        when MESSAGE_SUBSCRIBE_COUNT
            i += j; j = 4
            count = data[i, j].unpack('L')[0]
            i += j
            if i != data_size
                API.assert{external == false}
                handle_events(external, data, data_size, i)
            end
            return count
        when MESSAGE_TERM
            # handle_events returns false (external) or raises for
            # MESSAGE_TERM, so the assertion below is not expected to run
            if not handle_events(external, data, data_size, i, command)
                return false
            end
            API.assert{false}
        when MESSAGE_REINIT
            i += j; j = 4 + 4 + 4 + 1
            tmp = data[i, j].unpack("LLLc")
            @process_count = tmp[0]
            @timeout_async = tmp[1]
            @timeout_sync = tmp[2]
            @priority_default = tmp[3]
            i += j; j = 4
            if i == data_size
                data.clear()
            elsif i < data_size
                # another message follows in the same buffer
                next
            else
                raise MessageDecodingException
            end
        when MESSAGE_KEEPALIVE
            send(Erlang.term_to_binary(:keepalive))
            i += j; j = 4
            if i == data_size
                data.clear()
            elsif i < data_size
                # another message follows in the same buffer
                next
            else
                raise MessageDecodingException
            end
        else
            raise MessageDecodingException
        end

        # the buffer is fully consumed: subtract the elapsed
        # milliseconds from the remaining timeout before waiting again
        if not poll_timer.nil?
            poll_timer_new = Time.now
            elapsed = [0,
                       ((poll_timer_new -
                         poll_timer) * 1000.0).floor].max
            poll_timer = poll_timer_new
            if elapsed >= timeout
                timeout = 0
            else
                timeout -= elapsed
            end
        end
        if not timeout_value.nil?
            if timeout == 0
                return true
            elsif timeout > 0
                timeout_value = timeout * 0.001
            end
        end
        result = IO.select([@s], nil, [@s], timeout_value)
        if result.nil?
            return true
        end
        if result[2].length > 0
            return false
        end

        data = recv(data)
        data_size = data.bytesize
        if data_size == 0
            return false
        end
        i = 0; j = 4
    end
end
recv(data_old) click to toggle source
# File lib/cloudi.rb, line 718
# Reads one incoming message from @s and returns it appended to
# +data_old+ (+data_old+ itself is not modified).
#
# With @use_header set, a 4-byte big-endian size header is read first
# and exactly that many bytes follow, fetched in chunks of at most
# @size bytes. Without a header, chunks of @size bytes are read for as
# long as the previous chunk was full and @s is immediately readable.
#
# All progress counters use String#bytesize: the sizes on the wire are
# byte counts, and the header branch previously used String#length
# while the header-less branch already used bytesize.
def recv(data_old)
    if @use_header
        header = ''
        while header.bytesize < 4
            header += @s.readpartial(4 - header.bytesize)
        end
        total = header.unpack('N')[0]
        data = data_old
        received = 0
        while received < total
            fragment = @s.readpartial([total - received, @size].min)
            data += fragment
            received += fragment.bytesize
        end
    else
        data = data_old
        ready = true
        while ready
            fragment = @s.readpartial(@size)
            data += fragment
            # a short read means the message is complete
            ready = (fragment.bytesize == @size)
            if ready
                # a full chunk: continue only if more data is pending
                ready = ! IO.select([@s], nil, nil, 0).nil?
            end
        end
    end
    return data
end
send(data) click to toggle source
# File lib/cloudi.rb, line 711
# Writes +data+ to @s, prefixed with a 4-byte big-endian size header
# when @use_header is set.
#
# The header carries the size of +data+ in bytes. It is computed from a
# binary copy of +data+ so that a string tagged with a multibyte
# encoding is neither measured in characters nor able to cause an
# encoding conflict when appended to the packed header.
def send(data)
    if @use_header
        payload = data.dup.force_encoding(Encoding::BINARY)
        data = [payload.bytesize].pack('N') + payload
    end
    @s.write(data)
end