class AMQP::Queue
h2. What are AMQP
queues?
Queues store and forward messages to consumers. They are similar to mailboxes in SMTP. Messages flow from producing applications to {Exchange exchanges} that route them to queues and finally queues deliver them to consumer applications (or consumer applications fetch messages as needed).
Note that unlike some other messaging protocols/systems, messages are not delivered directly to queues. They are delivered to exchanges that route messages to queues using rules knows as bindings.
h2. Concept of bindings
Binding is an association between a queue and an exchange. Queues must be bound to at least one exchange in order to receive messages from publishers. Learn more about bindings in {Exchange Exchange
class documentation}.
h2. Key methods
Key methods of Queue
class are
-
{Queue#bind}
-
{Queue#subscribe}
-
{Queue#pop}
-
{Queue#delete}
-
{Queue#purge}
-
{Queue#unbind}
h2. Queue
names. Server-named queues. Predefined queues.
Every queue has a name that identifies it. Queue
names often contain several segments separated by a dot (.), similarly to how URI path segments are separated by a slash (/), although it may be almost any string, with some limitations (see below). Applications may pick queue names or ask broker to generate a name for them. To do so, pass *empty string* as queue name argument.
Here is an example:
<script src=“gist.github.com/939596.js?file=gistfile1.rb”>>
If you want to declare a queue with a particular name, for example, “images.resize”, pass it to Queue
class constructor:
<script src=“gist.github.com/939600.js?file=gistfile1.rb”>>
Queue
names starting with 'amq.' are reserved for internal use by the broker. Attempts to declare queue with a name that violates this rule will result in AMQP::IncompatibleOptionsError
to be thrown (when queue is re-declared on the same channel object) or channel-level exception (when originally queue was declared on one channel and re-declaration with different attributes happens on another channel). Learn more in {file:docs/Queues.textile Queues guide} and {file:docs/ErrorHandling.textile Error
Handling guide}.
h2. Queue
life-cycles. When use of server-named queues is optimal and when it isn't.
To quote AMQP
0.9.1 spec, there are two common message queue life-cycles:
* Durable message queues that are shared by many consumers and have an independent existence: i.e. they will continue to exist and collect messages whether or not there are consumers to receive them. * Temporary message queues that are private to one consumer and are tied to that consumer. When the consumer disconnects, the message queue is deleted.
There are some variations on these, such as shared message queues that are deleted when the last of many consumers disconnects.
One example of durable message queues is well-known services like event collectors (event loggers). They are usually up whether there are services to log anything or not. Other applications know what queues they use and can rely on those queues being around all the time, survive broker restarts and in general be available should an application in the network need to use them. In this case, explicitly named durable queues are optimal and coupling it creates between applications is not an issue. Another scenario of a well-known long-lived service is distributed metadata/directory/locking server like Apache Zookeeper, Google's Chubby or DNS. Services like this benefit from using well-known, not generated queue names, and so do other applications that use them.
Different scenario is in “a cloud settings” when some kind of workers/instances may come online and go down basically any time and other applications cannot rely on them being available. Using well-known queue names in this case is possible but server-generated, short-lived queues that are bound to topic or fanout exchanges to receive relevant messages is a better idea.
Imagine a service that processes an endless stream of events (Twitter is one example). When traffic goes up, development operations may spin up additional applications instances in the cloud to handle the load. Those new instances want to subscribe to receive messages to process but the rest of the system doesn't know anything about them, rely on them being online or try to address them directly: they process events from a shared stream and are not different from their peers. In a case like this, there is no reason for message consumers to not use queue names generated by the broker.
In general, use of explicitly named or server-named queues depends on messaging pattern your application needs. {www.eaipatterns.com/ Enterprise Integration
Patters} discusses many messaging patterns in depth. RabbitMQ FAQ also has a section on {www.rabbitmq.com/faq.html#scenarios use cases}.
h2. Queue
durability and persistence of messages.
Learn more in our {rubyamqp.info/articles/durability/}.
h2. Message ordering
RabbitMQ FAQ explains {www.rabbitmq.com/faq.html#message-ordering ordering of messages in AMQP
queues}
h2. Error
handling
When channel-level error occurs, queues associated with that channel are reset: internal state and callbacks are cleared. Recommended strategy is to open a new channel and re-declare all the entities you need. Learn more in {file:docs/ErrorHandling.textile Error
Handling guide}.
@note Please make sure you read {rubyamqp.info/articles/durability/} that covers exchanges durability vs. messages
persistence.
@see www.rabbitmq.com/resources/specs/amqp0-9-1.pdf AMQP
0.9.1 specification (Section 2.1.1) @see AMQP::Exchange
Attributes
@return [Hash] Additional arguments given on queue declaration. Typically used by AMQP
extensions.
@return [Array<Hash>]
Channel
this queue belongs to. @return [AMQP::Channel]
@return [Array<Hash>] All consumers on this queue.
@return [AMQP::Consumer] Default consumer (registered with {Queue#consume}).
Name of this queue
Options this queue object was instantiated with
Public Class Methods
@return [Class] @private
# File lib/amqp/queue.rb, line 798 def self.consumer_class AMQP::Consumer end
@option opts [Boolean] :passive (false) If set, the server will not create the queue if it does not
already exist. The client can use this to check whether the queue exists without modifying the server state.
@option opts [Boolean] :durable (false) If set when creating a new queue, the queue will be marked as
durable. Durable queues remain active when a server restarts. Non-durable queues (transient queues) are purged if/when a server restarts. Note that durable queues do not necessarily hold persistent messages, although it does not make sense to send persistent messages to a transient queue (though it is allowed).
@option opts [Boolean] :exclusive (false) Exclusive queues may only be consumed from by the current connection.
Setting the 'exclusive' flag always implies 'auto-delete'. Only a single consumer is allowed to remove messages from this queue. The default is a shared queue. Multiple clients may consume messages from this queue.
@option opts [Boolean] :auto_delete (false) If set, the queue is deleted when all consumers have finished
using it. Last consumer can be cancelled either explicitly or because its channel is closed. If there was no consumer ever on the queue, it won't be deleted.
@option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should
not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.
@option opts [Hash] :arguments (nil) A hash of optional arguments with the declaration. Some brokers implement
AMQP extensions using x-prefixed declaration arguments. For example, RabbitMQ recognizes x-message-ttl declaration arguments that defines TTL of messages in the queue.
@yield [queue, declare_ok] Yields successfully declared queue instance and AMQP
method (queue.declare-ok) instance. The latter is optional. @yieldparam [Queue] queue Queue
that is successfully declared and is ready to be used. @yieldparam [AMQ::Protocol::Queue::DeclareOk] declare_ok AMQP
queue.declare-ok) instance.
@api public
AMQP::Entity::new
# File lib/amqp/queue.rb, line 201 def initialize(channel, name = AMQ::Protocol::EMPTY_STRING, opts = {}, &block) raise ArgumentError.new("queue name must not be nil; if you want broker to generate queue name for you, pass an empty string") if name.nil? @channel = channel @name = name unless name.empty? @server_named = name.empty? @opts = self.class.add_default_options(name, opts, block) raise ArgumentError.new("server-named queues (name = '') declaration with :nowait => true makes no sense. If you are not sure what that means, simply drop :nowait => true from opts.") if @server_named && @opts[:nowait] # a deferrable that we use to delay operations until this queue is actually declared. # one reason for this is to support a case when a server-named queue is immediately bound. # it's crazy, but 0.7.x supports it, so... MK. @declaration_deferrable = AMQP::Deferrable.new super(channel.connection) @name = name # this has to stay true even after queue.declare-ok arrives. MK. @server_named = @name.empty? if @server_named self.on_connection_interruption do # server-named queue need to get new names after recovery. MK. @name = AMQ::Protocol::EMPTY_STRING end end @channel = channel # primarily for autorecovery. MK. @bindings = Array.new @consumers = Hash.new shim = Proc.new do |q, declare_ok| case block.arity when 1 then block.call(q) else block.call(q, declare_ok) end end @channel.once_open do if @opts[:nowait] @declaration_deferrable.succeed block.call(self) if block end if block self.queue_declare(@opts[:passive], @opts[:durable], @opts[:exclusive], @opts[:auto_delete], @opts[:nowait], @opts[:arguments], &shim) else # we cannot pass :nowait as true here, AMQP::Queue will (rightfully) raise an exception because # it has no idea about crazy edge cases we are trying to support for sake of backwards compatibility. MK. self.queue_declare(@opts[:passive], @opts[:durable], @opts[:exclusive], @opts[:auto_delete], false, @opts[:arguments]) end end end
Protected Class Methods
@private
# File lib/amqp/queue.rb, line 1379 def self.add_default_options(name, opts, block) { :queue => name, :nowait => (block.nil? && !name.empty?) }.merge(opts) end
Public Instance Methods
Acknowledge a delivery tag. @return [Queue] self
@api public @see www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP
0.9.1 protocol reference (Section 1.8.3.13.)
# File lib/amqp/queue.rb, line 1168 def acknowledge(delivery_tag) @channel.acknowledge(delivery_tag) self end
@return [Boolean] true if this queue was declared as automatically deleted (deleted as soon as last consumer unbinds). @api public
# File lib/amqp/queue.rb, line 286 def auto_delete? @auto_delete end
Called by associated connection object when AMQP
connection has been re-established (for example, after a network failure).
@api api
# File lib/amqp/queue.rb, line 1202 def auto_recover self.exec_callback_yielding_self(:before_recovery) if self.server_named? old_name = @name.dup @name = AMQ::Protocol::EMPTY_STRING @channel.queues.delete(old_name) end self.redeclare do self.rebind @consumers.each { |tag, consumer| consumer.auto_recover } self.exec_callback_yielding_self(:after_recovery) end end
@return [Queue] self
@api public @see www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP
0.9.1 protocol reference (Section 1.8.3.3.)
# File lib/amqp/queue.rb, line 1083 def basic_consume(no_ack = false, exclusive = false, nowait = false, no_local = false, arguments = nil, &block) raise RuntimeError.new("This queue already has default consumer. Please instantiate AMQP::Consumer directly to register additional consumers.") if @default_consumer nowait = true unless block @default_consumer = self.class.consumer_class.new(@channel, self, generate_consumer_tag(@name), exclusive, no_ack, arguments, no_local, &block) @default_consumer.consume(nowait, &block) self end
Defines a callback that will be executed after TCP connection is recovered after a network failure but before AMQP
connection is re-opened. Only one callback can be defined (the one defined last replaces previously added ones).
@api public
# File lib/amqp/queue.rb, line 1235 def before_recovery(&block) self.redefine_callback(:before_recovery, &block) end
This method binds a queue to an exchange. Until a queue is bound it will not receive any messages. In a classic messaging model, store-and-forward queues are bound to a dest exchange and subscription queues are bound to a dest_wild exchange.
A valid exchange name (or reference) must be passed as the first parameter. @example Binding a queue to exchange using AMQP::Exchange
instance
ch = AMQP::Channel.new(connection) exchange = ch.direct('backlog.events') queue = ch.queue('', :exclusive => true) queue.bind(exchange)
@example Binding a queue to exchange using exchange name
ch = AMQP::Channel.new(connection) queue = ch.queue('', :exclusive => true) queue.bind('backlog.events')
Note that if your producer application knows consumer queue name and wants to deliver a message there, direct exchange may be sufficient (in other words, if your code declares an exchange with the same name as a queue and binds it to that queue, consider using the default exchange and routing key on publishing).
@param [Exchange] Exchange
to bind to. May also be a string or any object that responds to name
.
@option opts [String] :routing_key Specifies the routing key for the binding. The routing key is
used for routing messages depending on the exchange configuration. Not all exchanges use a routing key! Refer to the specific exchange documentation. If the routing key is empty and the queue name is empty, the routing key will be the current queue for the channel, which is the last declared queue.
@option opts [Hash] :arguments (nil) A hash of optional arguments with the declaration. Headers exchange type uses these metadata
attributes for routing matching. In addition, brokers may implement AMQP extensions using x-prefixed declaration arguments.
@option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should
not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.
@return [Queue] Self
@yield [] Since queue.bind-ok carries no attributes, no parameters are yielded to the block.
@api public @see Queue#unbind
# File lib/amqp/queue.rb, line 346 def bind(exchange, opts = {}, &block) @channel.once_open do self.once_name_is_available do queue_bind(exchange, (opts[:key] || opts[:routing_key] || AMQ::Protocol::EMPTY_STRING), (opts[:nowait] || block.nil?), opts[:arguments], &block) end end self end
Compatibility alias for on_declare.
@api public @deprecated
# File lib/amqp/queue.rb, line 888 def callback return nil if !subscribed? @default_consumer.callback end
Unsubscribes from message delivery. @return [Queue] self
@api public @see www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP
0.9.1 protocol reference (Section 1.8.3.5.)
# File lib/amqp/queue.rb, line 1098 def cancel(nowait = false, &block) raise "There is no default consumer for this queue. This usually means that you are trying to unsubscribe a queue that never was subscribed for messages in the first place." if @default_consumer.nil? @default_consumer.cancel(nowait, &block) self end
@return [String] Consumer
tag of the default consumer associated with this queue (if any), or nil @note Default consumer is the one registered with the convenience {AMQP::Queue#subscribe} method. It has no special properties of any kind. @see Queue#subscribe
@see AMQP::Consumer
@api public
# File lib/amqp/queue.rb, line 778 def consumer_tag if @default_consumer @default_consumer.consumer_tag else nil end end
This method deletes a queue. When a queue is deleted any pending messages are sent to a dead-letter queue if this is defined in the server configuration, and all consumers on the queue are cancelled.
@return [NilClass] nil (for v0.7 compatibility)
@option opts [Boolean] :if_unused (false) If set, the server will only delete the queue if it has no
consumers. If the queue has consumers the server does does not delete it but raises a channel exception instead.
@option opts [Boolean] :if_empty (false) If set, the server will only delete the queue if it has no
messages. If the queue is not empty the server raises a channel exception.
@option opts [Boolean] :nowait (false) If set, the server will not respond to the method. The client should
not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.
@return [NilClass] nil (for v0.7 compatibility)
@yield [delete_ok] Yields AMQP
method (queue.delete-ok) instance. @yieldparam [AMQ::Protocol::Queue::DeleteOk] delete_ok AMQP
queue.delete-ok) instance. Carries number of messages that were in the queue.
@api public @see Queue#purge
@see Queue#unbind
# File lib/amqp/queue.rb, line 414 def delete(opts = {}, &block) @channel.once_open do self.once_name_is_available do queue_delete(opts.fetch(:if_unused, false), opts.fetch(:if_empty, false), opts.fetch(:nowait, false), &block) end end # backwards compatibility nil end
@return [Boolean] true if this queue was declared as durable (will survive broker restart). @api public
# File lib/amqp/queue.rb, line 274 def durable? @durable end
@return [Boolean] true if this queue was declared as exclusive (limited to just one consumer) @api public
# File lib/amqp/queue.rb, line 280 def exclusive? @exclusive end
Unique string supposed to be used as a consumer tag.
@return [String] Unique string. @api plugin
# File lib/amqp/queue.rb, line 1275 def generate_consumer_tag(name) "#{name}-#{Time.now.to_i * 1000}-#{Kernel.rand(999_999_999_999)}" end
Fetches messages from the queue. @return [Queue] self
@api public @see www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP
0.9.1 protocol reference (Section 1.8.3.10.)
# File lib/amqp/queue.rb, line 1123 def get(no_ack = false, &block) @connection.send_frame(AMQ::Protocol::Basic::Get.encode(@channel.id, @name, no_ack)) # most people only want one callback per #get call. Consider the following example: # # 100.times { queue.get { ... } } # # most likely you won't expect 100 callback runs per message here. MK. self.redefine_callback(:get, &block) @channel.queues_awaiting_get_response.push(self) self end
# File lib/amqp/queue.rb, line 1293 def handle_bind_ok(method) self.exec_callback_once(:bind, method) end
@private @api plugin
# File lib/amqp/queue.rb, line 916 def handle_connection_interruption(method = nil) @consumers.each { |tag, consumer| consumer.handle_connection_interruption(method) } self.exec_callback_yielding_self(:after_connection_interruption) @declaration_deferrable = EventMachine::DefaultDeferrable.new end
# File lib/amqp/queue.rb, line 924 def handle_declare_ok(method) @name = method.queue if @name.empty? @channel.register_queue(self) self.exec_callback_once_yielding_self(:declare, method) @declaration_deferrable.succeed end
# File lib/amqp/queue.rb, line 1285 def handle_delete_ok(method) self.exec_callback_once(:delete, method) end
# File lib/amqp/queue.rb, line 1306 def handle_get_empty(method) method = AMQ::Protocol::GetResponse.new(method) self.exec_callback(:get, method) end
# File lib/amqp/queue.rb, line 1301 def handle_get_ok(method, header, payload) method = AMQ::Protocol::GetResponse.new(method) self.exec_callback(:get, method, header, payload) end
# File lib/amqp/queue.rb, line 1289 def handle_purge_ok(method) self.exec_callback_once(:purge, method) end
# File lib/amqp/queue.rb, line 1297 def handle_unbind_ok(method) self.exec_callback_once(:unbind, method) end
@api public
# File lib/amqp/queue.rb, line 1107 def on_cancel(&block) @default_consumer.on_cancel(&block) end
Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure). Only one callback can be defined (the one defined last replaces previously added ones).
@api public
# File lib/amqp/queue.rb, line 1225 def on_connection_interruption(&block) self.redefine_callback(:after_connection_interruption, &block) end
@api public @see www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP
0.9.1 protocol reference (Sections 1.8.3.9)
# File lib/amqp/queue.rb, line 768 def on_delivery(&block) @default_consumer.on_delivery(&block) end
Defines a callback that will be executed when AMQP
connection is recovered after a network failure.. Only one callback can be defined (the one defined last replaces previously added ones).
@api public
# File lib/amqp/queue.rb, line 1251 def on_recovery(&block) self.redefine_callback(:after_recovery, &block) end
Defines a callback that will be executed once queue is declared. More than one callback can be defined. if queue is already declared, given callback is executed immediately.
@api public
# File lib/amqp/queue.rb, line 263 def once_declared(&block) @declaration_deferrable.callback do # guards against cases when deferred operations # don't complete before the channel is closed block.call if @channel.open? end end
This method provides a direct access to the messages in a queue using a synchronous dialogue that is designed for specific types of application where synchronous functionality is more important than performance.
If queue is empty, `payload` callback argument will be nil, otherwise arguments are identical to those of {AMQP::Queue#subscribe} callback.
@example Fetching messages off AMQP
queue on demand
queue.pop do |metadata, payload| if payload puts "Fetched a message: #{payload.inspect}, content_type: #{metadata.content_type}. Shutting down..." else puts "No messages in the queue" end end
@option opts [Boolean] :ack (false) If this field is set to false the server does not expect acknowledgments
for messages. That is, when a message is delivered to the client the server automatically and silently acknowledges it on behalf of the client. This functionality increases performance but at the cost of reliability. Messages can get lost if a client dies before it can deliver them to the application.
@return [Qeueue] Self
@yield [headers, payload] When block only takes one argument, yields payload to it. In case of two arguments, yields headers and payload. @yieldparam [AMQP::Header] headers Headers (metadata) associated with this message (for example, routing key). @yieldparam [String] payload Message body (content). On Ruby 1.9, you may want to check or enforce content encoding.
@api public
# File lib/amqp/queue.rb, line 487 def pop(opts = {}, &block) if block # We have to maintain this multiple arities jazz # because older versions this gem are used in examples in at least 3 # books published by O'Reilly :(. MK. shim = Proc.new { |method, headers, payload| case block.arity when 1 then block.call(payload) when 2 then h = Header.new(@channel, method, headers ? headers.decode_payload : nil) block.call(h, payload) else h = Header.new(@channel, method, headers ? headers.decode_payload : nil) block.call(h, payload, method.delivery_tag, method.redelivered, method.exchange, method.routing_key) end } @channel.once_open do self.once_name_is_available do # see AMQP::Queue#get in amq-client self.get(!opts.fetch(:ack, false), &shim) end end else @channel.once_open do self.once_name_is_available do self.get(!opts.fetch(:ack, false)) end end end end
Don't use this method. It is a leftover from very early days and it ruins the whole point of exchanges/queue separation.
@note This method will be removed before 1.0 release @deprecated @api public
# File lib/amqp/queue.rb, line 903 def publish(data, opts = {}) exchange.publish(data, opts.merge(:routing_key => self.name)) end
This method removes all messages from a queue which are not awaiting acknowledgment.
@option opts [Boolean] :nowait (false) If set, the server will not respond to the method. The client should
not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.
@return [NilClass] nil (for v0.7 compatibility)
@yield [purge_ok] Yields AMQP
method (queue.purge-ok) instance. @yieldparam [AMQ::Protocol::Queue::PurgeOk] purge_ok AMQP
queue.purge-ok) instance. Carries number of messages that were purged.
@api public @see Queue#delete
@see Queue#unbind
# File lib/amqp/queue.rb, line 441 def purge(opts = {}, &block) @channel.once_open do self.once_declared do queue_purge(opts.fetch(:nowait, false), &block) end end # backwards compatibility nil end
@return [Queue] self
@api public @see www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP
0.9.1 protocol reference (Section 1.7.2.3.)
# File lib/amqp/queue.rb, line 1023 def queue_bind(exchange, routing_key = AMQ::Protocol::EMPTY_STRING, nowait = false, arguments = nil, &block) nowait = true unless block exchange_name = if exchange.respond_to?(:name) exchange.name else exchange end @connection.send_frame(AMQ::Protocol::Queue::Bind.encode(@channel.id, @name, exchange_name, routing_key, nowait, arguments)) if !nowait self.append_callback(:bind, &block) @channel.queues_awaiting_bind_ok.push(self) end # store bindings for automatic recovery, but BE VERY CAREFUL to # not cause an infinite rebinding loop here when we recover. MK. binding = { :exchange => exchange_name, :routing_key => routing_key, :arguments => arguments } @bindings.push(binding) unless @bindings.include?(binding) self end
Declares this queue.
@return [Queue] self
@api public @see www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP
0.9.1 protocol reference (Section 1.7.2.1.)
# File lib/amqp/queue.rb, line 943 def queue_declare(passive = false, durable = false, exclusive = false, auto_delete = false, nowait = false, arguments = nil, &block) raise ArgumentError, "declaration with nowait does not make sense for server-named queues! Either specify name other than empty string or use #declare without nowait" if nowait && self.anonymous? # these two are for autorecovery. MK. @passive = passive @server_named = @name.empty? @durable = durable @exclusive = exclusive @auto_delete = auto_delete @arguments = arguments nowait = true if !block && !@name.empty? && nowait.nil? @connection.send_frame(AMQ::Protocol::Queue::Declare.encode(@channel.id, @name, passive, durable, exclusive, auto_delete, nowait, arguments)) if !nowait self.append_callback(:declare, &block) @channel.queues_awaiting_declare_ok.push(self) end self end
Deletes this queue.
@param [Boolean] if_unused delete only if queue has no consumers (subscribers). @param [Boolean] if_empty delete only if queue has no messages in it. @param [Boolean] nowait Don't wait for reply from broker. @return [Queue] self
@api public @see www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP
0.9.1 protocol reference (Section 1.7.2.9.)
# File lib/amqp/queue.rb, line 1000 def queue_delete(if_unused = false, if_empty = false, nowait = false, &block) nowait = true unless block @connection.send_frame(AMQ::Protocol::Queue::Delete.encode(@channel.id, @name, if_unused, if_empty, nowait)) if !nowait self.append_callback(:delete, &block) # TODO: delete itself from queues cache @channel.queues_awaiting_delete_ok.push(self) end self end
Purges (removes all messagse from) the queue. @return [Queue] self
@api public @see www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP
0.9.1 protocol reference (Section 1.7.2.7.)
# File lib/amqp/queue.rb, line 1144 def queue_purge(nowait = false, &block) nowait = true unless block @connection.send_frame(AMQ::Protocol::Queue::Purge.encode(@channel.id, @name, nowait)) if !nowait self.redefine_callback(:purge, &block) # TODO: handle channel & connection-level exceptions @channel.queues_awaiting_purge_ok.push(self) end self end
@return [Queue] self
@api public @see www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP
0.9.1 protocol reference (Section 1.7.2.5.)
# File lib/amqp/queue.rb, line 1052 def queue_unbind(exchange, routing_key = AMQ::Protocol::EMPTY_STRING, arguments = nil, &block) exchange_name = if exchange.respond_to?(:name) exchange.name else exchange end @connection.send_frame(AMQ::Protocol::Queue::Unbind.encode(@channel.id, @name, exchange_name, routing_key, arguments)) self.append_callback(:unbind, &block) @channel.queues_awaiting_unbind_ok.push(self) @bindings.delete_if { |b| b[:exchange] == exchange_name } self end
Used by automatic recovery machinery. @private @api api
# File lib/amqp/queue.rb, line 1194 def rebind(&block) @bindings.each { |b| self.bind(b[:exchange], b) } end
Re-declares queue with the same attributes @api public
# File lib/amqp/queue.rb, line 968 def redeclare(&block) nowait = true if !block && !@name.empty? # server-named queues get their new generated names. new_name = if @server_named AMQ::Protocol::EMPTY_STRING else @name end @connection.send_frame(AMQ::Protocol::Queue::Declare.encode(@channel.id, new_name, @passive, @durable, @exclusive, @auto_delete, false, @arguments)) if !nowait self.append_callback(:declare, &block) @channel.queues_awaiting_declare_ok.push(self) end self end
@return [Queue] self
@api public @see www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP
0.9.1 protocol reference (Section 1.8.3.14.)
# File lib/amqp/queue.rb, line 1179 def reject(delivery_tag, requeue = true) @channel.reject(delivery_tag, requeue) self end
Resets queue state. Useful for error handling. @api plugin
# File lib/amqp/queue.rb, line 909 def reset initialize(@channel, @name, @opts) end
@private
# File lib/amqp/queue.rb, line 1257 def run_after_recovery_callbacks self.exec_callback_yielding_self(:after_recovery) @consumers.each { |tag, c| c.run_after_recovery_callbacks } end
@private
# File lib/amqp/queue.rb, line 1240 def run_before_recovery_callbacks self.exec_callback_yielding_self(:before_recovery) @consumers.each { |tag, c| c.run_before_recovery_callbacks } end
@return [Boolean] true if this queue is server-named
# File lib/amqp/queue.rb, line 292 def server_named? @server_named end
Get the number of messages and active consumers (with active channel flow) on a queue.
@example Getting number of messages and active consumers for a queue
AMQP::Channel.queue('name').status { |number_of_messages, number_of_active_consumers| puts number_of_messages }
@yield [number_of_messages, number_of_active_consumers] @yieldparam [Fixnum] number_of_messages Number of messages in the queue @yieldparam [Fixnum] number_of_active_consumers Number of active consumers for the queue. Note that consumers can suspend activity (Channel
.Flow) in which case they do not appear in this count.
@api public
# File lib/amqp/queue.rb, line 850 def status(opts = {}, &block) raise ArgumentError, "AMQP::Queue#status does not make any sense without a block" unless block shim = Proc.new { |q, declare_ok| block.call(declare_ok.message_count, declare_ok.consumer_count) } @channel.once_open do self.once_name_is_available do # we do not use self.declare here to avoid caching of @passive since that will cause unexpected side-effects during automatic # recovery process. MK. @connection.send_frame(AMQ::Protocol::Queue::Declare.encode(@channel.id, @name, true, @opts[:durable], @opts[:exclusive], @opts[:auto_delete], false, @opts[:arguments])) self.append_callback(:declare, &shim) @channel.queues_awaiting_declare_ok.push(self) end end self end
Subscribes to asynchronous message delivery.
The provided block is passed a single message each time the exchange matches a message to this queue.
Attempts to {Queue#subscribe} multiple times to the same exchange will raise an Exception. If you need more than one consumer per queue, use {AMQP::Consumer} instead. {file:docs/Queues.textile Documentation guide on queues} explains this and other topics in great detail.
@example Use of callback with a single argument
EventMachine.run do exchange = AMQP::Channel.direct("foo queue") EM.add_periodic_timer(1) do exchange.publish("random number #{rand(1000)}") end queue = AMQP::Channel.queue('foo queue') queue.subscribe { |body| puts "received payload [#{body}]" } end
If the block takes 2 parameters, both the header and the body will be passed in for processing.
@example Use of callback with two arguments
EventMachine.run do connection = AMQP.connect(:host => '127.0.0.1') puts "Connected to AMQP broker. Running #{AMQP::VERSION} version of the gem..." channel = AMQP::Channel.new(connection) queue = channel.queue("amqpgem.examples.hello_world", :auto_delete => true) exchange = channel.direct("amq.direct") queue.bind(exchange) channel.on_error do |ch, channel_close| puts channel_close.reply_text connection.close { EventMachine.stop } end queue.subscribe do |metadata, payload| puts "metadata.routing_key : #{metadata.routing_key}" puts "metadata.content_type: #{metadata.content_type}" puts "metadata.priority : #{metadata.priority}" puts "metadata.headers : #{metadata.headers.inspect}" puts "metadata.timestamp : #{metadata.timestamp.inspect}" puts "metadata.type : #{metadata.type}" puts "metadata.delivery_tag: #{metadata.delivery_tag}" puts "metadata.redelivered : #{metadata.redelivered}" puts "metadata.app_id : #{metadata.app_id}" puts "metadata.exchange : #{metadata.exchange}" puts puts "Received a message: #{payload}. Disconnecting..." connection.close { EventMachine.stop { exit } } end exchange.publish("Hello, world!", :app_id => "amqpgem.example", :priority => 8, :type => "kinda.checkin", # headers table keys can be anything :headers => { :coordinates => { :latitude => 59.35, :longitude => 18.066667 }, :participants => 11, :venue => "Stockholm" }, :timestamp => Time.now.to_i) end
@example Using object as consumer (message handler), take one
class Consumer # # API # def initialize(channel, queue_name = AMQ::Protocol::EMPTY_STRING) @queue_name = queue_name @channel = channel # Consumer#handle_channel_exception will handle channel # exceptions. Keep in mind that you can only register one error handler, # so the last one registered "wins". @channel.on_error(&method(:handle_channel_exception)) end # initialize def start @queue = @channel.queue(@queue_name, :exclusive => true) # #handle_message method will be handling messages routed to @queue @queue.subscribe(&method(:handle_message)) end # start # # Implementation # def handle_message(metadata, payload) puts "Received a message: #{payload}, content_type = #{metadata.content_type}" end # handle_message(metadata, payload) def handle_channel_exception(channel, channel_close) puts "Oops... a channel-level exception: code = #{channel_close.reply_code}, message = #{channel_close.reply_text}" end # handle_channel_exception(channel, channel_close) end
@example Using object as consumer (message handler), take two: aggregatied handler
class Consumer # # API # def handle_message(metadata, payload) puts "Received a message: #{payload}, content_type = #{metadata.content_type}" end # handle_message(metadata, payload) end class Worker # # API # def initialize(channel, queue_name = AMQ::Protocol::EMPTY_STRING, consumer = Consumer.new) @queue_name = queue_name @channel = channel @channel.on_error(&method(:handle_channel_exception)) @consumer = consumer end # initialize def start @queue = @channel.queue(@queue_name, :exclusive => true) @queue.subscribe(&@consumer.method(:handle_message)) end # start # # Implementation # def handle_channel_exception(channel, channel_close) puts "Oops... a channel-level exception: code = #{channel_close.reply_code}, message = #{channel_close.reply_text}" end # handle_channel_exception(channel, channel_close) end
@example Unit-testing objects that are used as consumers, RSpec style
require "ostruct" require "json" # RSpec example describe Consumer do describe "when a new message arrives" do subject { described_class.new } let(:metadata) do o = OpenStruct.new o.content_type = "application/json" o end let(:payload) { JSON.encode({ :command => "reload_config" }) } it "does some useful work" do # check preconditions here if necessary subject.handle_message(metadata, payload) # add your code expectations here end end end
@option opts [Boolean ]:ack (false) If this field is set to false the server does not expect acknowledgments
for messages. That is, when a message is delivered to the client the server automatically and silently acknowledges it on behalf of the client. This functionality increases performance but at the cost of reliability. Messages can get lost if a client dies before it can deliver them to the application.
@option opts [Boolean] :nowait (false) If set, the server will not respond to the method. The client should
not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.
@option opts [#call] :confirm (nil) If set, this proc will be called when the server confirms subscription
to the queue with a basic.consume-ok message. Setting this option will automatically set :nowait => false. This is required for the server to send a confirmation.
@option opts [Boolean] :exclusive (false) Request exclusive consumer access, meaning only this consumer can access the queue.
This is useful when you want a long-lived shared queue to be temporarily accessible by just one application (or thread, or process). If application exclusive consumer is part of crashes or loses network connection to the broker, channel is closed and exclusive consumer is thus cancelled.
@yield [headers, payload] When block only takes one argument, yields payload to it. In case of two arguments, yields headers and payload. @yieldparam [AMQP::Header] headers Headers (metadata) associated with this message (for example, routing key). @yieldparam [String] payload Message body (content). On Ruby 1.9, you may want to check or enforce content encoding.
@return [Queue] Self @api public
@see file:docs/Queues.textile Documentation guide on queues @see unsubscribe
@see AMQP::Consumer
# File lib/amqp/queue.rb, line 748 def subscribe(opts = {}, &block) raise RuntimeError.new("This queue already has default consumer. Please instantiate AMQP::Consumer directly to register additional consumers.") if @default_consumer opts[:nowait] = false if (@on_confirm_subscribe = opts[:confirm]) @channel.once_open do self.once_name_is_available do # guards against a pathological case race condition when a channel # is opened and closed before delayed operations are completed. self.basic_consume(!opts[:ack], opts[:exclusive], (opts[:nowait] || block.nil?), opts[:no_local], nil, &opts[:confirm]) self.on_delivery(&block) end end self end
Boolean check to see if the current queue has already subscribed to messages delivery (has default consumer).
Attempts to {Queue#subscribe} multiple times to the same exchange will raise an Exception. If you need more than one consumer per queue, use {AMQP::Consumer} instead.
@return [Boolean] true if there is a consumer tag associated with this Queue
instance @api public @deprecated
# File lib/amqp/queue.rb, line 879 def subscribed? @default_consumer && @default_consumer.subscribed? end
Remove the binding between the queue and exchange. The queue will not receive any more messages until it is bound to another exchange.
Due to the asynchronous nature of the protocol, it is possible for “in flight” messages to be received after this call completes. Those messages will be serviced by the last block used in a {Queue#subscribe} or {Queue#pop} call.
@param [Exchange] Exchange
to unbind from. @option opts [String] :routing_key Binding routing key @option opts [Hash] :arguments Binding arguments @option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should
not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.
@yield [] Since queue.unbind-ok carries no attributes, no parameters are yielded to the block.
@api public @see Queue#bind
# File lib/amqp/queue.rb, line 378 def unbind(exchange, opts = {}, &block) @channel.once_open do self.once_name_is_available do queue_unbind(exchange, (opts[:key] || opts[:routing_key] || AMQ::Protocol::EMPTY_STRING), opts[:arguments], &block) end end end
Removes the subscription from the queue and cancels the consumer. Once consumer is cancelled, messages will no longer be delivered to it, however, due to the asynchronous nature of the protocol, it is possible for “in flight” messages to be received after this call completes. Those messages will be serviced by the last block used in a {Queue#subscribe} or {Queue#pop} call.
Fetching messages with {AMQP::Queue#pop} is still possible even after consumer is cancelled.
Additionally, if the queue was created with autodelete set to true, the server will delete the queue after its wait period has expired unless the queue is bound to an active exchange.
The method accepts a block which will be executed when the unsubscription request is acknowledged as complete by the server.
@option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should
not wait for a reply method, the callback (if passed) will be ignored. If the server could not complete the method it will raise a channel or connection exception.
@yield [cancel_ok] @yieldparam [AMQ::Protocol::Basic::CancelOk] cancel_ok AMQP
method basic.cancel-ok. You can obtain consumer tag from it.
@api public
# File lib/amqp/queue.rb, line 827 def unsubscribe(opts = {}, &block) @channel.once_open do self.once_name_is_available do if @default_consumer @default_consumer.cancel(opts.fetch(:nowait, true), &block); @default_consumer = nil end end end end
Protected Instance Methods
# File lib/amqp/queue.rb, line 1383 def once_name_is_available(&block) if server_named? self.once_declared do block.call end else block.call end end
Private Instance Methods
Default direct exchange that we use to publish messages directly to this queue. This is a leftover from very early days and will be removed before version 1.0.
@deprecated
# File lib/amqp/queue.rb, line 1399 def exchange @exchange ||= Exchange.new(@channel, :direct, AMQ::Protocol::EMPTY_STRING, :key => name) end