class Mafia::Dealer
Public Class Methods
new(config=nil)
click to toggle source
# File lib/mafia/dealer.rb, line 3
#
# Opens the broker connection used by this dealer and prepares the channel,
# the request queue and the default exchange that #publish relies on.
#
# config - Hash of connection settings; falls back to Mafia.config when nil.
#          The keys :host, :port, :username, :password and :vhost are handed
#          to Bunny, and :queue names the request queue.
def initialize(config=nil)
  @config = config || Mafia.config

  target_url = Mafia.config_to_url(@config)
  Mafia.logger.info("Mafia dealer starting connection to #{target_url}")

  # Only the connection-related keys are forwarded to Bunny.
  connection_options = @config.slice(*%i[host port username password vhost])
  @conn = Bunny.new(connection_options)
  @conn.start
  Mafia.logger.info("Mafia dealer connection started")

  # Channel, queue and exchange that both client and server know about.
  @channel = @conn.create_channel
  request_queue_name = @config[:queue]
  @queue = @channel.queue(request_queue_name)
  @exchange = @channel.default_exchange
end
Public Instance Methods
publish(channel, *args)
click to toggle source
# File lib/mafia/dealer.rb, line 18
#
# Sends a JSON-RPC 2.0 request over the default exchange and blocks until the
# matching reply arrives on a temporary, exclusive reply queue.
#
# channel - value placed in the request's 'method' field.
# args    - positional arguments, sent as the request's 'params' array.
#
# Returns the "result" member of the parsed JSON reply. Any other member of
# the reply (for example "error") is not inspected here.
# Raises Timeout::Error when no matching reply arrives within
# @config[:rpc_timeout] seconds.
def publish(channel, *args)
  # Monotonic clock: the elapsed time must not be skewed by wall-clock changes.
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  params = args
  Mafia.logger.info("Publish to channel `#{channel}` params: #{params}")

  # The request id doubles as the correlation id used to match the reply.
  req_message = {
    'id' => SecureRandom.hex,
    'jsonrpc' => '2.0',
    'method' => channel,
    'params' => params
  }

  # create a return queue for this client
  reply_q = @channel.queue('', exclusive: true)
  Mafia.logger.info("subscribe to return queue #{reply_q.name}")

  # Thread-safe hand-off from the consumer callback to the caller, so the
  # caller can block instead of spinning on a shared variable.
  replies = Queue.new
  queue_consumer = reply_q.subscribe do |_delivery_info, properties, payload|
    replies << payload if properties[:correlation_id] == req_message['id']
  end

  begin
    # send out our request, serialized as JSON
    @exchange.publish(JSON.generate(req_message), {
      correlation_id: req_message['id'],
      reply_to: reply_q.name,
      routing_key: @queue.name,
    })

    # Block (without burning CPU) until the reply shows up or the timeout fires.
    response = Timeout.timeout(@config[:rpc_timeout]) { replies.pop }

    res = JSON.parse(response)
    ret = res["result"]
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
    Mafia.logger.info("Finished in #{elapsed}s with response #{ret}")
    ret
  rescue Timeout::Error
    Mafia.logger.error("Timeout waiting for rpc response")
    raise
  ensure
    # Always release the per-call consumer and reply queue, including on
    # timeout, so they do not pile up on the long-lived channel.
    queue_consumer.cancel
    reply_q.delete
  end
end