class MessageQueue::Adapters::Memory::Connection::Consumer
Attributes
block[R]
queue[R]
Public Class Methods
new(*args)
click to toggle source
Calls superclass method
MessageQueue::Consumer::new
# File lib/message_queue/adapters/memory/consumer.rb, line 4 def initialize(*args) super @queue = [] end
Public Instance Methods
subscribe(options = {}, &block)
click to toggle source
# File lib/message_queue/adapters/memory/consumer.rb, line 9 def subscribe(options = {}, &block) producer = options.fetch(:producer) producer.add_observer(self) @block = block end
unsubscribe(options = {})
click to toggle source
# File lib/message_queue/adapters/memory/consumer.rb, line 15 def unsubscribe(options = {}) producer = options.fetch(:producer) producer.delete_observer(self) @block = nil end
update(object, options)
click to toggle source
# File lib/message_queue/adapters/memory/consumer.rb, line 21 def update(object, options) object = load_object(object) message = MessageQueue::Message.new(:message_id => options[:message_id], :type => options[:type], :timestamp => options[:timestamp], :routing_key => options[:routing_key], :payload => load_object(object)) if block block.call(message) else queue << message end end