class RabbitRPC::Connection

Constants

PREFETCH_DEFAULT

Attributes

opts[R]
prefetch[R]
queue_name[R]
uri[R]

Public Class Methods

new(queue_name, uri, prefetch, opts = {}) click to toggle source
# File lib/rabbit_rpc/connection.rb, line 12
# Public - Build a connection description; nothing is opened here.
#
# queue_name - name of the queue later handed to channel.queue.
# uri        - connection URI, parsed when connect! runs.
# prefetch   - prefetch value for the channel; PREFETCH_DEFAULT is used
#              instead when this is nil (or false).
# opts       - extra connection settings merged into the parsed URI
#              parameters by connect! (default: {}).
def initialize(queue_name, uri, prefetch, opts = {})
  @queue_name, @uri, @opts = queue_name, uri, opts
  @prefetch = prefetch || PREFETCH_DEFAULT
end

Public Instance Methods

listen!() click to toggle source
# File lib/rabbit_rpc/connection.rb, line 19
# Public - Start serving requests.
#
# Hands EventMachine.run a block that first installs the interrupt
# handlers and then subscribes to the queue. Returns whatever
# EventMachine.run returns.
def listen!
  on_reactor_start = proc do
    close_connection_on_interrupt
    subscribe_to_queue
  end

  EventMachine.run(&on_reactor_start)
end

Private Instance Methods

channel() click to toggle source
# File lib/rabbit_rpc/connection.rb, line 57
# Private - Return the AMQP channel, opening it on first use.
#
# The channel is created from connect! with the configured prefetch and
# then memoized in @channel. The log line is written only when a channel
# is actually created: this method is also called for every reply that
# subscribe_to_queue publishes, and logging on each of those calls would
# wrongly suggest that a new channel is being opened per request.
#
# Returns the memoized ::AMQP::Channel.
def channel
  return @channel if @channel

  logger.info 'Establishing connection with channel'
  @channel = ::AMQP::Channel.new connect!, prefetch: @prefetch
end
close_connection_on_interrupt() click to toggle source
# File lib/rabbit_rpc/connection.rb, line 69
# Private - Install INT and TERM handlers that shut the service down.
#
# Both signals share one handler: it logs 'Exiting', closes @connection
# and, from the close callback, stops EventMachine.
#
# NOTE(review): the handler logs from inside a signal trap; confirm the
# logger in use tolerates being called from trap context.
# NOTE(review): confirm EventMachine.stop actually invokes the block it
# is given; if it does not, `exit` is never reached from here.
def close_connection_on_interrupt
  shutdown = proc do
    logger.info 'Exiting'
    @connection.close { EventMachine.stop { exit } }
  end

  %w[INT TERM].each { |signal_name| Signal.trap(signal_name, &shutdown) }
end
connect!() click to toggle source
# File lib/rabbit_rpc/connection.rb, line 50
# Private - Return the AMQP connection, opening it on first use.
#
# Parses @uri into connection parameters, merges @opts into them, logs,
# and memoizes the result of ::AMQP.connect in @connection. The parsing
# and the log line happen on every call; only the connect is memoized.
def connect!
  settings = ::AMQP::Client.parse_connection_uri(@uri).tap do |parsed|
    parsed.merge!(@opts)
  end

  logger.info('Connecting to RabbitMQ')
  @connection ||= ::AMQP.connect(settings)
end
queue() click to toggle source

Private - Establishes a connection with a RabbitMQ queue. TODO: Allow queue options to be passed in.

# File lib/rabbit_rpc/connection.rb, line 64
# Private - Return the queue named @queue_name, declaring it on the
# channel the first time it is requested.
#
# The log line is written on every call; the queue itself is memoized
# in @queue. No queue options are passed to channel.queue.
def queue
  logger.info('Connecting to queue')
  return @queue if @queue

  @queue = channel.queue(@queue_name)
end
subscribe_to_queue() click to toggle source
# File lib/rabbit_rpc/connection.rb, line 28
# Private - Subscribe to the queue and answer incoming requests.
#
# Each delivery is handed to EM.defer. Inside the deferred block the
# payload is run through RequestHandler; unless the handler reports the
# request as one-way, the msgpack-encoded result is published on the
# channel's default exchange, routed to the request's reply_to and
# tagged with the request's message_id as correlation_id.
#
# NOTE(review): the publish is issued from inside the EM.defer block;
# confirm the AMQP client may be used from wherever that block runs.
def subscribe_to_queue
  queue.subscribe do |metadata, payload|
    EM.defer do
      handler = RequestHandler.new(payload)
      reply   = handler.execute

      # One-way requests expect no response.
      next if handler.one_way

      exchange = channel.default_exchange
      exchange.publish(
        reply.to_msgpack,
        routing_key:    metadata.reply_to,
        correlation_id: metadata.message_id,
        mandatory:      true
      )
    end
  end
end