class Mercury::Fake::Queue
Attributes
msgs[R]
source[R]
subscribers[R]
tag_filter[R]
worker[R]
Public Class Methods
new(source, tag_filter, worker, require_ack)
click to toggle source
# File lib/mercury/fake/queue.rb, line 9
# Builds an empty queue: no messages and no subscribers yet.
#
# source      - value later compared against the source name in #binds?
# tag_filter  - filter string later handed to #tag_match? by #binds?
# worker      - stored as-is and exposed through the +worker+ reader
# require_ack - when truthy, delivered messages stay in the queue
#               (flagged as delivered) instead of being removed on delivery
def initialize(source, tag_filter, worker, require_ack)
  # Mutable state owned by the queue.
  @msgs = []
  @subscribers = []

  # Configuration supplied by the caller.
  @source = source
  @tag_filter = tag_filter
  @worker = worker
  @require_ack = require_ack
end
Public Instance Methods
ack_or_reject_message(msg)
click to toggle source
# File lib/mercury/fake/queue.rb, line 28
# Removes +msg+ from the queue, gives one unit of handle capacity back to
# the subscriber recorded on the message, and triggers another delivery pass.
#
# Raises RuntimeError when +msg+ is not currently held in +msgs+.
def ack_or_reject_message(msg)
  raise 'tried to delete message that was not in queue!!' unless msgs.delete(msg)

  msg.subscriber.handle_capacity += 1
  deliver # the subscriber can take another message now
end
add_subscriber(s)
click to toggle source
# File lib/mercury/fake/queue.rb, line 18
# Registers +s+ as a subscriber of this queue and triggers a delivery pass,
# since the newcomer may be able to take a waiting message.
def add_subscriber(s)
  subscribers.push(s)
  deliver
end
binds?(source_name, tag)
click to toggle source
# File lib/mercury/fake/queue.rb, line 40
# Tells whether this queue is bound to +source_name+ for the given +tag+:
# the source must equal this queue's +source+ and the tag must satisfy
# this queue's +tag_filter+.
def binds?(source_name, tag)
  return false unless source_name == source

  tag_match?(tag_filter, tag)
end
enqueue(msg, tag, headers)
click to toggle source
# File lib/mercury/fake/queue.rb, line 23
# Wraps +msg+, +tag+ and +headers+ in a QueuedMessage tied to this queue,
# appends it to +msgs+, and triggers a delivery pass for the new arrival.
def enqueue(msg, tag, headers)
  queued = QueuedMessage.new(self, msg, tag, headers, @require_ack)
  msgs << queued
  deliver
end
nack(msg)
click to toggle source
# File lib/mercury/fake/queue.rb, line 34
# Marks +msg+ as not delivered so that #undelivered reports it again,
# gives one unit of handle capacity back to the subscriber recorded on the
# message, and triggers another delivery pass.
def nack(msg)
  msg.delivered = false
  previous_subscriber = msg.subscriber
  previous_subscriber.handle_capacity += 1
  deliver
end
Private Instance Methods
deliver()
click to toggle source
# File lib/mercury/fake/queue.rb, line 53
# Schedules one delivery attempt via EM.next_tick. When the scheduled block
# runs and there is both an idle subscriber and an undelivered message, the
# first undelivered message is handed to one idle subscriber chosen with
# +sample+, and #deliver is called again to schedule a further attempt.
# Otherwise the scheduled block does nothing.
def deliver
  EM.next_tick do
    idle = idle_subscribers
    next if idle.none?

    pending = undelivered
    next if pending.none?

    msg = pending.first
    subscriber = idle.sample
    if @require_ack
      # Keep the message queued but flagged, and consume subscriber capacity.
      msg.delivered = true
      subscriber.handle_capacity -= 1
    else
      # Without acks the message leaves the queue as soon as it is handed over.
      msgs.delete(msg)
    end
    msg.subscriber = subscriber
    subscriber.handler.call(msg.received_msg)
    deliver # keep going while work and capacity remain
  end
end
idle_subscribers()
click to toggle source
# File lib/mercury/fake/queue.rb, line 75
# Returns the subscribers whose +handle_capacity+ is not 0.
def idle_subscribers
  subscribers.select { |subscriber| subscriber.handle_capacity != 0 }
end
tag_match?(filter, tag)
click to toggle source
# File lib/mercury/fake/queue.rb, line 47
# Tests +tag+ against a topic-style +filter+.
#
# For the wildcard description, see
# https://www.rabbitmq.com/tutorials/tutorial-five-python.html
#
#   *  stands for exactly one dot-delimited word (one or more non-dot chars)
#   #  stands for any run of characters, dots included
#
# Every other character in +filter+ is matched literally, and the filter must
# cover the whole tag rather than just a substring of it.
#
# NOTE(review): '#' is translated to '.*', so 'a.#' still requires the
# trailing dot and does not match the bare tag 'a'.
#
# Returns a MatchData (truthy) on success, nil otherwise.
def tag_match?(filter, tag)
  # Split while keeping the wildcard characters as their own segments, so
  # literal text can be escaped without touching the wildcards.
  body = filter.split(/([*#])/).map do |segment|
    case segment
    when '*' then '[^.]+'
    when '#' then '.*'
    else Regexp.escape(segment)
    end
  end.join

  # Anchor both ends: an unanchored pattern would accept any tag that merely
  # contains a matching substring.
  Regexp.new("\\A#{body}\\z").match(tag)
end
undelivered()
click to toggle source
# File lib/mercury/fake/queue.rb, line 71
# Returns the queued messages whose +delivered+ flag is not set.
def undelivered
  msgs.select { |queued| !queued.delivered }
end