module Messaging::Consumer

Public Class Methods

included(base) click to toggle source

Extends the class into which the Consumer mixin is included with a {Messaging::Consumer.subscribe} method for declaratively specifying Consumer subscriptions.

@api public

# File lib/messaging/consumer.rb, line 41
def self.included(base)
  base.send(:extend, Extensions)
end

Public Instance Methods

consume() click to toggle source

Opens connections, channels, and sets up and specified subscriptions invoking {Messaging::Consumer#on_message} when a payload is received.

Evaluation: Eager - This is only required to evaluate and declare the subscriptions which have been deffered using class .subscribe

@return [Messaging::Consumer] @api public

# File lib/messaging/consumer.rb, line 53
def consume
  unless consumer_channels
    @consumer_channels ||= consumer_connections.map do |conn|
      open_channel(conn, config.prefetch)
    end

    subscriptions.each { |args| subscribe(*args) }
  end

  self
end
on_message(meta, payload) click to toggle source

Invoked when a message is received from any of the subscriptions.

@param meta [AMQP::Header] A wrapper around the AMQP headers, and ruby-amqp metadata @param payload [String] The message payload @raise [NotImplementedError] @api protected

# File lib/messaging/consumer.rb, line 71
def on_message(meta, payload)
  raise NotImplementedError
end
subscribe(exchange, type, queue, key) click to toggle source

Subscribe to a queue which will invoke the supplied block when a message is received. Additionally declaring a binding to the specified exchange/key pair.

Evaluation: Eager - this will be evaluated when called. Calls to consume are not required.

@param exchange [String] @param type [String] @param queue [String] @param key [String] @return [Messaging::Consumer] @api public

# File lib/messaging/consumer.rb, line 88
def subscribe(exchange, type, queue, key)
  consumer_channels.each do |channel|
    ex = declare_exchange(channel, exchange, type, config.exchange_options)
    q  = declare_queue(channel, ex, queue, key, config.queue_options)

    # Expliclity create an AMQP::Consumer rather than using
    # AMQP::Queue.subscription, which is a global singleton
    # and prevents the creation of multiple subscriptions
    AMQP::Consumer.new(channel, q).consume.on_delivery do |meta, payload|
      log.debug("Receieved message on channel #{meta.channel.id} from queue #{queue.inspect}")

      # If an exception is raised in on_message, the message will not be
      # acknowledged and the exception will be logged and re-raised
      begin
        on_message(meta, payload)

        meta.ack
      rescue => ex
        log.error("Exception: #{ex}, " \
          "Payload: #{payload.inspect}, " \
          "Headers: #{meta.headers.inspect}\n" \
          "Backtrace:\n#{ex.backtrace.join('\n')}")

        # Re-raise the exception
        raise ex
      end
    end
  end

  self
end

Private Instance Methods

consumer_channels() click to toggle source

@return [Array<AMQP::Channel>] @api private

# File lib/messaging/consumer.rb, line 132
def consumer_channels
  @consumer_channels
end
consumer_connections() click to toggle source

@return [Array<AMQP::Connection>] @api private

# File lib/messaging/consumer.rb, line 124
def consumer_connections
  @consumer_connections ||= config.consume_from.map do |uri|
    open_connection(uri)
  end
end
subscriptions() click to toggle source

@return [Array<Array(String, String, String, String)>] @api private

# File lib/messaging/consumer.rb, line 138
def subscriptions
  self.class.subscriptions
end