class Mercury::Fake::Queue

Attributes

msgs[R]
source[R]
subscribers[R]
tag_filter[R]
worker[R]

Public Class Methods

new(source, tag_filter, worker, require_ack) click to toggle source
# File lib/mercury/fake/queue.rb, line 9
def initialize(source, tag_filter, worker, require_ack)
  @source = source
  @tag_filter = tag_filter
  @worker = worker
  @require_ack = require_ack
  @msgs = []
  @subscribers = []
end

Public Instance Methods

ack_or_reject_message(msg) click to toggle source
# File lib/mercury/fake/queue.rb, line 28
def ack_or_reject_message(msg)
  msgs.delete(msg) or raise 'tried to delete message that was not in queue!!'
  msg.subscriber.handle_capacity += 1
  deliver # a subscriber just freed up
end
add_subscriber(s) click to toggle source
# File lib/mercury/fake/queue.rb, line 18
def add_subscriber(s)
  subscribers << s
  deliver # new subscriber probably wants a message
end
binds?(source_name, tag) click to toggle source
# File lib/mercury/fake/queue.rb, line 40
def binds?(source_name, tag)
  source_name == source && tag_match?(tag_filter, tag)
end
enqueue(msg, tag, headers) click to toggle source
# File lib/mercury/fake/queue.rb, line 23
def enqueue(msg, tag, headers)
  msgs.push(QueuedMessage.new(self, msg, tag, headers, @require_ack))
  deliver # new message. someone probably wants it.
end
nack(msg) click to toggle source
# File lib/mercury/fake/queue.rb, line 34
def nack(msg)
  msg.delivered = false
  msg.subscriber.handle_capacity += 1
  deliver
end

Private Instance Methods

deliver() click to toggle source
# File lib/mercury/fake/queue.rb, line 53
def deliver
  EM.next_tick do
    if idle_subscribers.any? && undelivered.any?
      msg = undelivered.first
      subscriber = idle_subscribers.sample
      if @require_ack
        msg.delivered = true
        subscriber.handle_capacity -= 1
      else
        msgs.delete(msg)
      end
      msg.subscriber = subscriber
      subscriber.handler.call(msg.received_msg)
      deliver # continue delivering
    end
  end
end
idle_subscribers() click to toggle source
# File lib/mercury/fake/queue.rb, line 75
def idle_subscribers
  subscribers.reject { |s| s.handle_capacity == 0 }
end
tag_match?(filter, tag) click to toggle source
# File lib/mercury/fake/queue.rb, line 47
def tag_match?(filter, tag)
  # for wildcard description, see https://www.rabbitmq.com/tutorials/tutorial-five-python.html
  pattern = Regexp.new(filter.gsub('*', '[^\.]+').gsub('#', '.*?'))
  pattern.match(tag)
end
undelivered() click to toggle source
# File lib/mercury/fake/queue.rb, line 71
def undelivered
  msgs.reject(&:delivered)
end