class Shoryuken::Polling::WeightedRoundRobin
Public Class Methods
new(queues, delay = nil)
click to toggle source
# File lib/shoryuken/polling/weighted_round_robin.rb, line 4
#
# Builds the polling state from a weighted list of queues.
#
# @param queues [Array] queue names; a name that appears several times
#   gives that queue a proportionally higher maximum weight
# @param delay [Numeric, nil] stored in @delay. NOTE(review): presumably
#   consumed through a `delay` reader defined outside this section.
def initialize(queues, delay = nil)
  # Take a snapshot so that later changes to the caller's array cannot
  # silently alter the maximum weights used by the rotation.
  @initial_queues = queues.dup
  # Every queue enters the rotation exactly once (weight of one).
  @queues = @initial_queues.uniq
  @paused_queues = []
  @delay = delay
end
Public Instance Methods
active_queues()
click to toggle source
# File lib/shoryuken/polling/weighted_round_robin.rb, line 34
#
# Reports the queues that are currently part of the rotation.
#
# @return whatever +unparse_queues+ produces for the current rotation;
#   that helper is defined outside this section
def active_queues
  rotation = @queues
  unparse_queues(rotation)
end
message_processed(queue)
click to toggle source
# File lib/shoryuken/polling/weighted_round_robin.rb, line 38
#
# Puts +queue+ back into the rotation if it is currently paused.
#
# Nothing happens when +queue+ is not paused: no log line is written and
# the weights of the queues already in the rotation are left untouched.
#
# @param queue the queue name a message was just processed for
# @return [Array, nil] not meaningful to callers
def message_processed(queue)
  # reject! returns nil when it removed nothing, i.e. the queue was not paused.
  return unless @paused_queues.reject! { |_time, name| name == queue }

  logger.debug "Unpausing #{queue}"
  # Re-enter the rotation once, without disturbing other queues' entries.
  @queues << queue unless @queues.include?(queue)
end
messages_found(queue, messages_found)
click to toggle source
# File lib/shoryuken/polling/weighted_round_robin.rb, line 20
#
# Adjusts the rotation after a fetch from +queue+.
#
# A fetch that found nothing pauses the queue. Otherwise the queue gains
# one more slot in the rotation, as long as it is still below the weight
# it was originally configured with.
#
# @param queue the queue name that was polled
# @param messages_found the number of messages the fetch returned
def messages_found(queue, messages_found)
  nothing_fetched = messages_found == 0
  if nothing_fetched
    pause(queue)
    return
  end

  ceiling = maximum_queue_weight(queue)
  weight = current_queue_weight(queue)
  # Already at (or above) the configured weight: leave the rotation alone.
  return unless weight < ceiling

  logger.info { "Increasing #{queue} weight to #{weight + 1}, max: #{ceiling}" }
  @queues << queue
end
next_queue()
click to toggle source
# File lib/shoryuken/polling/weighted_round_robin.rb, line 11
#
# Picks the queue to poll next and moves it to the back of the rotation.
#
# @return a QueueConfiguration built from the chosen queue name and an
#   empty options hash, or nil when the rotation has nothing to offer
def next_queue
  # Give a paused queue whose time is up the chance to rejoin first.
  unpause_queues

  upcoming = @queues.shift
  unless upcoming.nil?
    @queues.push(upcoming)
    QueueConfiguration.new(upcoming, {})
  end
end
Private Instance Methods
current_queue_weight(queue)
click to toggle source
# File lib/shoryuken/polling/weighted_round_robin.rb, line 65
#
# Weight +queue+ holds right now, i.e. how many times it appears in the
# live rotation.
#
# @param queue the queue name to look up
# @return [Integer]
def current_queue_weight(queue)
  live_rotation = @queues
  queue_weight(live_rotation, queue)
end
maximum_queue_weight(queue)
click to toggle source
# File lib/shoryuken/polling/weighted_round_robin.rb, line 69
#
# Highest weight +queue+ may reach, i.e. how many times it appears in
# the queue list this strategy was constructed with.
#
# @param queue the queue name to look up
# @return [Integer]
def maximum_queue_weight(queue)
  configured = @initial_queues
  queue_weight(configured, queue)
end
pause(queue)
click to toggle source
# File lib/shoryuken/polling/weighted_round_robin.rb, line 49
#
# Takes +queue+ out of the rotation and records when it may come back.
# Does nothing when the queue is not in the rotation.
#
# @param queue the queue name to pause
def pause(queue)
  # Array#delete drops every occurrence and returns nil when none existed.
  removed = @queues.delete(queue)
  if removed
    resume_at = Time.now + delay
    @paused_queues.push([resume_at, queue])
    logger.debug "Paused #{queue}"
  end
end
queue_weight(queues, queue)
click to toggle source
# File lib/shoryuken/polling/weighted_round_robin.rb, line 73
#
# Counts how many entries of +queues+ are equal to +queue+.
#
# @param queues [Array] list of queue names, duplicates allowed
# @param queue the queue name to count
# @return [Integer] number of occurrences, 0 when absent
def queue_weight(queues, queue)
  queues.count(queue)
end
unpause_queues()
click to toggle source
# File lib/shoryuken/polling/weighted_round_robin.rb, line 56
#
# Returns the first paused queue to the rotation once its pause has
# expired. Only the head of the paused list is examined, so at most one
# queue is unpaused per call.
def unpause_queues
  return if @paused_queues.empty?

  resume_at = @paused_queues.first[0]
  return if Time.now < resume_at

  _resume_at, name = @paused_queues.shift
  @queues << name
  logger.debug "Unpaused #{name}"
end