module Messaging::Consumer
Public Class Methods
Public Instance Methods
Opens connections and channels, then sets up any specified subscriptions, invoking {Messaging::Consumer#on_message} when a payload is received.
Evaluation: Eager - This is only required to evaluate and declare the subscriptions which have been deferred using the class-level .subscribe.
@return [Messaging::Consumer]
@api public
# File lib/messaging/consumer.rb, line 53
def consume
  unless consumer_channels
    @consumer_channels ||= consumer_connections.map do |conn|
      open_channel(conn, config.prefetch)
    end
    subscriptions.each { |args| subscribe(*args) }
  end
  self
end
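The deferred evaluation described above can be sketched without a broker. This is a minimal illustration, not the library's implementation: `DeferredConsumerSketch`, `OrderConsumer`, the `declared` helper, and the class-level `subscribe` signature are all assumptions made for the example. It shows class-level subscriptions being recorded first and only declared when `consume` runs.

```ruby
# Hypothetical stand-in for Messaging::Consumer; no AMQP connection is opened.
module DeferredConsumerSketch
  def self.included(base)
    base.extend(ClassMethods)
  end

  module ClassMethods
    # Records the arguments only; nothing is declared until #consume runs.
    def subscribe(exchange, type, queue, key)
      subscriptions << [exchange, type, queue, key]
    end

    def subscriptions
      @subscriptions ||= []
    end
  end

  # Hypothetical helper that tracks what has been declared so far.
  def declared
    @declared ||= []
  end

  # Evaluates the deferred subscriptions once; later calls change nothing.
  def consume
    unless @consuming
      @consuming = true
      self.class.subscriptions.each { |args| subscribe(*args) }
    end
    self
  end

  # Instance-level subscribe takes effect immediately.
  def subscribe(exchange, type, queue, key)
    declared << [exchange, type, queue, key]
    self
  end
end

class OrderConsumer
  include DeferredConsumerSketch
  subscribe "orders", "topic", "orders.created", "order.created.#"
end

consumer = OrderConsumer.new
consumer.declared.empty?   # nothing has been declared before #consume
consumer.consume.consume   # chainable, and the second call is a no-op
consumer.declared.size     # one subscription declared
```

Returning `self` from both methods is what makes the chained call possible, which matches the `@return [Messaging::Consumer]` tag documented for `consume`.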
Invoked when a message is received from any of the subscriptions.
@param meta [AMQP::Header] A wrapper around the AMQP headers, and ruby-amqp metadata
@param payload [String] The message payload
@raise [NotImplementedError]
@api protected
# File lib/messaging/consumer.rb, line 71
def on_message(meta, payload)
  raise NotImplementedError
end
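Because the default `on_message` raises, an including class is expected to override it. The following is a rough sketch of that pattern using a stand-in module so that it runs without the library; `OnMessageSketch`, `AuditConsumer`, `BareConsumer`, and the JSON payload format are assumptions for illustration only.

```ruby
require "json"

# Hypothetical stand-in that mirrors the default Messaging::Consumer#on_message.
module OnMessageSketch
  def on_message(meta, payload)
    raise NotImplementedError
  end
end

# A consumer that overrides the hook and keeps the decoded payloads.
class AuditConsumer
  include OnMessageSketch

  attr_reader :events

  def initialize
    @events = []
  end

  # The class's own method takes precedence over the included module's.
  # The payload is assumed to be JSON here; meta is unused.
  def on_message(_meta, payload)
    @events << JSON.parse(payload)
  end
end

# A consumer that forgets to override the hook.
class BareConsumer
  include OnMessageSketch
end

audit = AuditConsumer.new
audit.on_message(nil, '{"id": 1}')
audit.events   # one decoded event

begin
  BareConsumer.new.on_message(nil, "{}")
rescue NotImplementedError
  # Reached because the default implementation was never overridden.
end
```

One detail worth keeping in mind: in Ruby, `NotImplementedError` descends from `ScriptError` rather than `StandardError`, so a bare `rescue => ex` such as the one in `subscribe` does not catch it.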
Subscribes to a queue, invoking {Messaging::Consumer#on_message} when a message is received. Also declares a binding to the specified exchange/key pair.
Evaluation: Eager - this will be evaluated when called. Calls to consume are not required.
@param exchange [String]
@param type [String]
@param queue [String]
@param key [String]
@return [Messaging::Consumer]
@api public
# File lib/messaging/consumer.rb, line 88
def subscribe(exchange, type, queue, key)
  consumer_channels.each do |channel|
    ex = declare_exchange(channel, exchange, type, config.exchange_options)
    q  = declare_queue(channel, ex, queue, key, config.queue_options)

    # Explicitly create an AMQP::Consumer rather than using
    # AMQP::Queue.subscription, which is a global singleton
    # and prevents the creation of multiple subscriptions
    AMQP::Consumer.new(channel, q).consume.on_delivery do |meta, payload|
      log.debug("Received message on channel #{meta.channel.id} from queue #{queue.inspect}")

      # If an exception is raised in on_message, the message will not be
      # acknowledged and the exception will be logged and re-raised
      begin
        on_message(meta, payload)
        meta.ack
      rescue => e
        # Named e so the exchange held in ex is not overwritten
        log.error("Exception: #{e}, " \
                  "Payload: #{payload.inspect}, " \
                  "Headers: #{meta.headers.inspect}\n" \
                  "Backtrace:\n#{e.backtrace.join("\n")}")

        # Re-raise the exception
        raise e
      end
    end
  end

  self
end
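The acknowledgement behaviour in the delivery callback (ack only after `on_message` returns, otherwise log and re-raise) can be illustrated in isolation. This is a minimal sketch under stated assumptions: `FakeMeta` and `deliver` are hypothetical names that stand in for the AMQP metadata object and the body of the `on_delivery` block, and an array stands in for the logger.

```ruby
# Hypothetical stand-in for the metadata object passed to the delivery callback.
FakeMeta = Struct.new(:acked) do
  def ack
    self.acked = true
  end
end

# Mirrors the begin/rescue in #subscribe: ack only after the handler returns,
# otherwise record the failure and re-raise.
def deliver(meta, payload, errors)
  yield(meta, payload)
  meta.ack
rescue => e
  errors << "Exception: #{e}, Payload: #{payload.inspect}"
  raise
end

errors = []

ok = FakeMeta.new(false)
deliver(ok, "hello", errors) { |_meta, payload| payload.upcase }

failed = FakeMeta.new(false)
begin
  deliver(failed, "boom", errors) { |_meta, _payload| raise ArgumentError, "bad payload" }
rescue ArgumentError
  # The error still reaches the caller after being recorded.
end

ok.acked       # => true
failed.acked   # => false
errors.size    # => 1
```

Since the failed message is never acknowledged, what happens to it next (redelivery, for instance) depends on the broker and channel settings, which this page does not describe.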
Private Instance Methods
@return [Array<AMQP::Channel>]
@api private
# File lib/messaging/consumer.rb, line 132
def consumer_channels
  @consumer_channels
end
@return [Array<AMQP::Connection>]
@api private
# File lib/messaging/consumer.rb, line 124
def consumer_connections
  @consumer_connections ||= config.consume_from.map do |uri|
    open_connection(uri)
  end
end
@return [Array<Array(String, String, String, String)>]
@api private
# File lib/messaging/consumer.rb, line 138
def subscriptions
  self.class.subscriptions
end