class CloudI::API
Constants
- ASYNC
For unbuffered output, use $stderr.puts '…'
- MESSAGE_INIT
- MESSAGE_KEEPALIVE
- MESSAGE_RECV_ASYNC
- MESSAGE_REINIT
- MESSAGE_RETURNS_ASYNC
- MESSAGE_RETURN_ASYNC
- MESSAGE_RETURN_SYNC
- MESSAGE_SEND_ASYNC
- MESSAGE_SEND_SYNC
- MESSAGE_SUBSCRIBE_COUNT
- MESSAGE_TERM
- NULL
- SYNC
Attributes
prefix[R]
priority_default[R]
process_count[R]
process_count_max[R]
process_count_min[R]
process_index[R]
timeout_async[R]
timeout_initialize[R]
timeout_sync[R]
timeout_terminate[R]
Public Class Methods
assert() { || ... }
click to toggle source
# File lib/cloudi.rb, line 699
# Evaluates the given block and raises AssertionError when its result is
# false or nil. Returns nil when the block result is truthy.
# (The original source carries a commented-out `if $DEBUG` condition.)
def self.assert
  return if yield # if $DEBUG
  raise AssertionError
end
info_key_value_new(pairs, response = true)
click to toggle source
# File lib/cloudi.rb, line 695
# Public alias for text_pairs_new: encodes key/value pairs into the
# NUL-separated text format. +response+ defaults to true.
def self.info_key_value_new(pairs, response = true)
  text_pairs_new(pairs, response)
end
info_key_value_parse(info)
click to toggle source
# File lib/cloudi.rb, line 691
# Public alias for text_pairs_parse: decodes NUL-separated key/value text
# into a Hash.
def self.info_key_value_parse(info)
  text_pairs_parse(info)
end
new(thread_index)
click to toggle source
# File lib/cloudi.rb, line 44
# Wraps the inherited file descriptor (thread_index + 3) in an IO object,
# configures framing from CLOUDI_API_INIT_PROTOCOL, sends the :init message
# and waits for the reply via poll_request.
#
# Raises InvalidInputException when a required environment variable is
# missing (through API.getenv) or the protocol is not tcp/udp/local.
def initialize(thread_index)
  protocol = API.getenv('CLOUDI_API_INIT_PROTOCOL')
  buffer_size_str = API.getenv('CLOUDI_API_INIT_BUFFER_SIZE')
  # 'tcp' and 'local' prefix every message with a length header; 'udp'
  # does not.
  case protocol
  when 'tcp', 'local'
    use_header = true
  when 'udp'
    use_header = false
  else
    $stderr.puts 'CloudI service execution must occur in CloudI'
    raise InvalidInputException
  end
  @s = IO.for_fd(thread_index + 3, File::RDWR, autoclose: false)
  @s.sync = true
  @use_header = use_header
  @initialization_complete = false
  @terminate = false
  @size = buffer_size_str.to_i
  @callbacks = Hash.new
  @timeout_terminate = 10 # TIMEOUT_TERMINATE_MIN
  send(Erlang.term_to_binary(:init))
  poll_request(nil, false)
end
thread_count()
click to toggle source
# File lib/cloudi.rb, line 82
# Returns CLOUDI_API_INIT_THREAD_COUNT from the environment, converted
# with String#to_i. Raises (through getenv) when the variable is not set.
def self.thread_count
  getenv('CLOUDI_API_INIT_THREAD_COUNT').to_i
end
Private Class Methods
getenv(key)
click to toggle source
# File lib/cloudi.rb, line 765
# Returns the value of environment variable +key+.
# Raises InvalidInputException when the variable is not set.
def self.getenv(key)
  ENV.fetch(key) { raise InvalidInputException }
end
text_pairs_new(pairs, response)
click to toggle source
# File lib/cloudi.rb, line 672
# Encodes +pairs+ as a flat string of "key\0value\0" entries.
# A String value yields one entry; any other value is iterated with #each
# and yields one entry per element, repeating the key.
# When +response+ is truthy and nothing was written, "\0" is returned
# instead of the empty string.
def self.text_pairs_new(pairs, response)
  buffer = StringIO.new
  pairs.each do |key, values|
    entries = values.kind_of?(String) ? [values] : values
    entries.each do |value|
      buffer.write "#{key}\0#{value}\0"
    end
  end
  encoded = buffer.string
  return "\0" if response && encoded == ''
  encoded
