class Shoryuken::Polling::WeightedRoundRobin

Public Class Methods

new(queues, delay = nil) click to toggle source
# File lib/shoryuken/polling/weighted_round_robin.rb, line 4
def initialize(queues, delay = nil)
  @initial_queues = queues
  @queues = queues.dup.uniq
  @paused_queues = []
  @delay = delay
end

Public Instance Methods

active_queues() click to toggle source
# File lib/shoryuken/polling/weighted_round_robin.rb, line 34
def active_queues
  unparse_queues(@queues)
end
message_processed(queue) click to toggle source
# File lib/shoryuken/polling/weighted_round_robin.rb, line 38
def message_processed(queue)
  return if @paused_queues.empty?

  logger.debug "Unpausing #{queue}"
  @paused_queues.reject! { |_time, name| name == queue }
  @queues << queue
  @queues.uniq!
end
messages_found(queue, messages_found) click to toggle source
# File lib/shoryuken/polling/weighted_round_robin.rb, line 20
def messages_found(queue, messages_found)
  if messages_found == 0
    pause(queue)
    return
  end

  maximum_weight = maximum_queue_weight(queue)
  current_weight = current_queue_weight(queue)
  if maximum_weight > current_weight
    logger.info { "Increasing #{queue} weight to #{current_weight + 1}, max: #{maximum_weight}" }
    @queues << queue
  end
end
next_queue() click to toggle source
# File lib/shoryuken/polling/weighted_round_robin.rb, line 11
def next_queue
  unpause_queues
  queue = @queues.shift
  return nil if queue.nil?

  @queues << queue
  QueueConfiguration.new(queue, {})
end

Private Instance Methods

current_queue_weight(queue) click to toggle source
# File lib/shoryuken/polling/weighted_round_robin.rb, line 65
def current_queue_weight(queue)
  queue_weight(@queues, queue)
end
maximum_queue_weight(queue) click to toggle source
# File lib/shoryuken/polling/weighted_round_robin.rb, line 69
def maximum_queue_weight(queue)
  queue_weight(@initial_queues, queue)
end
pause(queue) click to toggle source
# File lib/shoryuken/polling/weighted_round_robin.rb, line 49
def pause(queue)
  return unless @queues.delete(queue)

  @paused_queues << [Time.now + delay, queue]
  logger.debug "Paused #{queue}"
end
queue_weight(queues, queue) click to toggle source
# File lib/shoryuken/polling/weighted_round_robin.rb, line 73
def queue_weight(queues, queue)
  queues.count { |q| q == queue }
end
unpause_queues() click to toggle source
# File lib/shoryuken/polling/weighted_round_robin.rb, line 56
def unpause_queues
  return if @paused_queues.empty?
  return if Time.now < @paused_queues.first[0]

  pause = @paused_queues.shift
  @queues << pause[1]
  logger.debug "Unpaused #{pause[1]}"
end