class Shoryuken::Polling::StrictPriority
Public Class Methods
new(queues, delay = nil)
click to toggle source
# File lib/shoryuken/polling/strict_priority.rb, line 4 def initialize(queues, delay = nil) # Priority ordering of the queues, highest priority first @queues = queues .group_by { |q| q } .sort_by { |_, qs| -qs.count } .map(&:first) # Pause status of the queues, default to past time (unpaused) @paused_until = queues .each_with_object({}) { |queue, h| h[queue] = Time.at(0) } @delay = delay # Start queues at 0 reset_next_queue end
Public Instance Methods
active_queues()
click to toggle source
# File lib/shoryuken/polling/strict_priority.rb, line 33 def active_queues @queues .reverse .map.with_index(1) .reject { |q, _| queue_paused?(q) } .reverse end
message_processed(queue)
click to toggle source
# File lib/shoryuken/polling/strict_priority.rb, line 41 def message_processed(queue) if queue_paused?(queue) logger.debug "Unpausing #{queue}" @paused_until[queue] = Time.at 0 end end
messages_found(queue, messages_found)
click to toggle source
# File lib/shoryuken/polling/strict_priority.rb, line 25 def messages_found(queue, messages_found) if messages_found == 0 pause(queue) else reset_next_queue end end
next_queue()
click to toggle source
# File lib/shoryuken/polling/strict_priority.rb, line 20 def next_queue next_queue = next_active_queue next_queue.nil? ? nil : QueueConfiguration.new(next_queue, {}) end
Private Instance Methods
next_active_queue()
click to toggle source
# File lib/shoryuken/polling/strict_priority.rb, line 50 def next_active_queue reset_next_queue if queues_unpaused_since? size = @queues.length size.times do queue = @queues[@next_queue_index] @next_queue_index = (@next_queue_index + 1) % size return queue unless queue_paused?(queue) end nil end
pause(queue)
click to toggle source
# File lib/shoryuken/polling/strict_priority.rb, line 78 def pause(queue) return unless delay > 0 @paused_until[queue] = Time.now + delay logger.debug "Paused #{queue}" end
queue_paused?(queue)
click to toggle source
# File lib/shoryuken/polling/strict_priority.rb, line 74 def queue_paused?(queue) @paused_until[queue] > Time.now end
queues_unpaused_since?()
click to toggle source
# File lib/shoryuken/polling/strict_priority.rb, line 63 def queues_unpaused_since? last = @last_unpause_check now = @last_unpause_check = Time.now last && @paused_until.values.any? { |t| t > last && t <= now } end
reset_next_queue()
click to toggle source
# File lib/shoryuken/polling/strict_priority.rb, line 70 def reset_next_queue @next_queue_index = 0 end