end
text_pairs_parse(text)
click to toggle source
# File lib/cloudi.rb, line 656
# Decodes a "key\0value\0..." string into a Hash.
#
# A key that occurs once maps to its String value; a key that occurs
# several times maps to an Array of its values in order of appearance.
#
# String#split drops trailing empty fields, so a final pair whose value is
# the empty string (e.g. "a\0\0") arrives here without its value. Such a
# missing value is treated as '' rather than nil, which keeps the result
# symmetric with what text_pairs_new writes.
def self.text_pairs_parse(text)
  pairs = {}
  text.split(NULL.chr).each_slice(2) do |key, value|
    value ||= ''
    existing = pairs[key]
    if existing.nil?
      pairs[key] = value
    elsif existing.kind_of?(Array)
      existing << value
    else
      pairs[key] = [existing, value]
    end
  end
  pairs
end
Public Instance Methods
forward_(request_type, name, request_info, request, timeout, priority, trans_id, pid)
click to toggle source
# File lib/cloudi.rb, line 168
# Dispatches to forward_async or forward_sync according to +request_type+
# (ASYNC or SYNC). Any other request type does nothing and returns nil.
def forward_(request_type, name, request_info, request,
             timeout, priority, trans_id, pid)
  case request_type
  when ASYNC then forward_async(name, request_info, request,
                                timeout, priority, trans_id, pid)
  when SYNC then forward_sync(name, request_info, request,
                              timeout, priority, trans_id, pid)
  end
end
forward_async(name, request_info, request, timeout, priority, trans_id, pid)
click to toggle source
# File lib/cloudi.rb, line 180
# Sends a :forward_async message and then always raises
# ForwardAsyncException, so this method never returns normally.
def forward_async(name, request_info, request,
                  timeout, priority, trans_id, pid)
  message = [:forward_async, name,
             OtpErlangBinary.new(request_info),
             OtpErlangBinary.new(request),
             timeout, priority,
             OtpErlangBinary.new(trans_id), pid]
  send(Erlang.term_to_binary(message))
  raise ForwardAsyncException
end
forward_sync(name, request_info, request, timeout, priority, trans_id, pid)
click to toggle source
# File lib/cloudi.rb, line 190
# Sends a :forward_sync message and then always raises
# ForwardSyncException, so this method never returns normally.
def forward_sync(name, request_info, request,
                 timeout, priority, trans_id, pid)
  message = [:forward_sync, name,
             OtpErlangBinary.new(request_info),
             OtpErlangBinary.new(request),
             timeout, priority,
             OtpErlangBinary.new(trans_id), pid]
  send(Erlang.term_to_binary(message))
  raise ForwardSyncException
end
mcast_async(name, request, timeout=nil, request_info=nil, priority=nil)
click to toggle source
# File lib/cloudi.rb, line 150
# Sends an :mcast_async message for +name+ and returns whatever
# poll_request(nil, false) yields for the reply.
# nil arguments fall back to @timeout_async, '' (request_info) and
# @priority_default respectively.
def mcast_async(name, request, timeout=nil, request_info=nil, priority=nil)
  timeout = @timeout_async if timeout.nil?
  request_info = '' if request_info.nil?
  priority = @priority_default if priority.nil?
  message = [:mcast_async, name,
             OtpErlangBinary.new(request_info),
             OtpErlangBinary.new(request),
             timeout, priority]
  send(Erlang.term_to_binary(message))
  poll_request(nil, false)
end
poll(timeout=nil)
click to toggle source
# File lib/cloudi.rb, line 642
# Runs poll_request in external mode. A nil +timeout+ is replaced by -1,
# which poll_request treats as "no timeout".
def poll(timeout=nil)
  timeout = -1 if timeout.nil?
  poll_request(timeout, true)
end
recv_async(timeout=nil, trans_id=nil, consume=true)
click to toggle source
# File lib/cloudi.rb, line 232
# Sends a :recv_async message and returns whatever poll_request(nil, false)
# yields for the reply.
# A nil +timeout+ falls back to @timeout_sync; a nil +trans_id+ falls back
# to sixteen NUL bytes.
def recv_async(timeout=nil, trans_id=nil, consume=true)
  timeout = @timeout_sync if timeout.nil?
  trans_id = 0.chr * 16 if trans_id.nil?
  message = [:recv_async, timeout,
             OtpErlangBinary.new(trans_id), consume]
  send(Erlang.term_to_binary(message))
  poll_request(nil, false)
