module Sidekiq::LimitFetch::Queues
Constants
- THREAD_KEY
Public Instance Methods
acquire()
click to toggle source
rubocop:enable Metrics/AbcSize rubocop:enable Metrics/CyclomaticComplexity rubocop:enable Metrics/MethodLength rubocop:enable Metrics/PerceivedComplexity
# File lib/sidekiq/limit_fetch/queues.rb, line 49
# Returns the list of acquired queues as "queue:<name>" strings.
#
# If a list is already stashed for the current thread (see `saved`), it is
# reused; otherwise `selector.acquire` is asked for one inside
# `Sidekiq::LimitFetch.redis_retryable`. Whichever list is used gets
# stashed again via `save` before the prefixed names are returned.
def acquire
  held = saved || Sidekiq::LimitFetch.redis_retryable { selector.acquire(ordered_queues, namespace) }
  save(held)
  held.map { |name| "queue:#{name}" }
end
add(queues)
click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 78
# Appends every queue from +queues+ that is not yet present in @queues.
#
# Queues that were part of the startup set additionally get their process
# limit and limit (re)applied before being appended. A falsy argument is
# ignored. The membership check happens on every iteration, so a name
# repeated within +queues+ is only appended once.
def add(queues)
  return unless queues

  queues.each do |name|
    unless @queues.include?(name)
      if startup_queue?(name)
        apply_process_limit_to_queue(name)
        apply_limit_to_queue(name)
      end
      @queues << name
    end
  end
end
dynamic?()
click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 66
# Reader for @dynamic. The value is returned untouched, so it may be
# true, false, nil or whatever was stored there.
def dynamic?
  @dynamic
end
dynamic_exclude()
click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 74
# Reader for @dynamic_exclude (returns the stored object itself, not a copy).
def dynamic_exclude
  @dynamic_exclude
end
handle(queues)
click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 105
# Brings @queues in line with +queues+: names missing from @queues are
# passed to `add`, then names in @queues that are absent from +queues+
# are passed to `remove`.
def handle(queues)
  missing = queues - @queues
  add(missing)

  # Computed after `add`, exactly as the two-step original did.
  obsolete = @queues - queues
  remove(obsolete)
end
namespace()
click to toggle source
rubocop:enable Lint/NestedMethodDefinition
# File lib/sidekiq/limit_fetch/queues.rb, line 125
# Memoized key prefix derived from the connection yielded by `Sidekiq.redis`.
#
# Returns "<namespace>:" when the connection responds to #namespace and
# that value is truthy, otherwise an empty string. The result is cached
# in @namespace after the first call.
def namespace
  @namespace ||= Sidekiq.redis do |conn|
    namespaced = conn.respond_to?(:namespace) && conn.namespace
    namespaced ? "#{conn.namespace}:" : ''
  end
end
ordered_queues()
click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 113
# Returns @queues as stored (the same array object, in its current order).
def ordered_queues
  @queues
end
release_except(full_name)
click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 58
# Releases the queues stashed for the current thread, optionally keeping one.
#
# The stashed list is taken (and cleared) via `restore`. When +full_name+
# is given, the part after "queue:" is removed from that list so it is
# not released. The remaining names are handed to `selector.release`
# inside `Sidekiq::LimitFetch.redis_retryable`.
def release_except(full_name)
  to_release = restore
  if full_name
    kept = full_name[/queue:(.*)/, 1]
    to_release.delete(kept)
  end
  Sidekiq::LimitFetch.redis_retryable { selector.release(to_release, namespace) }
end
remove(queues)
click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 93
# Drops every queue from +queues+ that is currently present in @queues.
#
# For each such queue its limits are cleared, the name is deleted from
# @queues and `Sidekiq::Queue.delete_instance` is called for it. A falsy
# argument is ignored; names not in @queues are skipped.
def remove(queues)
  return unless queues

  queues.each do |name|
    if @queues.include?(name)
      clear_limits_for_queue(name)
      @queues.delete(name)
      Sidekiq::Queue.delete_instance(name)
    end
  end
