class MessageQueue::Adapters::Bunny::Connection::Consumer

Attributes

exchange_name[R]
exchange_options[R]
exchange_routing_key[R]
queue_name[R]
queue_options[R]
subscribe_options[R]

Public Class Methods

new(connection, options = {}) click to toggle source

Public: Initialize a new Bunny consumer.

connection - The Bunny Connection. options - The Hash options used to initialize the exchange

of a consumer:
:queue -
   :name    - The String queue name.
   :durable - The Boolean queue durability.
:exchange -
   :name        - The String exchange name.
   :routing_key - The String exchange routing key.
:subscribe -
   :ack   - The Boolean indicate if it acks.
   :block - The Boolean indicate if it blocks.
Detailed options see
https://github.com/ruby-amqp/bunny/blob/master/lib/bunny/queue.rb
and
https://github.com/ruby-amqp/bunny/blob/master/lib/bunny/exchange.rb.

Returns a Consumer.

Calls superclass method MessageQueue::Consumer::new
# File lib/message_queue/adapters/bunny/consumer.rb, line 26
def initialize(connection, options = {})
  super

  @queue_options = self.options.fetch(:queue)
  @queue_name = queue_options.delete(:name) || (raise "Missing queue name")

  @exchange_options = self.options.fetch(:exchange)
  @exchange_name = exchange_options.delete(:name) || (raise "Missing exchange name")
  @exchange_routing_key = exchange_options.delete(:routing_key) || queue_name

  @subscribe_options = self.options.fetch(:subscribe, {}).merge(:ack => true)
end

Public Instance Methods

ack(delivery_tag) click to toggle source
# File lib/message_queue/adapters/bunny/consumer.rb, line 62
def ack(delivery_tag)
  channel.ack(delivery_tag, false)
end
queue() click to toggle source
# File lib/message_queue/adapters/bunny/consumer.rb, line 58
def queue
  @queue ||= channel.queue(queue_name, queue_options).bind(exchange_name, :routing_key => exchange_routing_key)
end
subscribe(options = {}, &block) click to toggle source
# File lib/message_queue/adapters/bunny/consumer.rb, line 39
def subscribe(options = {}, &block)
  @subscription = queue.subscribe(subscribe_options.merge(options)) do |delivery_info, metadata, payload|
    begin
      message = MessageQueue::Message.new(:message_id => metadata[:message_id],
                                          :type => metadata[:type],
                                          :timestamp => metadata[:timestamp],
                                          :routing_key => delivery_info[:routing_key],
                                          :payload => load_object(payload))
      block.call(message)
    ensure
      ack(delivery_info.delivery_tag)
    end
  end
end
unsubscribe(options = {}) click to toggle source
# File lib/message_queue/adapters/bunny/consumer.rb, line 54
def unsubscribe(options = {})
  @subscription.cancel if @subscription
end

Private Instance Methods

channel() click to toggle source
# File lib/message_queue/adapters/bunny/consumer.rb, line 68
def channel
  @channel ||= connection.connection.create_channel
end