class RabbitCourier::RPCClient
Constants
- DEFAULT_QUEUE_OPTS
Attributes
condition[R]
correlation_id[RW]
lock[R]
response[RW]
Public Class Methods
new(name, opts = {})
click to toggle source
# File lib/rabbit_courier/rpc_client.rb, line 7
# Sets up an RPC client for the queue called +name+.
#
# Declares the reply queue (named via #reply_queue_name) on this client's
# channel and creates the mutex / condition variable pair exposed through
# #lock and #condition.
#
# @param name [String] name of the target queue; also stored as @name
# @param opts [Hash] queue options merged over DEFAULT_QUEUE_OPTS
def initialize(name, opts = {})
  @name = name
  @lock = Mutex.new
  @condition = ConditionVariable.new

  reply_channel = channel
  queue_name = reply_queue_name(name)
  queue_options = DEFAULT_QUEUE_OPTS.merge(opts)
  @reply_queue = reply_channel.queue(queue_name, queue_options)
end
Public Instance Methods
call(message, correlation_id)
click to toggle source
# File lib/rabbit_courier/rpc_client.rb, line 15
# Publishes +message+ with routing key @name and blocks until a reply has
# been stored in #response, then returns it.
#
# @param message [#to_s] request payload; published as message.to_s
# @param correlation_id [Object] identifier attached to the request and
#   stored in #correlation_id for the reply subscriber to match against
# @return [Object] the value found in #response once a reply was recorded
def call(message, correlation_id)
  self.correlation_id = correlation_id
  # Clear any reply left over from an earlier call so the wait below can
  # only be satisfied by a response recorded during this call.
  self.response = nil
  consumer = subscribe_to_reply

  begin
    exchange.publish(message.to_s,
                     routing_key: @name,
                     reply_to: @reply_queue.name,
                     correlation_id: correlation_id)

    # Wait on a predicate instead of the bare signal: if the reply was
    # recorded before we got here, the signal has already fired and an
    # unconditional wait would block forever. The loop also absorbs
    # spurious wakeups.
    lock.synchronize do
      condition.wait(lock) while response.nil?
    end
  ensure
    # Always drop the reply subscription, even if publishing raised.
    consumer.cancel
  end

  response
end
Private Instance Methods
channel()
click to toggle source
# File lib/rabbit_courier/rpc_client.rb, line 42
# Returns this client's channel, creating it on first use from
# RabbitCourier.connection and caching it in @channel.
def channel
  return @channel if @channel

  @channel = RabbitCourier.connection.create_channel
end
exchange()
click to toggle source
# File lib/rabbit_courier/rpc_client.rb, line 46
# Returns the default exchange of #channel, looked up once and cached in
# @exchange.
def exchange
  return @exchange if @exchange

  @exchange = channel.default_exchange
end
reply_queue_name(name)
click to toggle source
# File lib/rabbit_courier/rpc_client.rb, line 50
# Builds the reply queue name for +name+ by appending ".reply".
#
# Interpolation is used instead of String#+ so that any name responding to
# #to_s (for example a Symbol) is accepted; String input yields the same
# result as before.
#
# @param name [#to_s] name of the request queue
# @return [String] the corresponding reply queue name
def reply_queue_name(name)
  "#{name}.reply"
end
subscribe_to_reply()
click to toggle source
# File lib/rabbit_courier/rpc_client.rb, line 30
# Subscribes to @reply_queue and returns whatever the queue's #subscribe
# returns (the caller later invokes #cancel on it).
#
# For each delivery whose :correlation_id property equals the stringified
# #correlation_id of this client, the payload is stored in #response and
# #condition is signalled while holding #lock. Other deliveries are ignored.
def subscribe_to_reply
  # Captured in a local so the handler refers to this client explicitly.
  client = self

  @reply_queue.subscribe do |_delivery_info, properties, payload|
    expected_id = client.correlation_id.to_s
    next unless properties[:correlation_id] == expected_id

    client.response = payload
    # Wake the thread waiting on the condition.
    client.lock.synchronize { client.condition.signal }
  end
end