class Krakow::Consumer::Queue

Attributes

consumer[R]

@return [Consumer]

pop_order[R]

@return [Array] order of message removal

removal_callback[R]

@return [Symbol] callback method name

Public Class Methods

new(consumer, *args) click to toggle source

Create new consumer queue instance

@param consumer [Consumer] @return [self]

# File lib/krakow/consumer/queue.rb, line 23
def initialize(consumer, *args)
  opts = args.detect{|x| x.is_a?(Hash)}
  @consumer = consumer
  @removal_callback = opts[:removal_callback]
  @messages = {}
  @pop_order = []
  @cleaner = nil
end

Public Instance Methods

<<(message)
Alias for: push
deq()
Alias for: pop
deregister_connection(identifier) click to toggle source

Remove connection registration and remove all messages

@param identifier [String] connection identifier @return [Array<FrameType::Message>] messages queued for deregistered connection

# File lib/krakow/consumer/queue.rb, line 59
def deregister_connection(identifier)
  messages do |collection|
    removed = collection.delete(identifier)
    pop_order.delete(identifier)
    removed
  end
end
enq(message)
Alias for: push
messages() { |messages| ... } click to toggle source

Message container

@yieldparam [Hash] messages @return [Hash] messages or block result

# File lib/krakow/consumer/queue.rb, line 36
def messages
  if(block_given?)
    yield @messages
  else
    @messages
  end
end
pop() click to toggle source

Pop first item off the queue

@return [Object]

# File lib/krakow/consumer/queue.rb, line 92
def pop
  message = nil
  until(message)
    wait(:new_message) if pop_order.empty?
    messages do |collection|
      key = pop_order.shift
      if(key)
        message = collection[key].shift
        message = validate_message(message)
      end
    end
  end
  message
end
Also aliased as: deq
push(message) click to toggle source

Push new message into queue

@param message [FrameType::Message] @return [self]

# File lib/krakow/consumer/queue.rb, line 71
def push(message)
  unless(message.is_a?(FrameType::Message))
    abort TypeError.new "Expecting `FrameType::Message` but received `#{message.class}`!"
  end
  messages do |collection|
    begin
      collection[message.connection.identifier] << message
      pop_order << message.connection.identifier
    rescue Celluloid::DeadActorError
      abort Error::ConnectionUnavailable.new
    end
  end
  signal(:new_message)
  current_actor
end
Also aliased as: <<, enq
register_connection(connection) click to toggle source

Register a new connection

@param connection [Connection] @return [TrueClass]

# File lib/krakow/consumer/queue.rb, line 48
def register_connection(connection)
  messages do |collection|
    collection[connection.identifier] = []
  end
  true
end
scrub_duplicate_message(message) click to toggle source

Remove duplicate message from queue if possible

@param message [FrameType::Message] @return [TrueClass, FalseClass]

# File lib/krakow/consumer/queue.rb, line 119
def scrub_duplicate_message(message)
  messages do |collection|
    idx = collection[message.connection.identifier].index do |msg|
      msg.message_id == message.message_id
    end
    if(idx)
      msg = collection[message.connection.identifier].delete_at(idx)
      if(removal_callback)
        consumer.send(removal_callback, [message])
      end
      true
    else
      false
    end
  end
end
size() click to toggle source

@return [Integer] number of queued messages

# File lib/krakow/consumer/queue.rb, line 109
def size
  messages do |collection|
    collection.values.map(&:size).inject(&:+)
  end
end
validate_message(message) click to toggle source

Validate message

# File lib/krakow/consumer/queue.rb, line 137
def validate_message(message)
  if(message.instance_stamp > message.instance_stamp + (message.connection.endpoint_settings[:msg_timeout] / 1000.0))
    warn "Message exceeded timeout! Discarding. (#{message})"
    if(removal_callback)
      consumer.send(removal_callback, [message])
    end
    nil
  else
    message
  end
end