end
start(capsule_or_options)
click to toggle source
rubocop:disable Metrics/AbcSize rubocop:disable Metrics/CyclomaticComplexity rubocop:disable Metrics/MethodLength rubocop:disable Metrics/PerceivedComplexity
# File lib/sidekiq/limit_fetch/queues.rb, line 14
# Initializes queue state from the given configuration and applies it.
#
# +capsule_or_options+ is either an object responding to #config (used
# when `Sidekiq::LimitFetch.post_7?` is true) or the options themselves.
# Reads the :queues, :dynamic, :limits, :process_limits, :blocking and
# :strict keys, selects the ordering strategy, then applies process
# limits, limits and blocks to the queues.
def start(capsule_or_options)
  config = if Sidekiq::LimitFetch.post_7?
             capsule_or_options.config
           else
             capsule_or_options
           end

  # Entries may be plain names or arrays whose first element is the name.
  names = config[:queues].map { |entry| entry.is_a?(Array) ? entry.first : entry }
  @queues = names.uniq
  @startup_queues = @queues.dup

  dynamic = config[:dynamic]
  if dynamic.is_a?(Hash)
    # A Hash enables dynamic mode and may carry an :exclude list.
    @dynamic = true
    @dynamic_exclude = dynamic[:exclude] || []
  else
    @dynamic = dynamic
    @dynamic_exclude = []
  end

  @limits = config[:limits] || {}
  @process_limits = config[:process_limits] || {}
  @blocks = config[:blocking] || []

  if config[:strict]
    strict_order!
  else
    weighted_order!
  end

  apply_process_limit_to_queues
  apply_limit_to_queues
  apply_blocks_to_queues
end
startup_queue?(queue)
click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 70
# True when +queue+ is contained in @startup_queues, false otherwise.
def startup_queue?(queue)
  @startup_queues.include?(queue)
end
strict_order!()
click to toggle source
rubocop:disable Lint/NestedMethodDefinition
# File lib/sidekiq/limit_fetch/queues.rb, line 111 def strict_order! @queues.uniq! def ordered_queues @queues end end
weighted_order!()
click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 118 def weighted_order! def ordered_queues @queues.shuffle.uniq end end
Private Instance Methods
apply_blocks_to_queues()
click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 162
# Resets and re-applies blocking configuration.
#
# First calls #unblock on the Sidekiq::Queue of every distinct name in
# @queues. Then walks @blocks: an Array entry makes each of its members
# `block_except` the whole group; any other entry is treated as a single
# queue name and gets #block.
def apply_blocks_to_queues
  @queues.uniq.each { |name| Sidekiq::Queue[name].unblock }

  @blocks.to_a.each do |entry|
    unless entry.is_a?(Array)
      Sidekiq::Queue[entry].block
      next
    end

    entry.each { |name| Sidekiq::Queue[name].block_except(entry) }
  end
end
apply_limit_to_queue(queue_name)
click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 154
# Assigns the configured limit to the queue named +queue_name+, unless the
# queue reports `limit_changed?`.
#
# The limit is looked up in @limits under the string form of the name
# first, then under the symbol form; nil is assigned when neither exists.
def apply_limit_to_queue(queue_name)
  target = Sidekiq::Queue[queue_name]
  return if target.limit_changed?

  configured = @limits[queue_name.to_s] || @limits[queue_name.to_sym]
  target.limit = configured
end
apply_limit_to_queues()
click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 148
# Runs `apply_limit_to_queue` once for every distinct name in @queues.
def apply_limit_to_queues
  @queues.uniq.each { |name| apply_limit_to_queue(name) }
end
apply_process_limit_to_queue(queue_name)
click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 143
# Assigns the configured process limit to the queue named +queue_name+.
#
# The value is looked up in @process_limits under the string form of the
# name first, then under the symbol form; nil is assigned when neither
# exists.
def apply_process_limit_to_queue(queue_name)
  configured = @process_limits[queue_name.to_s] || @process_limits[queue_name.to_sym]
  Sidekiq::Queue[queue_name].process_limit = configured
end
apply_process_limit_to_queues()
click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 137
# Runs `apply_process_limit_to_queue` once for every distinct name in @queues.
def apply_process_limit_to_queues
  @queues.uniq.each { |name| apply_process_limit_to_queue(name) }
end
clear_limits_for_queue(queue_name)
click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 176
# Calls #clear_limits on the Sidekiq::Queue looked up by +queue_name+.
def clear_limits_for_queue(queue_name)
  Sidekiq::Queue[queue_name].clear_limits
end
restore()
click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 193
# Returns the list stashed via `save` (or an empty array when nothing is
# stashed) and always clears the stash afterwards, even if reading raises.
def restore
  stashed = saved
  stashed || []
ensure
  save(nil)
end
save(queues)
click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 189
# Stores +queues+ on Thread.current under THREAD_KEY and returns +queues+.
# Passing nil clears the slot.
def save(queues)
  current = Thread.current
  current[THREAD_KEY] = queues
end
saved()
click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 185
# Reads whatever is stored on Thread.current under THREAD_KEY
# (nil when nothing has been stored).
def saved
  Thread.current[THREAD_KEY]
end
selector()
click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 181
# The object used for acquiring and releasing queues:
# Sidekiq::LimitFetch::Global::Selector.
def selector
  Sidekiq::LimitFetch::Global::Selector
end