end
return_(request_type, name, pattern, response_info, response, timeout, trans_id, pid)
click to toggle source
# File lib/cloudi.rb, line 200
# Dispatches to return_async or return_sync according to +request_type+
# (ASYNC or SYNC). Any other request type does nothing and returns nil.
def return_(request_type, name, pattern, response_info, response,
            timeout, trans_id, pid)
  case request_type
  when ASYNC then return_async(name, pattern, response_info, response,
                               timeout, trans_id, pid)
  when SYNC then return_sync(name, pattern, response_info, response,
                             timeout, trans_id, pid)
  end
end
return_async(name, pattern, response_info, response, timeout, trans_id, pid)
click to toggle source
# File lib/cloudi.rb, line 212
# Sends a :return_async message and then always raises
# ReturnAsyncException, so this method never returns normally.
def return_async(name, pattern, response_info, response,
                 timeout, trans_id, pid)
  message = [:return_async, name, pattern,
             OtpErlangBinary.new(response_info),
             OtpErlangBinary.new(response),
             timeout,
             OtpErlangBinary.new(trans_id), pid]
  send(Erlang.term_to_binary(message))
  raise ReturnAsyncException
end
return_sync(name, pattern, response_info, response, timeout, trans_id, pid)
click to toggle source
# File lib/cloudi.rb, line 222
# Sends a :return_sync message and then always raises
# ReturnSyncException, so this method never returns normally.
def return_sync(name, pattern, response_info, response,
                timeout, trans_id, pid)
  message = [:return_sync, name, pattern,
             OtpErlangBinary.new(response_info),
             OtpErlangBinary.new(response),
             timeout,
             OtpErlangBinary.new(trans_id), pid]
  send(Erlang.term_to_binary(message))
  raise ReturnSyncException
end
send_async(name, request, timeout=nil, request_info=nil, priority=nil)
click to toggle source
# File lib/cloudi.rb, line 114
# Sends a :send_async message for +name+ and returns whatever
# poll_request(nil, false) yields for the reply.
# nil arguments fall back to @timeout_async, '' (request_info) and
# @priority_default respectively.
def send_async(name, request, timeout=nil, request_info=nil, priority=nil)
  timeout = @timeout_async if timeout.nil?
  request_info = '' if request_info.nil?
  priority = @priority_default if priority.nil?
  message = [:send_async, name,
             OtpErlangBinary.new(request_info),
             OtpErlangBinary.new(request),
             timeout, priority]
  send(Erlang.term_to_binary(message))
  poll_request(nil, false)
end
send_sync(name, request, timeout=nil, request_info=nil, priority=nil)
click to toggle source
# File lib/cloudi.rb, line 132
# Sends a :send_sync message for +name+ and returns whatever
# poll_request(nil, false) yields for the reply.
# nil arguments fall back to @timeout_sync, '' (request_info) and
# @priority_default respectively.
def send_sync(name, request, timeout=nil, request_info=nil, priority=nil)
  timeout = @timeout_sync if timeout.nil?
  request_info = '' if request_info.nil?
  priority = @priority_default if priority.nil?
  message = [:send_sync, name,
             OtpErlangBinary.new(request_info),
             OtpErlangBinary.new(request),
             timeout, priority]
  send(Erlang.term_to_binary(message))
  poll_request(nil, false)
end
shutdown(reason=nil)
click to toggle source
# File lib/cloudi.rb, line 649
# Sends a [:shutdown, reason] message; a nil +reason+ is sent as ''.
def shutdown(reason=nil)
  reason = '' if reason.nil?
  send(Erlang.term_to_binary([:shutdown, reason]))
end
subscribe(pattern, function)
click to toggle source
# File lib/cloudi.rb, line 87
# Registers +function+ as a callback under the key @prefix + pattern
# (appending to any callbacks already registered for that key) and sends a
# [:subscribe, pattern] message.
def subscribe(pattern, function)
  key = @prefix + pattern
  registered = @callbacks.fetch(key, nil)
  if registered.nil?
    @callbacks[key] = [function]
  else
    registered << function
  end
  send(Erlang.term_to_binary([:subscribe, pattern]))
