class Mafia::Dealer

Public Class Methods

new(config=nil)
# File lib/mafia/dealer.rb, line 3
# Builds a dealer: opens the Bunny connection described by the config, then
# prepares the channel, the request queue and the exchange used by #publish.
#
# @param config [Hash, nil] connection settings; falls back to Mafia.config
#   when nil. Reads :host, :port, :username, :password, :vhost and :queue.
def initialize(config=nil)
  @config = config || Mafia.config

  target_url = Mafia.config_to_url(@config)
  Mafia.logger.info("Mafia dealer starting connection to #{target_url}")

  # Only the connection-related keys are handed to Bunny.
  connection_options = @config.slice(:host, :port, :username, :password, :vhost)
  @conn = Bunny.new(connection_options)
  @conn.start

  Mafia.logger.info("Mafia dealer connection started")

  # Channel, named request queue and the channel's default exchange; the
  # queue name comes from the config so both client and server agree on it.
  @channel = @conn.create_channel
  request_queue_name = @config[:queue]
  @queue = @channel.queue(request_queue_name)
  @exchange = @channel.default_exchange
end

Public Instance Methods

publish(channel, *args)
# File lib/mafia/dealer.rb, line 18
# Sends a JSON-RPC 2.0 request through the exchange and blocks until the
# matching reply shows up on a temporary, server-named reply queue.
#
# The request id doubles as the AMQP correlation id; replies whose
# correlation id differs are ignored. The reply queue and its consumer are
# always released before returning, whether the call succeeds, times out or
# fails for another reason.
#
# @param channel [String] stored in the request's 'method' member
# @param args [Array] stored in the request's 'params' member
# @return [Object, nil] the 'result' member of the parsed JSON reply
# @raise [Timeout::Error] when no matching reply arrives within
#   @config[:rpc_timeout] seconds
# NOTE(review): an 'error' member in the reply is not surfaced; callers only
# ever see 'result' (nil when absent) — confirm this is intended.
def publish(channel, *args)
  # Monotonic clock: immune to wall-clock adjustments while we wait.
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  params = args
  Mafia.logger.info("Publish to channel `#{channel}` params: #{params}")

  request_id = SecureRandom.hex
  req_message = {
    'id' => request_id,
    'jsonrpc' => '2.0',
    'method' => channel,
    'params' => params
  }

  # create a return queue for this client (the broker picks the name)
  reply_q = @channel.queue('', exclusive: true)
  # Thread-safe hand-off from the consumer callback to this thread, so the
  # caller sleeps instead of spinning while it waits.
  replies = Queue.new
  queue_consumer = nil

  begin
    Mafia.logger.info("subscribe to return queue #{reply_q.name}")
    queue_consumer = reply_q.subscribe do |_delivery_info, properties, payload|
      replies << payload if properties[:correlation_id] == request_id
    end

    # send out our request, serialized as JSON
    @exchange.publish(JSON.generate(req_message), {
                        correlation_id: request_id,
                        reply_to: reply_q.name,
                        routing_key: @queue.name
                      })

    response =
      begin
        # wait for the response from the return queue
        Timeout.timeout(@config[:rpc_timeout]) { replies.pop }
      rescue Timeout::Error
        Mafia.logger.error("Timeout waiting for rpc response")
        raise
      end

    ret = JSON.parse(response)["result"]
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
    Mafia.logger.info("Finished in #{elapsed}s with response #{ret}")
    ret
  ensure
    # Release broker-side resources on every exit path. Cleanup failures are
    # logged rather than raised so they never mask the real outcome.
    begin
      queue_consumer.cancel if queue_consumer
      reply_q.delete
    rescue StandardError => e
      Mafia.logger.error("Failed to release return queue #{reply_q.name}: #{e.message}")
    end
  end
end