class Krakow::Consumer::Queue
Attributes
@return [Consumer]
@return [Array] order of message removal
@return [Symbol] callback method name
Public Class Methods
Create new consumer queue instance
@param consumer [Consumer] @return [self]
# File lib/krakow/consumer/queue.rb, line 23 def initialize(consumer, *args) opts = args.detect{|x| x.is_a?(Hash)} @consumer = consumer @removal_callback = opts[:removal_callback] @messages = {} @pop_order = [] @cleaner = nil end
Public Instance Methods
Remove connection registration and remove all messages
@param identifier [String] connection identifier @return [Array<FrameType::Message>] messages queued for deregistered connection
# File lib/krakow/consumer/queue.rb, line 59 def deregister_connection(identifier) messages do |collection| removed = collection.delete(identifier) pop_order.delete(identifier) removed end end
Message container
@yieldparam [Hash] messages @return [Hash] messages or block result
# File lib/krakow/consumer/queue.rb, line 36 def messages if(block_given?) yield @messages else @messages end end
Pop first item off the queue
@return [Object]
# File lib/krakow/consumer/queue.rb, line 92 def pop message = nil until(message) wait(:new_message) if pop_order.empty? messages do |collection| key = pop_order.shift if(key) message = collection[key].shift message = validate_message(message) end end end message end
Push new message into queue
@param message [FrameType::Message] @return [self]
# File lib/krakow/consumer/queue.rb, line 71 def push(message) unless(message.is_a?(FrameType::Message)) abort TypeError.new "Expecting `FrameType::Message` but received `#{message.class}`!" end messages do |collection| begin collection[message.connection.identifier] << message pop_order << message.connection.identifier rescue Celluloid::DeadActorError abort Error::ConnectionUnavailable.new end end signal(:new_message) current_actor end
Register a new connection
@param connection [Connection] @return [TrueClass]
# File lib/krakow/consumer/queue.rb, line 48 def register_connection(connection) messages do |collection| collection[connection.identifier] = [] end true end
Remove duplicate message from queue if possible
@param message [FrameType::Message] @return [TrueClass, FalseClass]
# File lib/krakow/consumer/queue.rb, line 119 def scrub_duplicate_message(message) messages do |collection| idx = collection[message.connection.identifier].index do |msg| msg.message_id == message.message_id end if(idx) msg = collection[message.connection.identifier].delete_at(idx) if(removal_callback) consumer.send(removal_callback, [message]) end true else false end end end
@return [Integer] number of queued messages
# File lib/krakow/consumer/queue.rb, line 109 def size messages do |collection| collection.values.map(&:size).inject(&:+) end end
Validate message
# File lib/krakow/consumer/queue.rb, line 137 def validate_message(message) if(message.instance_stamp > message.instance_stamp + (message.connection.endpoint_settings[:msg_timeout] / 1000.0)) warn "Message exceeded timeout! Discarding. (#{message})" if(removal_callback) consumer.send(removal_callback, [message]) end nil else message end end