class MessageQueue::Adapters::Memory::Connection::Consumer

Attributes

block[R]
queue[R]

Public Class Methods

new(*args) click to toggle source
Calls superclass method MessageQueue::Consumer::new
# File lib/message_queue/adapters/memory/consumer.rb, line 4
def initialize(*args)
  super
  @queue = []
end

Public Instance Methods

subscribe(options = {}, &block) click to toggle source
# File lib/message_queue/adapters/memory/consumer.rb, line 9
def subscribe(options = {}, &block)
  producer = options.fetch(:producer)
  producer.add_observer(self)
  @block = block
end
unsubscribe(options = {}) click to toggle source
# File lib/message_queue/adapters/memory/consumer.rb, line 15
def unsubscribe(options = {})
  producer = options.fetch(:producer)
  producer.delete_observer(self)
  @block = nil
end
update(object, options) click to toggle source
# File lib/message_queue/adapters/memory/consumer.rb, line 21
def update(object, options)
  object = load_object(object)
  message = MessageQueue::Message.new(:message_id => options[:message_id],
                                      :type => options[:type],
                                      :timestamp => options[:timestamp],
                                      :routing_key => options[:routing_key],
                                      :payload => load_object(object))
  if block
    block.call(message)
  else
    queue << message
  end
end