end
subscribe_count(pattern)
click to toggle source
# File lib/cloudi.rb, line 98
# Sends a [:subscribe_count, pattern] message and returns whatever
# poll_request(nil, false) yields for the reply.
def subscribe_count(pattern)
  message = [:subscribe_count, pattern]
  send(Erlang.term_to_binary(message))
  poll_request(nil, false)
end
unsubscribe(pattern)
click to toggle source
# File lib/cloudi.rb, line 103
# Removes the first callback registered under @prefix + pattern, drops the
# key once no callbacks remain, and sends an [:unsubscribe, pattern]
# message. Asserts that the pattern was subscribed.
def unsubscribe(pattern)
  key = @prefix + pattern
  registered = @callbacks.fetch(key, nil)
  API.assert { !registered.nil? }
  registered.shift
  @callbacks.delete(key) if registered.empty?
  send(Erlang.term_to_binary([:unsubscribe, pattern]))
end
Private Instance Methods
callback(command, name, pattern, request_info, request, timeout, priority, trans_id, pid)
click to toggle source
# File lib/cloudi.rb, line 250
# Invokes the callback registered for +pattern+ with an incoming request and
# sends its result back with return_async or return_sync, depending on
# +command+ (MESSAGE_SEND_ASYNC or MESSAGE_SEND_SYNC).
#
# When no callback is registered for +pattern+, null_response is used.
# A callback may return either a response, or a two-element Array
# [response_info, response]; any non-String part is replaced by ''.
#
# Raises MessageDecodingException for any other +command+.
def callback(command, name, pattern, request_info, request,
             timeout, priority, trans_id, pid)
  function_queue = @callbacks.fetch(pattern, nil)
  if function_queue.nil?
    function = method(:null_response)
  else
    # Take the first callback and put it back at the end, so repeated
    # subscriptions to one pattern are used in rotation.
    function = function_queue.shift
    function_queue.push(function)
  end
  return_null_response = false
  case command
  when MESSAGE_SEND_ASYNC
    begin
      response = function.call(ASYNC, name, pattern,
                               request_info, request,
                               timeout, priority, trans_id, pid)
      if response.kind_of?(Array)
        API.assert{response.length == 2}
        response_info = response[0]
        response = response[1]
        if not response_info.kind_of?(String)
          response_info = ''
        end
      else
        response_info = ''
      end
      if not response.kind_of?(String)
        response = ''
      end
    rescue MessageDecodingException => e
      @terminate = true
      return_null_response = true
    rescue TerminateException => e
      return_null_response = true
    rescue ReturnAsyncException
      # The callback already sent the matching return itself.
      return
    rescue ReturnSyncException => e
      # Sync return raised inside an async request: log and terminate.
      @terminate = true
      $stderr.puts e.message
      $stderr.puts e.backtrace
      return
    rescue ForwardAsyncException
      # The callback already forwarded the request itself.
      return
    rescue ForwardSyncException => e
      # Sync forward raised inside an async request: log and terminate.
      @terminate = true
      $stderr.puts e.message
      $stderr.puts e.backtrace
      return
    rescue StandardError => e
      # Any other error is logged and answered with an empty response.
      return_null_response = true
      $stderr.puts e.message
      $stderr.puts e.backtrace
    rescue SystemExit => e
      $stderr.puts e.message
      $stderr.puts e.backtrace
      raise
    rescue
      # NOTE(review): a bare rescue matches StandardError, which the clause
      # above already handles, so this branch appears unreachable -- confirm
      # the intent.
      $stderr.puts $!.message
      $stderr.puts $!.backtrace
      exit(1)
    end
    if return_null_response
      response_info = ''
      response = ''
    end
    begin
      return_async(name, pattern, response_info, response,
                   timeout, trans_id, pid)
    rescue ReturnAsyncException
      # return_async always raises after sending; nothing more to do here.
    end
  when MESSAGE_SEND_SYNC
    begin
      response = function.call(SYNC, name, pattern,
                               request_info, request,
                               timeout, priority, trans_id, pid)
      if response.kind_of?(Array)
        API.assert{response.length == 2}
        response_info = response[0]
        response = response[1]
        if not response_info.kind_of?(String)
          response_info = ''
        end
      else
        response_info = ''
      end
      if not response.kind_of?(String)
        response = ''
      end
    rescue MessageDecodingException => e
      @terminate = true
      return_null_response = true
    rescue TerminateException => e
      return_null_response = true
    rescue ReturnSyncException
      # The callback already sent the matching return itself.
      return
    rescue ReturnAsyncException => e
      # Async return raised inside a sync request: log and terminate.
      @terminate = true
      $stderr.puts e.message
      $stderr.puts e.backtrace
      return
    rescue ForwardSyncException
      # The callback already forwarded the request itself.
      return
    rescue ForwardAsyncException => e
      # Async forward raised inside a sync request: log and terminate.
      @terminate = true
      $stderr.puts e.message
      $stderr.puts e.backtrace
      return
    rescue StandardError => e
      # Any other error is logged and answered with an empty response.
      return_null_response = true
      $stderr.puts e.message
      $stderr.puts e.backtrace
    rescue SystemExit => e
      $stderr.puts e.message
      $stderr.puts e.backtrace
      raise
    rescue
      # NOTE(review): a bare rescue matches StandardError, which the clause
      # above already handles, so this branch appears unreachable -- confirm
      # the intent.
      $stderr.puts $!.message
      $stderr.puts $!.backtrace
      exit(1)
    end
    if return_null_response
      response_info = ''
      response = ''
    end
    begin
      return_sync(name, pattern, response_info, response,
                  timeout, trans_id, pid)
    rescue ReturnSyncException
      # return_sync always raises after sending; nothing more to do here.
    end
  else
    raise MessageDecodingException
  end
