class MessageQueue::Adapters::Bunny::Connection::Consumer
Attributes
exchange_name[R]
exchange_options[R]
exchange_routing_key[R]
queue_name[R]
queue_options[R]
subscribe_options[R]
Public Class Methods
new(connection, options = {})
click to toggle source
Public: Initialize a new Bunny consumer.
connection - The Bunny Connection. options - The Hash options used to initialize the exchange
of a consumer: :queue - :name - The String queue name. :durable - The Boolean queue durability. :exchange - :name - The String exchange name. :routing_key - The String exchange routing key. :subscribe - :ack - The Boolean indicate if it acks. :block - The Boolean indicate if it blocks. Detailed options see https://github.com/ruby-amqp/bunny/blob/master/lib/bunny/queue.rb and https://github.com/ruby-amqp/bunny/blob/master/lib/bunny/exchange.rb.
Returns a Consumer
.
Calls superclass method
MessageQueue::Consumer::new
# File lib/message_queue/adapters/bunny/consumer.rb, line 26 def initialize(connection, options = {}) super @queue_options = self.options.fetch(:queue) @queue_name = queue_options.delete(:name) || (raise "Missing queue name") @exchange_options = self.options.fetch(:exchange) @exchange_name = exchange_options.delete(:name) || (raise "Missing exchange name") @exchange_routing_key = exchange_options.delete(:routing_key) || queue_name @subscribe_options = self.options.fetch(:subscribe, {}).merge(:ack => true) end
Public Instance Methods
ack(delivery_tag)
click to toggle source
# File lib/message_queue/adapters/bunny/consumer.rb, line 62 def ack(delivery_tag) channel.ack(delivery_tag, false) end
queue()
click to toggle source
# File lib/message_queue/adapters/bunny/consumer.rb, line 58 def queue @queue ||= channel.queue(queue_name, queue_options).bind(exchange_name, :routing_key => exchange_routing_key) end
subscribe(options = {}, &block)
click to toggle source
# File lib/message_queue/adapters/bunny/consumer.rb, line 39 def subscribe(options = {}, &block) @subscription = queue.subscribe(subscribe_options.merge(options)) do |delivery_info, metadata, payload| begin message = MessageQueue::Message.new(:message_id => metadata[:message_id], :type => metadata[:type], :timestamp => metadata[:timestamp], :routing_key => delivery_info[:routing_key], :payload => load_object(payload)) block.call(message) ensure ack(delivery_info.delivery_tag) end end end
unsubscribe(options = {})
click to toggle source
# File lib/message_queue/adapters/bunny/consumer.rb, line 54 def unsubscribe(options = {}) @subscription.cancel if @subscription end
Private Instance Methods
channel()
click to toggle source
# File lib/message_queue/adapters/bunny/consumer.rb, line 68 def channel @channel ||= connection.connection.create_channel end