class RabbitCourier::RPCClient

Constants

DEFAULT_QUEUE_OPTS

Attributes

condition[R]
correlation_id[RW]
lock[R]
response[RW]

Public Class Methods

new(name, opts = {}) click to toggle source
# File lib/rabbit_courier/rpc_client.rb, line 7
def initialize(name, opts = {})
  @name = name
  @reply_queue = channel.queue(reply_queue_name(name), DEFAULT_QUEUE_OPTS.merge(opts))

  @lock = Mutex.new
  @condition = ConditionVariable.new
end

Public Instance Methods

call(message, correlation_id) click to toggle source
# File lib/rabbit_courier/rpc_client.rb, line 15
def call(message, correlation_id)
  self.correlation_id = correlation_id

  consumer = subscribe_to_reply
  exchange.publish(message.to_s, routing_key: @name, reply_to: @reply_queue.name, correlation_id: correlation_id)

  # wait for condition
  lock.synchronize { condition.wait(lock) }
  consumer.cancel

  response
end

Private Instance Methods

channel() click to toggle source
# File lib/rabbit_courier/rpc_client.rb, line 42
def channel
  @channel ||= RabbitCourier.connection.create_channel
end
exchange() click to toggle source
# File lib/rabbit_courier/rpc_client.rb, line 46
def exchange
  @exchange ||= channel.default_exchange
end
reply_queue_name(name) click to toggle source
# File lib/rabbit_courier/rpc_client.rb, line 50
def reply_queue_name(name)
  name + ".reply"
end
subscribe_to_reply() click to toggle source
# File lib/rabbit_courier/rpc_client.rb, line 30
def subscribe_to_reply
  myself = self
  @reply_queue.subscribe do |delivery_info, properties, payload|
    if properties[:correlation_id] == myself.correlation_id.to_s
      myself.response = payload

      # signal the condition
      myself.lock.synchronize { myself.condition.signal }
    end
  end
end