class Mafia::ConsumerPool

Public Class Methods

new(config=nil)
# File lib/mafia/consumer_pool.rb, line 3
# Sets up the consumer pool: starts a Bunny connection and prepares the
# channel, queue and exchange the pool works with.
#
# @param config [Hash, nil] connection settings; Mafia.config is used when
#   this is nil (or false). The keys read here are :host, :port, :username,
#   :password and :vhost for the connection, plus :queue for the queue name.
#   (Presumably a symbol-keyed Hash, since it must support #slice and #[].)
def initialize(config = nil)
  @config = config || Mafia.config

  Mafia.logger.info(
    "Mafia consumer pool starting connection to #{Mafia.config_to_url(@config)}"
  )

  # Only the connection-related settings are handed to Bunny.
  connection_settings = @config.slice(:host, :port, :username, :password, :vhost)
  @conn = Bunny.new(connection_settings)
  @conn.start

  Mafia.logger.info("Mafia consumer pool started, waiting for rpc events")

  # The channel, the named queue and the default exchange are what the
  # client and the server both agree on.
  @channel = @conn.create_channel
  @queue = @channel.queue(@config[:queue])
  @exchange = @channel.default_exchange
end

Public Instance Methods

fetch_consumer(routing_key)
# File lib/mafia/consumer_pool.rb, line 49
# Picks the consumer that should handle +routing_key+.
#
# Mafia.consumers is scanned in order and the first entry whose
# +routing_key+ equals the argument wins; when none matches,
# DefaultConsumer is returned. The decision is logged at info level
# in both cases.
#
# @param routing_key [Object] key compared (with ==) against each consumer's routing_key
# @return [Object] the matching consumer, or DefaultConsumer
def fetch_consumer(routing_key)
  matched = Mafia.consumers.find { |candidate| candidate.routing_key == routing_key }

  if matched
    Mafia.logger.info("Routing key `#{routing_key}` to consumer #{matched.name}")
    matched
  else
    Mafia.logger.info("Routing key `#{routing_key}` to default consumer")
    DefaultConsumer
  end
end
subscribe()
# File lib/mafia/consumer_pool.rb, line 19
# Consumes RPC requests from the pool's queue and publishes one reply per
# request.
#
# Each payload is parsed as JSON. The request's 'method' selects a consumer
# class through fetch_consumer, which is then invoked as
# +klass.process(method, *params)+. The reply hash carries the request 'id',
# a 'result' pair of [:ok, value] or [:error, message], and
# 'jsonrpc' => '2.0'. It is published on @exchange with
# +properties.reply_to+ as routing key and the request's correlation_id.
#
# The subscription is made with +block: true+ (documented by Bunny as
# blocking the calling thread).
#
# NOTE: JSON.parse sits outside the rescue below, so a malformed payload
# raises out of the block and no reply is published for it.
def subscribe
  @queue.subscribe(block: true) do |_delivery_info, properties, payload|
    req_message = JSON.parse(payload)
    method_name = req_message['method']

    # pick the class that will calculate the result
    klass = fetch_consumer(method_name)

    begin
      # 'params' may be omitted from a request; treat that as "no arguments".
      # Built with + rather than unshift so the parsed params are not mutated
      # (the error log below should show the params as they were sent).
      args = [method_name] + (req_message['params'] || [])
      result = [:ok, klass.process(*args)]
    rescue StandardError => e
      # Only StandardError is turned into an error reply; Interrupt,
      # SystemExit and similar must keep propagating so the process can stop.
      Mafia.logger.error("error while running #{klass} with args: #{req_message['params']}: #{e.message}")
      result = [:error, e.message]
    end

    reply = {
      'id' => req_message['id'],
      'result' => result,
      'jsonrpc' => '2.0'
    }

    # enqueue our reply in the return queue
    Mafia.logger.info("publish to return queue #{properties.reply_to}")
    @exchange.publish(
      JSON.generate(reply),
      routing_key: properties.reply_to,
      correlation_id: properties.correlation_id
    )
  end
end