class AMQP::Consumer
AMQP
consumers are entities that handle messages delivered to them (“push API” as opposed to “pull API”) by AMQP
broker. Every consumer is associated with a queue. Consumers can be exclusive (no other consumers can be registered for the same queue) or not (consumers share the queue). In the case of multiple consumers per queue, messages are distributed in round robin manner with respect to channel-level prefetch setting).
@see AMQP::Queue
@see AMQP::Queue#subscribe
@see AMQP::Queue#cancel
Attributes
@return [Hash] Custom subscription metadata
@return [AMQP::Channel] Channel
this consumer uses
@return [String] Consumer
tag, unique consumer identifier
@return [AMQP::Queue] Queue
messages are consumed from
Public Class Methods
# File lib/amqp/consumer.rb, line 50 def initialize(channel, queue, consumer_tag = nil, exclusive = false, no_ack = false, arguments = {}, no_local = false, &block) @callbacks = Hash.new @channel = channel || raise(ArgumentError, "channel is nil") @connection = channel.connection || raise(ArgumentError, "connection is nil") @queue = queue || raise(ArgumentError, "queue is nil") @consumer_tag = consumer_tag || self.class.tag_generator.generate_for(queue) @exclusive = exclusive @no_ack = no_ack @arguments = arguments @no_local = no_local self.register_with_channel self.register_with_queue end
@return [AMQP::ConsumerTagGenerator] Consumer
tag generator
# File lib/amqp/consumer.rb, line 39 def self.tag_generator @tag_generator ||= AMQP::ConsumerTagGenerator.new end
@param [AMQP::ConsumerTagGenerator] Assigns consumer tag generator that will be used by consumer instances @return [AMQP::ConsumerTagGenerator] Provided argument
# File lib/amqp/consumer.rb, line 45 def self.tag_generator=(generator) @tag_generator = generator end
Public Instance Methods
Acknowledge a delivery tag. @return [Consumer] self
@api public @see bit.ly/htCzCX AMQP
0.9.1 protocol documentation (Section 1.8.3.13.)
# File lib/amqp/consumer.rb, line 200 def acknowledge(delivery_tag) @channel.acknowledge(delivery_tag) self end
Called by associated connection object when AMQP
connection has been re-established (for example, after a network failure).
@api plugin
# File lib/amqp/consumer.rb, line 275 def auto_recover self.exec_callback_yielding_self(:before_recovery) self.resubscribe self.exec_callback_yielding_self(:after_recovery) end
Defines a callback that will be executed after TCP connection is recovered after a network failure but before AMQP
connection is re-opened. Only one callback can be defined (the one defined last replaces previously added ones).
@api public
# File lib/amqp/consumer.rb, line 255 def before_recovery(&block) self.redefine_callback(:before_recovery, &block) end
Legacy {AMQP::Queue} API compatibility. @private @deprecated
# File lib/amqp/consumer.rb, line 141 def callback if @callbacks[:delivery] @callbacks[:delivery].first end end
@return [AMQP::Consumer] self
# File lib/amqp/consumer.rb, line 114 def cancel(nowait = false, &block) @channel.once_open do @queue.once_declared do @connection.send_frame(AMQ::Protocol::Basic::Cancel.encode(@channel.id, @consumer_tag, nowait)) if !nowait self.redefine_callback(:cancel, &block) @channel.consumers_awaiting_cancel_ok.push(self) end self end end self end
Begin consuming messages from the queue @return [AMQP::Consumer] self
# File lib/amqp/consumer.rb, line 76 def consume(nowait = false, &block) @channel.once_open do @queue.once_declared do @connection.send_frame(AMQ::Protocol::Basic::Consume.encode(@channel.id, @queue.name, @consumer_tag, @no_local, @no_ack, @exclusive, nowait, @arguments)) if !nowait self.redefine_callback(:consume, &block) @channel.consumers_awaiting_consume_ok.push(self) end self end end self end
@return [Boolean] true if this consumer is exclusive (other consumers for the same queue are not allowed)
# File lib/amqp/consumer.rb, line 68 def exclusive? !!@exclusive end
# File lib/amqp/consumer.rb, line 187 def handle_cancel(basic_cancel) self.exec_callback(:scancel, basic_cancel) end
# File lib/amqp/consumer.rb, line 301 def handle_cancel_ok(cancel_ok) self.exec_callback_once(:cancel, cancel_ok) self.unregister_with_channel self.unregister_with_queue @consumer_tag = nil # detach from object graph so that this object will be garbage-collected @queue = nil @channel = nil @connection = nil self.clear_callbacks(:delivery) self.clear_callbacks(:consume) self.clear_callbacks(:cancel) self.clear_callbacks(:scancel) end
@private
# File lib/amqp/consumer.rb, line 245 def handle_connection_interruption(method = nil) self.exec_callback_yielding_self(:after_connection_interruption) end
# File lib/amqp/consumer.rb, line 297 def handle_consume_ok(consume_ok) self.exec_callback_once(:consume, consume_ok) end
Implementation
# File lib/amqp/consumer.rb, line 293 def handle_delivery(basic_deliver, metadata, payload) self.exec_callback(:delivery, basic_deliver, metadata, payload) end
@return [String] Readable representation of relevant object state.
# File lib/amqp/consumer.rb, line 176 def inspect "#<AMQP::Consumer:#{@consumer_tag}> queue=#{@queue.name} channel=#{@channel.id} callbacks=#{@callbacks.inspect}" end
# File lib/amqp/consumer.rb, line 181 def on_cancel(&block) self.append_callback(:scancel, &block) self end
Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure). Only one callback can be defined (the one defined last replaces previously added ones).
@api public
# File lib/amqp/consumer.rb, line 239 def on_connection_interruption(&block) self.redefine_callback(:after_connection_interruption, &block) end
Register a block that will be used to handle delivered messages.
@return [AMQP::Consumer] self @see AMQP::Queue#subscribe
# File lib/amqp/consumer.rb, line 152 def on_delivery(&block) # We have to maintain this multiple arities jazz # because older versions this gem are used in examples in at least 3 # books published by O'Reilly :(. MK. delivery_shim = Proc.new { |basic_deliver, headers, payload| case block.arity when 1 then block.call(payload) when 2 then h = Header.new(@channel, basic_deliver, headers.decode_payload) block.call(h, payload) else h = Header.new(@channel, basic_deliver, headers.decode_payload) block.call(h, payload, basic_deliver.consumer_tag, basic_deliver.delivery_tag, basic_deliver.redelivered, basic_deliver.exchange, basic_deliver.routing_key) end } self.append_callback(:delivery, &delivery_shim) self end
Defines a callback that will be executed when AMQP
connection is recovered after a network failure.. Only one callback can be defined (the one defined last replaces previously added ones).
@api public Defines a callback that will be executed when AMQP
connection is recovered after a network failure.. Only one callback can be defined (the one defined last replaces previously added ones).
@api public
# File lib/amqp/consumer.rb, line 225 def on_recovery(&block) self.redefine_callback(:after_recovery, &block) end
@return [Consumer] self
@api public @see bit.ly/htCzCX AMQP
0.9.1 protocol documentation (Section 1.8.3.14.)
# File lib/amqp/consumer.rb, line 211 def reject(delivery_tag, requeue = true) @channel.reject(delivery_tag, requeue) self end
Used by automatic recovery code. @api plugin @return [AMQP::Consumer] self
# File lib/amqp/consumer.rb, line 96 def resubscribe(&block) @channel.once_open do @queue.once_declared do self.unregister_with_channel @consumer_tag = self.class.tag_generator.generate_for(@queue) self.register_with_channel @connection.send_frame(AMQ::Protocol::Basic::Consume.encode(@channel.id, @queue.name, @consumer_tag, @no_local, @no_ack, @exclusive, block.nil?, @arguments)) self.redefine_callback(:consume, &block) if block self end end self end
@private
# File lib/amqp/consumer.rb, line 265 def run_after_recovery_callbacks self.exec_callback_yielding_self(:after_recovery) end
@private
# File lib/amqp/consumer.rb, line 260 def run_before_recovery_callbacks self.exec_callback_yielding_self(:before_recovery) end
{AMQP::Queue} API compatibility.
@return [Boolean] true if this consumer is active (subscribed for message delivery) @api public
# File lib/amqp/consumer.rb, line 134 def subscribed? !@callbacks[:delivery].empty? end
@endgroup
# File lib/amqp/consumer.rb, line 284 def to_s "#<#{self.class.name} @consumer_tag=#{@consumer_tag} @queue=#{@queue.name} @channel=#{@channel.id}>" end
Protected Instance Methods
# File lib/amqp/consumer.rb, line 354 def register_with_channel @channel.consumers[@consumer_tag] = self end
# File lib/amqp/consumer.rb, line 358 def register_with_queue @queue.consumers[@consumer_tag] = self end
# File lib/amqp/consumer.rb, line 362 def unregister_with_channel @channel.consumers.delete(@consumer_tag) end
# File lib/amqp/consumer.rb, line 366 def unregister_with_queue @queue.consumers.delete(@consumer_tag) end