module MessageQueue::Consumable

A module to mix in a consumer class, for example:

class Consumer

include MessageQueue::Consumable

queue :name => "print_time_now"
exchange :name => "time", :routing_key => "time.#"

def process(message)
  ...
end

end

The consumer class needs to implement the process method which will be passed a MessageQueue::Message instance when it receives a message.

Public Class Methods

included(base) click to toggle source
# File lib/message_queue/consumable.rb, line 22
def self.included(base)
  base.extend(ClassMethods)
  MessageQueue.register_consumable(base)
end
new() click to toggle source
# File lib/message_queue/consumable.rb, line 53
def initialize
  @consumer = MessageQueue.new_consumer(:queue => self.class.queue_options,
                                        :exchange => self.class.exchange_options,
                                        :subscribe => self.class.subscribe_options)
end

Public Instance Methods

subscribe(options = {}) click to toggle source
# File lib/message_queue/consumable.rb, line 59
def subscribe(options = {})
  @consumer.subscribe(options) do |message|
    begin
      logger.info("Message(#{message.message_id || '-'}): " +
                  "routing key: #{message.routing_key}, " +
                  "type: #{message.type}, " +
                  "timestamp: #{message.timestamp}, " +
                  "consumer: #{@consumer.class}, " +
                  "payload: #{message.payload}")
      process(message)
    rescue StandardError => ex
      handle_error(message, consumer, ex)
    end
  end
end

Private Instance Methods

handle_error(message, consumer, ex) click to toggle source
# File lib/message_queue/consumable.rb, line 77
def handle_error(message, consumer, ex)
  MessageQueue.error_handlers.each do |handler|
    handler.handle(message, consumer, ex)
  end
end