module Sidekiq::LimitFetch::Queues
Constants
- THREAD_KEY
Public Instance Methods
acquire()
click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 21 def acquire selector.acquire(ordered_queues, namespace) .tap {|it| save it } .map {|it| "queue:#{it}" } end
add(queues)
click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 40 def add(queues) queues.each do |queue| unless @queues.include? queue apply_process_limit_to_queue(queue) apply_limit_to_queue(queue) @queues.push queue end end end
dynamic?()
click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 36 def dynamic? @dynamic end
namespace()
click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 60 def namespace @namespace ||= Sidekiq.redis do |it| if it.respond_to?(:namespace) and it.namespace "#{it.namespace}:" else '' end end end
ordered_queues()
click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 53 def ordered_queues; @queues end
release_except(full_name)
click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 27 def release_except(full_name) queues = restore queues.delete full_name[/queue:(.*)/, 1] if full_name Sidekiq::LimitFetch.redis_retryable do selector.release queues, namespace end end
start(options)
click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 6 def start(options) @queues = options[:queues] @dynamic = options[:dynamic] @limits = options[:limits] || {} @process_limits = options[:process_limits] || {} @blocks = options[:blocking] || [] options[:strict] ? strict_order! : weighted_order! apply_process_limit_to_queues apply_limit_to_queues apply_blocks_to_queues end
strict_order!()
click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 51 def strict_order! @queues.uniq! def ordered_queues; @queues end end
weighted_order!()
click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 56 def weighted_order! def ordered_queues; @queues.shuffle.uniq end end
Private Instance Methods
apply_blocks_to_queues()
click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 97 def apply_blocks_to_queues @queues.uniq.each do |queue_name| Sidekiq::Queue[queue_name].unblock end @blocks.to_a.each do |it| if it.is_a? Array it.each {|name| Sidekiq::Queue[name].block_except it } else Sidekiq::Queue[it].block end end end
apply_limit_to_queue(queue_name)
click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 89 def apply_limit_to_queue(queue_name) queue = Sidekiq::Queue[queue_name] unless queue.limit_changed? queue.limit = @limits[queue_name.to_s] || @limits[queue_name.to_sym] end end
apply_limit_to_queues()
click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 83 def apply_limit_to_queues @queues.uniq.each do |queue_name| apply_limit_to_queue(queue_name) end end
apply_process_limit_to_queue(queue_name)
click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 78 def apply_process_limit_to_queue(queue_name) queue = Sidekiq::Queue[queue_name] queue.process_limit = @process_limits[queue_name.to_s] || @process_limits[queue_name.to_sym] end
apply_process_limit_to_queues()
click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 72 def apply_process_limit_to_queues @queues.uniq.each do |queue_name| apply_process_limit_to_queue(queue_name) end end
restore()
click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 119 def restore Thread.current[THREAD_KEY] || [] ensure Thread.current[THREAD_KEY] = nil end
save(queues)
click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 115 def save(queues) Thread.current[THREAD_KEY] = queues end
selector()
click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 111 def selector Sidekiq::LimitFetch::Global::Selector end