class Shoryuken::Polling::StrictPriority

Public Class Methods

new(queues, delay = nil) click to toggle source
# File lib/shoryuken/polling/strict_priority.rb, line 4
# Builds a strict-priority polling strategy.
#
# A queue's priority is the number of times its name occurs in +queues+:
# the more occurrences, the earlier the queue sits in @queues. Queues that
# occur the same number of times keep the order in which they first appear.
#
# @param queues [Array] queue names, repeated to express a higher priority
# @param delay [Numeric, nil] stored as-is in @delay
def initialize(queues, delay = nil)
  occurrences = queues.group_by { |q| q }

  # Priority ordering of the queues, highest priority first.
  # Enumerable#sort_by is not guaranteed to be stable, so the first-seen
  # position is added to the sort key to make ties deterministic.
  @queues = occurrences
            .each_with_index
            .sort_by { |(_, qs), position| [-qs.count, position] }
            .map { |entry, _position| entry.first }

  # Pause status of the queues, default to past time (unpaused)
  @paused_until = queues
                  .each_with_object({}) { |queue, h| h[queue] = Time.at(0) }

  @delay = delay
  # Start queues at 0
  reset_next_queue
end

Public Instance Methods

active_queues() click to toggle source
# File lib/shoryuken/polling/strict_priority.rb, line 33
# Lists the queues that are not currently paused, each paired with a weight.
#
# Weights count up from 1 starting at the last entry of @queues, so the first
# entry of @queues carries the largest weight. Paused queues are dropped
# after the weights are assigned, so the remaining weights are not renumbered.
#
# @return [Array<Array>] [queue, weight] pairs, in the same order as @queues
def active_queues
  weighted = @queues.reverse.each_with_index.map { |queue, position| [queue, position + 1] }
  unpaused = weighted.reject { |queue, _weight| queue_paused?(queue) }
  unpaused.reverse
end
message_processed(queue) click to toggle source
# File lib/shoryuken/polling/strict_priority.rb, line 41
# Records that a message from +queue+ has been processed: writes a debug log
# line and sets the queue's entry in @paused_until to the current time, so
# the queue no longer counts as paused.
#
# @param queue [Object] key of the queue in @paused_until
# @return [Time] the timestamp that was stored
def message_processed(queue)
  logger.debug("Unpausing #{queue}")
  @paused_until.store(queue, Time.now)
end
messages_found(queue, messages_found) click to toggle source
# File lib/shoryuken/polling/strict_priority.rb, line 25
# Reacts to the outcome of polling +queue+.
#
# When the poll returned zero messages the queue is handed to #pause;
# otherwise the polling position is reset via #reset_next_queue.
#
# @param queue [Object] the queue that was polled
# @param messages_found [Integer] number of messages the poll returned
def messages_found(queue, messages_found)
  return pause(queue) if messages_found == 0

  reset_next_queue
end
next_queue() click to toggle source
# File lib/shoryuken/polling/strict_priority.rb, line 20
# Picks the next queue to poll.
#
# @return [QueueConfiguration, nil] the queue returned by #next_active_queue
#   wrapped with an empty options hash, or nil when that method returns nil
def next_queue
  name = next_active_queue
  return nil if name.nil?

  QueueConfiguration.new(name, {})
end

Private Instance Methods

next_active_queue() click to toggle source
# File lib/shoryuken/polling/strict_priority.rb, line 48
# Finds the next queue that is not paused, scanning @queues from
# @next_queue_index and wrapping around at the end.
#
# The scan position is first reset when #queues_unpaused_since? reports true.
# At most one full pass over @queues is made.
#
# @return [Object, nil] the first unpaused queue found, or nil when every
#   queue is paused (or @queues is empty)
def next_active_queue
  reset_next_queue if queues_unpaused_since?

  total = @queues.length
  total.times do
    candidate = @queues[@next_queue_index]
    # Advance before checking, so the next call continues after this queue.
    @next_queue_index = (@next_queue_index + 1) % total
    next if queue_paused?(candidate)

    return candidate
  end

  nil
end
pause(queue) click to toggle source
# File lib/shoryuken/polling/strict_priority.rb, line 76
# Pauses +queue+ for +delay+ seconds from now and writes a debug log line.
# Does nothing (and returns nil) unless +delay+ is greater than zero.
#
# @param queue [Object] key of the queue in @paused_until
def pause(queue)
  if delay > 0
    @paused_until[queue] = Time.now + delay
    logger.debug("Paused #{queue}")
  end
end
queue_paused?(queue) click to toggle source
# File lib/shoryuken/polling/strict_priority.rb, line 72
# Tells whether +queue+ is still paused, i.e. whether its entry in
# @paused_until lies in the future.
#
# @param queue [Object] key of the queue in @paused_until
# @return [Boolean]
def queue_paused?(queue)
  resume_at = @paused_until[queue]
  resume_at > Time.now
end
queues_unpaused_since?() click to toggle source
# File lib/shoryuken/polling/strict_priority.rb, line 61
# Checks whether any queue's pause expired between the previous call to this
# method and now, and records the current time as the new check time.
#
# @return [Boolean, nil] nil on the first call (no previous check time is
#   recorded yet); otherwise whether any @paused_until value falls after the
#   previous check time and at or before the current one
def queues_unpaused_since?
  previous = @last_unpause_check
  current = Time.now
  @last_unpause_check = current
  return previous unless previous

  @paused_until.each_value.any? do |resume_at|
    resume_at > previous && resume_at <= current
  end
end
reset_next_queue() click to toggle source
# File lib/shoryuken/polling/strict_priority.rb, line 68
# Moves the polling position back to the first entry of @queues, so the next
# scan starts from index 0.
def reset_next_queue
  @next_queue_index = 0
end