class Mafia::ConsumerPool
Public Class Methods
new(config=nil)
click to toggle source
# File lib/mafia/consumer_pool.rb, line 3
# Builds a consumer pool: opens a Bunny connection and prepares the channel,
# queue and exchange that the other methods of this object use.
#
# config - Hash-like settings; when nil (the default) Mafia.config is used.
#          The keys read here are :host, :port, :username, :password, :vhost
#          and :queue.
def initialize(config = nil)
  @config = config || Mafia.config
  broker_url = Mafia.config_to_url(@config)
  Mafia.logger.info("Mafia consumer pool starting connection to #{broker_url}")

  # Only the connection-related keys are handed to Bunny.
  connection_options = @config.slice(:host, :port, :username, :password, :vhost)
  @conn = Bunny.new(connection_options)
  @conn.start
  Mafia.logger.info("Mafia consumer pool started, waiting for rpc events")

  # create a channel and exchange that both client and server know about
  @channel = @conn.create_channel
  queue_name = @config[:queue]
  @queue = @channel.queue(queue_name)
  @exchange = @channel.default_exchange
end
Public Instance Methods
fetch_consumer(routing_key)
click to toggle source
# File lib/mafia/consumer_pool.rb, line 49
# Looks up the consumer responsible for a routing key.
#
# routing_key - value compared (with ==) against each registered consumer's
#               routing_key.
#
# Returns the first entry of Mafia.consumers whose routing_key matches, or
# DefaultConsumer when none does. The decision is logged at info level.
def fetch_consumer(routing_key)
  matched = Mafia.consumers.find { |candidate| candidate.routing_key == routing_key }

  unless matched
    Mafia.logger.info("Routing key `#{routing_key}` to default consumer")
    return DefaultConsumer
  end

  Mafia.logger.info("Routing key `#{routing_key}` to consumer #{matched.name}")
  matched
end
subscribe()
click to toggle source
# File lib/mafia/consumer_pool.rb, line 19
# Consumes JSON-RPC style requests from @queue and publishes one reply per
# request through @exchange.
#
# For every delivery the payload is parsed as JSON, the consumer for the
# request's 'method' is resolved with fetch_consumer, and
# klass.process(method, *params) is invoked. The reply carries the request
# 'id', 'jsonrpc' => '2.0' and a 'result' of either [:ok, value] or
# [:error, message]; it is published with the delivery's reply_to as routing
# key and its correlation_id.
#
# The subscription is requested with block: true.
#
# Errors: any StandardError raised while processing a request is logged and
# turned into an [:error, message] reply. Non-StandardError exceptions (such
# as Interrupt or SystemExit) are deliberately not rescued so the process can
# still be stopped. A payload that is not valid JSON raises out of the
# subscription block, because parsing happens before the rescue.
def subscribe
  @queue.subscribe(block: true) do |_delivery_info, properties, payload|
    req_message = JSON.parse(payload)
    rpc_method = req_message['method']

    # calculate a result
    klass = fetch_consumer(rpc_method)
    begin
      # Build a new argument list instead of unshifting into the parsed
      # params, so the error log below shows the params as they were sent.
      # A request without 'params' is treated as having no arguments.
      args = [rpc_method] + (req_message['params'] || [])
      result = [:ok, klass.process(*args)]
    rescue StandardError => e
      Mafia.logger.error("error while running #{klass} with args: #{req_message['params']}: #{e.message}")
      result = [:error, e.message]
    end

    reply = {
      'id' => req_message['id'],
      'result' => result,
      'jsonrpc' => '2.0'
    }

    # enqueue our reply in the return queue
    Mafia.logger.info("publish to return queue #{properties.reply_to}")
    @exchange.publish(JSON.generate(reply),
                      { routing_key: properties.reply_to,
                        correlation_id: properties.correlation_id })
  end
end