end
handle_events(external, data, data_size, i, command=nil)
click to toggle source
# File lib/cloudi.rb, line 384
# Processes the event messages (MESSAGE_TERM, MESSAGE_REINIT,
# MESSAGE_KEEPALIVE) found in +data+ starting at byte offset +i+.
#
# When +command+ is given it is used as the first command; otherwise the
# first command is read from +data+ at +i+.
#
# Returns true once exactly +data_size+ bytes were consumed, or false when
# MESSAGE_TERM is met and +external+ is truthy.
# Raises TerminateException on MESSAGE_TERM when +external+ is falsy, and
# MessageDecodingException on an unknown command or when the offset runs
# past +data_size+.
def handle_events(external, data, data_size, i, command=nil)
  if command.nil?
    raise MessageDecodingException if i > data_size
    command = data[i, 4].unpack('L').first
  end
  loop do
    case command
    when MESSAGE_TERM
      @terminate = true
      return false if external
      raise TerminateException.new(@timeout_terminate)
    when MESSAGE_REINIT
      # Skip the 4-byte command, then read three 32-bit values and one
      # signed byte (13 bytes in total).
      i += 4
      @process_count, @timeout_async, @timeout_sync, @priority_default =
        data[i, 4 + 4 + 4 + 1].unpack("LLLc")
      i += 4 + 4 + 4 + 1
    when MESSAGE_KEEPALIVE
      send(Erlang.term_to_binary(:keepalive))
      i += 4
    else
      raise MessageDecodingException
    end
    raise MessageDecodingException if i > data_size
    return true if i == data_size
    command = data[i, 4].unpack('L').first
  end
end
null_response(request_type, name, pattern, request_info, request, timeout, priority, trans_id, pid)
click to toggle source
# File lib/cloudi.rb, line 245
# Default callback used when no function is subscribed for a pattern:
# ignores every argument and answers with an empty string.
def null_response(request_type, name, pattern, request_info, request,
                  timeout, priority, trans_id, pid)
  ''
