class Skein::Client::RPC
Constants
- EXCHANGE_NAME_DEFAULT
Constants ============================================================
Public Class Methods
new(exchange_name = nil, routing_key: nil, connection: nil, context: nil, ident: nil, expiration: nil, persistent: true, durable: true, timeout: nil)
click to toggle source
Instance Methods =====================================================
Calls superclass method
Skein::Connected::new
# File lib/skein/client/rpc.rb, line 18
# Builds an RPC client: declares the direct exchange requests are published
# to, declares a response queue named after this client's ident, and
# subscribes a consumer that hands each reply to whichever callback was
# registered under the reply's 'id'.
#
# exchange_name - exchange to declare; EXCHANGE_NAME_DEFAULT when nil.
# routing_key   - stored in @routing_key.
# connection, context, ident - forwarded unchanged to the superclass.
# expiration    - stored in @expiration.
# persistent    - coerced to true/false and stored in @persistent.
# durable       - passed as the exchange's durable: option.
# timeout       - stored in @timeout.
def initialize(exchange_name = nil, routing_key: nil, connection: nil, context: nil, ident: nil, expiration: nil, persistent: true, durable: true, timeout: nil)
  super(connection: connection, context: context, ident: ident)

  @routing_key = routing_key
  @timeout = timeout
  @expiration = expiration
  @persistent = persistent ? true : false

  # Pending handlers keyed by request id; entries are removed as replies
  # are dispatched.
  @callback = {}

  @rpc_exchange = self.channel.direct(
    exchange_name || EXCHANGE_NAME_DEFAULT,
    durable: durable
  )

  @response_queue = self.channel.queue(
    @ident,
    durable: false,
    header: true,
    auto_delete: true
  )

  @consumer = Skein::Adapter.subscribe(@response_queue, block: false) do |payload, delivery_tag, _reply_to|
    # `self.context` is deliberate: a bare `context` here would resolve to
    # the keyword argument of the same name rather than the reader method.
    self.context.trap do
      $stdout.puts(payload) if ENV['SKEIN_DEBUG_JSON']

      # NOTE(review): JSON.load is kept as-is; if payloads on this queue are
      # not fully trusted, JSON.parse may be the safer call — confirm.
      response = JSON.load(payload)
      handler = @callback.delete(response['id'])

      if handler
        # Work out the single value to hand over: an exception object for
        # an error reply, otherwise the reply's 'result'.
        outcome =
          if response['error']
            case response['error'] && response['error']['code']
            when -32601
              NoMethodError.new(
                format(
                  "%s from `%s' RPC call",
                  response.dig('error', 'message'),
                  response.dig('error', 'data', 'method')
                )
              )
            when -32602
              ArgumentError.new(
                response.dig('error', 'data', 'message') || 'wrong number of arguments'
              )
            else
              RPCException.new(
                response.dig('error', 'data', 'message') || response.dig('error', 'message')
              )
            end
          else
            response['result']
          end

        # Handlers of any other type are silently ignored.
        case handler
        when Skein::TimeoutQueue
          handler << outcome
        when Proc
          handler.call(outcome)
        end
      end

      self.channel.acknowledge(delivery_tag)
    end
  end
end
Public Instance Methods
close()
click to toggle source
Calls superclass method
Skein::Connected#close
# File lib/skein/client/rpc.rb, line 100
# Cancels the response consumer (if there is one) and then closes via the
# superclass.
#
# The consumer reference is cleared before cancelling, and the superclass
# close runs from an ensure clause, so an exception raised by the cancel
# neither leaves a stale @consumer behind nor skips the superclass close.
# The exception itself still propagates to the caller.
#
# Returns whatever the superclass close returns.
def close
  consumer, @consumer = @consumer, nil

  begin
    consumer&.cancel
  ensure
    closed = super
  end

  closed
end
method_missing(name, *args, &block)
click to toggle source
# File lib/skein/client/rpc.rb, line 107 def method_missing(name, *args, &block) name = name.to_s blocking = !name.sub!(/!\z/, '') message_id = SecureRandom.uuid request = JSON.dump( jsonrpc: '2.0', method: name, params: args, id: message_id ) @rpc_exchange.publish( request, routing_key: @routing_key, reply_to: blocking ? @ident : nil, content_type: 'application/json', message_id: message_id, persistent: @persistent, expiration: @expiration ) if (block_given?) @callback[message_id] = if (defined?(EventMachine)) EventMachine.next_tick(&block) else block end elsif (blocking) queue = Skein::TimeoutQueue.new(blocking: true, timeout: @timeout) @callback[message_id] = queue case (result = queue.pop) when Exception raise result else result end end end
reroute!(routing_key) { || ... }
click to toggle source
Temporarily delivers RPC calls to a different routing key. The supplied block is executed with this temporary routing in effect.
# File lib/skein/client/rpc.rb, line 92
# Temporarily delivers RPC calls to a different routing key. The supplied
# block (if any) runs with +routing_key+ in effect, and the previous routing
# key is put back afterwards — including when the block raises, so a failed
# call cannot leave the client pointed at the temporary key.
#
# routing_key - the routing key to use for the duration of the block.
#
# Returns the routing key that was restored.
def reroute!(routing_key)
  previous, @routing_key = @routing_key, routing_key

  yield if block_given?

  previous
ensure
  @routing_key = previous
end