module MessageQueue::Consumable
A module to mix in a consumer class, for example:
class Consumer
include MessageQueue::Consumable queue :name => "print_time_now" exchange :name => "time", :routing_key => "time.#" def process(message) ... end
end
The consumer class needs to implement the process method which will be passed a MessageQueue::Message
instance when it receives a message.
Public Class Methods
included(base)
click to toggle source
# File lib/message_queue/consumable.rb, line 22 def self.included(base) base.extend(ClassMethods) MessageQueue.register_consumable(base) end
new()
click to toggle source
# File lib/message_queue/consumable.rb, line 53 def initialize @consumer = MessageQueue.new_consumer(:queue => self.class.queue_options, :exchange => self.class.exchange_options, :subscribe => self.class.subscribe_options) end
Public Instance Methods
subscribe(options = {})
click to toggle source
# File lib/message_queue/consumable.rb, line 59 def subscribe(options = {}) @consumer.subscribe(options) do |message| begin logger.info("Message(#{message.message_id || '-'}): " + "routing key: #{message.routing_key}, " + "type: #{message.type}, " + "timestamp: #{message.timestamp}, " + "consumer: #{@consumer.class}, " + "payload: #{message.payload}") process(message) rescue StandardError => ex handle_error(message, consumer, ex) end end end
Private Instance Methods
handle_error(message, consumer, ex)
click to toggle source
# File lib/message_queue/consumable.rb, line 77 def handle_error(message, consumer, ex) MessageQueue.error_handlers.each do |handler| handler.handle(message, consumer, ex) end end