end
poll_request(timeout, external)
click to toggle source
# File lib/cloudi.rb, line 427
# Waits for incoming data on the socket, decodes it and acts on each
# message.
#
# +timeout+ is in milliseconds (it is multiplied by 0.001 before being
# passed to IO.select); nil or a negative value waits without a limit and
# 0 polls without waiting.
# +external+ is true when called from poll, and false when called
# internally to wait for the reply to a message that was just sent.
#
# Returns, depending on the message received:
# * nil after MESSAGE_INIT,
# * [response_info, response, trans_id] for MESSAGE_RECV_ASYNC and
#   MESSAGE_RETURN_SYNC,
# * a trans_id for MESSAGE_RETURN_ASYNC,
# * an Array of trans_ids for MESSAGE_RETURNS_ASYNC,
# * a count for MESSAGE_SUBSCRIBE_COUNT,
# * true when the wait timed out,
# * false when terminating, when IO.select reports the socket in its
#   exceptional set, or when no data was received.
#
# Raises TerminateException when already terminating and +external+ is
# false, and MessageDecodingException on undecodable input.
def poll_request(timeout, external)
  if @terminate
    if external
      return false
    else
      raise TerminateException.new(@timeout_terminate)
    end
  elsif external and not @initialization_complete
    # First external poll: announce that initialization is finished.
    send(Erlang.term_to_binary(:polling))
    @initialization_complete = true
  end

  poll_timer = nil
  if timeout.nil? or timeout < 0
    timeout_value = nil
  elsif timeout == 0
    timeout_value = 0
  elsif timeout > 0
    # Only a positive timeout needs a timer to track the remaining time.
    poll_timer = Time.now
    timeout_value = timeout * 0.001
  end
  result = IO.select([@s], nil, [@s], timeout_value)
  if result.nil?
    return true
  end
  if result[2].length > 0
    return false
  end

  data = recv('')
  data_size = data.bytesize
  if data_size == 0
    return false
  end
  # i is the current byte offset into data, j the size of the field that
  # was (or is about to be) read at that offset.
  i = 0; j = 4

  loop do
    command = data[i, j].unpack('L')[0]
    case command
    when MESSAGE_INIT
      i += j; j = 4 + 4 + 4 + 4 + 4
      tmp = data[i, j].unpack('LLLLL')
      @process_index = tmp[0]
      @process_count = tmp[1]
      @process_count_max = tmp[2]
      @process_count_min = tmp[3]
      prefix_size = tmp[4]
      i += j; j = prefix_size + 4 + 4 + 4 + 4 + 1
      tmp = data[i, j].unpack("Z#{prefix_size}LLLLc")
      @prefix = tmp[0]
      @timeout_initialize = tmp[1]
      @timeout_async = tmp[2]
      @timeout_sync = tmp[3]
      @timeout_terminate = tmp[4]
      @priority_default = tmp[5]
      i += j
      if i != data_size
        # Trailing bytes are event messages appended to the reply.
        API.assert{external == false}
        handle_events(external, data, data_size, i)
      end
      return
    when MESSAGE_SEND_ASYNC, MESSAGE_SEND_SYNC
      i += j; j = 4
      name_size = data[i, j].unpack('L')[0]
      i += j; j = name_size + 4
      tmp = data[i, j].unpack("Z#{name_size}L")
      name = tmp[0]
      pattern_size = tmp[1]
      i += j; j = pattern_size + 4
      tmp = data[i, j].unpack("Z#{pattern_size}L")
      pattern = tmp[0]
      request_info_size = tmp[1]
      # "x" skips one byte after the payload -- presumably a NUL
      # terminator; confirm against the protocol definition.
      i += j; j = request_info_size + 1 + 4
      tmp = data[i, j].unpack("a#{request_info_size}xL")
      request_info = tmp[0]
      request_size = tmp[1]
      i += j; j = request_size + 1 + 4 + 1 + 16 + 4
      tmp = data[i, j].unpack("a#{request_size}xLca16L")
      request = tmp[0]
      request_timeout = tmp[1]
      priority = tmp[2]
      trans_id = tmp[3]
      pid_size = tmp[4]
      i += j; j = pid_size
      pid = data[i, j]
      i += j
      if i != data_size
        API.assert{external == true}
        if not handle_events(external, data, data_size, i)
          return false
        end
      end
      # Everything needed was copied out; empty the buffer before the
      # callback runs.
      data.clear()
      callback(command, name, pattern, request_info, request,
               request_timeout, priority, trans_id,
               Erlang.binary_to_term(pid))
      if @terminate
        return false
      end
    when MESSAGE_RECV_ASYNC, MESSAGE_RETURN_SYNC
      i += j; j = 4
      response_info_size = data[i, j].unpack('L')[0]
      i += j; j = response_info_size + 1 + 4
      tmp = data[i, j].unpack("a#{response_info_size}xL")
      response_info = tmp[0]
      response_size = tmp[1]
      i += j; j = response_size + 1 + 16
      tmp = data[i, j].unpack("a#{response_size}xa16")
      response = tmp[0]
      trans_id = tmp[1]
      i += j
      if i != data_size
        API.assert{external == false}
        handle_events(external, data, data_size, i)
      end
      return [response_info, response, trans_id]
    when MESSAGE_RETURN_ASYNC
      i += j; j = 16
      trans_id = data[i, j]
      i += j
      if i != data_size
        API.assert{external == false}
        handle_events(external, data, data_size, i)
      end
      return trans_id
    when MESSAGE_RETURNS_ASYNC
      i += j; j = 4
      trans_id_count = data[i, j].unpack('L')[0]
      i += j; j = 16 * trans_id_count
      trans_ids = data[i, j].unpack('a16' * trans_id_count)
      i += j
      if i != data_size
        API.assert{external == false}
        handle_events(external, data, data_size, i)
      end
      return trans_ids
    when MESSAGE_SUBSCRIBE_COUNT
      i += j; j = 4
      count = data[i, j].unpack('L')[0]
      i += j
      if i != data_size
        API.assert{external == false}
        handle_events(external, data, data_size, i)
      end
      return count
    when MESSAGE_TERM
      # handle_events either returns false or raises for MESSAGE_TERM, so
      # the assertion below is never expected to be reached.
      if not handle_events(external, data, data_size, i, command)
        return false
      end
      API.assert{false}
    when MESSAGE_REINIT
      i += j; j = 4 + 4 + 4 + 1
      tmp = data[i, j].unpack("LLLc")
      @process_count = tmp[0]
      @timeout_async = tmp[1]
      @timeout_sync = tmp[2]
      @priority_default = tmp[3]
      i += j; j = 4
      if i == data_size
        data.clear()
      elsif i < data_size
        # More messages follow in the same buffer: decode the next one.
        next
      else
        raise MessageDecodingException
      end
    when MESSAGE_KEEPALIVE
      send(Erlang.term_to_binary(:keepalive))
      i += j; j = 4
      if i == data_size
        data.clear()
      elsif i < data_size
        # More messages follow in the same buffer: decode the next one.
        next
      else
        raise MessageDecodingException
      end
    else
      raise MessageDecodingException
    end

    # The buffer is fully processed: reduce the remaining timeout by the
    # time spent so far, then wait for more data.
    if not poll_timer.nil?
      poll_timer_new = Time.now
      elapsed = [0,
                 ((poll_timer_new - poll_timer) * 1000.0).floor].max
      poll_timer = poll_timer_new
      if elapsed >= timeout
        timeout = 0
      else
        timeout -= elapsed
      end
    end
    if not timeout_value.nil?
      if timeout == 0
        return true
      elsif timeout > 0
        timeout_value = timeout * 0.001
      end
    end
    result = IO.select([@s], nil, [@s], timeout_value)
    if result.nil?
      return true
    end
    if result[2].length > 0
      return false
    end

    data = recv(data)
    data_size = data.bytesize
    if data_size == 0
      return false
    end
    i = 0; j = 4
  end
end
recv(data_old)
click to toggle source
# File lib/cloudi.rb, line 718
# Reads the next incoming message from the socket and returns it appended
# to +data_old+ (as a new String).
#
# With @use_header set, a 4-byte big-endian length is read first and then
# exactly that many bytes, in chunks of at most @size bytes.
# Without a header, chunks of up to @size bytes are read for as long as
# each chunk is completely full and the socket is still readable.
def recv(data_old)
  unless @use_header
    buffer = data_old
    loop do
      chunk = @s.readpartial(@size)
      buffer += chunk
      # A short chunk means the message ended within it.
      break unless chunk.bytesize == @size
      break if IO.select([@s], nil, nil, 0).nil?
    end
    return buffer
  end

  header = ''
  received = 0
  while received < 4
    chunk = @s.readpartial(4 - received)
    header += chunk
    received += chunk.length
  end
  total = header.unpack('N')[0]

  buffer = data_old
  received = 0
  while received < total
    chunk = @s.readpartial([total - received, @size].min)
    buffer += chunk
    received += chunk.length
  end
  buffer
end
send(data)
click to toggle source
# File lib/cloudi.rb, line 711
# Writes +data+ to the socket and returns the result of IO#write.
#
# With @use_header set, the payload is preceded by its size as a 4-byte
# big-endian integer. The size is taken in bytes (String#bytesize) rather
# than characters, so the header is also correct for a payload whose
# encoding contains multibyte characters; the payload is concatenated as
# binary so that mixing it with the binary header cannot raise an encoding
# error.
#
# Note that this method shadows Object#send inside the class.
def send(data)
  if @use_header
    data = [data.bytesize].pack('N') + data.b
  end
  @s.write(data)
end