module Sidekiq::LimitFetch::Queues

Constants

THREAD_KEY

Public Instance Methods

acquire() click to toggle source

rubocop:enable Metrics/AbcSize rubocop:enable Metrics/CyclomaticComplexity rubocop:enable Metrics/MethodLength rubocop:enable Metrics/PerceivedComplexity

# File lib/sidekiq/limit_fetch/queues.rb, line 49
# Returns the acquired queue names, each prefixed with "queue:".
#
# A list already held by +saved+ is reused; otherwise a new one is requested
# from +selector.acquire+ inside +Sidekiq::LimitFetch.redis_retryable+.
# Whichever list is used is handed to +save+ before the prefixed copy is
# returned.
#
# @return [Array<String>] names of the form "queue:<name>"
def acquire
  acquired = saved || Sidekiq::LimitFetch.redis_retryable do
    selector.acquire(ordered_queues, namespace)
  end

  save acquired
  acquired.map { |name| "queue:#{name}" }
end
add(queues) click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 78
# Appends every entry of +queues+ that +@queues+ does not yet contain.
#
# Before an entry is appended, if +startup_queue?+ is true for it, its
# process limit and then its limit are applied.
#
# @param queues [Array, nil] queue names to add; a falsy value is a no-op
# @return [Array, nil] +queues+ itself, or +nil+ when +queues+ is falsy
def add(queues)
  return unless queues

  queues.each do |name|
    unless @queues.include?(name)
      if startup_queue?(name)
        apply_process_limit_to_queue(name)
        apply_limit_to_queue(name)
      end

      @queues << name
    end
  end
end
dynamic?() click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 66
# Returns the current value of +@dynamic+ as stored (it is not coerced to
# +true+/+false+). +start+ sets it to +true+ when +config[:dynamic]+ is a
# Hash, and to +config[:dynamic]+ itself otherwise.
def dynamic?
  @dynamic
end
dynamic_exclude() click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 74
# Returns the current value of +@dynamic_exclude+. +start+ sets it to
# +config[:dynamic][:exclude]+ (or +[]+ when that is falsy) if
# +config[:dynamic]+ is a Hash, and to +[]+ otherwise.
def dynamic_exclude
  @dynamic_exclude
end
handle(queues) click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 105
# Reconciles +@queues+ with +queues+.
#
# Entries of +queues+ that are missing from +@queues+ are passed to +add+;
# afterwards, entries of +@queues+ that are missing from +queues+ are passed
# to +remove+.
#
# @param queues [Array] the queue names that should remain
# @return the result of the +remove+ call
def handle(queues)
  missing = queues - @queues
  add(missing)

  # Computed after +add+ on purpose: +add+ mutates +@queues+.
  stale = @queues - queues
  remove(stale)
end
namespace() click to toggle source

rubocop:enable Lint/NestedMethodDefinition

# File lib/sidekiq/limit_fetch/queues.rb, line 125
# Returns the memoized key prefix derived from the Redis connection.
#
# On first use the connection yielded by +Sidekiq.redis+ is inspected: when
# it responds to +namespace+ and that value is truthy the prefix is
# "<namespace>:", otherwise it is the empty string. The result is cached in
# +@namespace+.
#
# @return [String]
def namespace
  return @namespace if @namespace

  @namespace = Sidekiq.redis do |conn|
    namespaced = conn.respond_to?(:namespace) && conn.namespace
    namespaced ? "#{conn.namespace}:" : ''
  end
end
ordered_queues() click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 113
# Returns +@queues+ as-is.
#
# NOTE(review): +strict_order!+ and +weighted_order!+ each contain a nested
# +def ordered_queues+, so the implementation in effect at runtime depends on
# which of them ran last.
def ordered_queues
  @queues
end
release_except(full_name) click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 58
# Releases the queues obtained from +restore+, optionally sparing one.
#
# When +full_name+ is given, the name captured by +/queue:(.*)/+ is removed
# from the list first. The remaining names and +namespace+ are passed to
# +selector.release+ inside +Sidekiq::LimitFetch.redis_retryable+.
#
# @param full_name [String, nil] a "queue:<name>" string to keep, or +nil+
# @return the result of +Sidekiq::LimitFetch.redis_retryable+
def release_except(full_name)
  held = restore

  if full_name
    spared = full_name[/queue:(.*)/, 1]
    held.delete(spared)
  end

  Sidekiq::LimitFetch.redis_retryable { selector.release(held, namespace) }
end
remove(queues) click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 93
# Drops every entry of +queues+ that +@queues+ currently contains.
#
# For each such entry the limits are cleared via +clear_limits_for_queue+,
# the name is deleted from +@queues+, and +Sidekiq::Queue.delete_instance+
# is called with it. Names not present in +@queues+ are skipped.
#
# @param queues [Array, nil] queue names to remove; a falsy value is a no-op
# @return [Array, nil] +queues+ itself, or +nil+ when +queues+ is falsy
def remove(queues)
  return unless queues

  queues.each do |name|
    if @queues.include?(name)
      clear_limits_for_queue(name)
      @queues.delete(name)
      Sidekiq::Queue.delete_instance(name)
    end
  end
end
start(capsule_or_options) click to toggle source

rubocop:disable Metrics/AbcSize rubocop:disable Metrics/CyclomaticComplexity rubocop:disable Metrics/MethodLength rubocop:disable Metrics/PerceivedComplexity

# File lib/sidekiq/limit_fetch/queues.rb, line 14
# Initializes queue state from the given configuration and applies limits.
#
# When +Sidekiq::LimitFetch.post_7?+ is true the argument is asked for its
# +config+; otherwise the argument itself is used as the config. The config
# is read with +[]+ for +:queues+, +:dynamic+, +:limits+, +:process_limits+,
# +:blocking+ and +:strict+.
#
# @param capsule_or_options [#config, #[]] a capsule-like object or an
#   options hash, depending on +post_7?+
# @return the result of +apply_blocks_to_queues+
def start(capsule_or_options)
  config = capsule_or_options
  config = capsule_or_options.config if Sidekiq::LimitFetch.post_7?

  # An Array entry contributes only its first element as the queue name.
  @queues = config[:queues].map { |entry| entry.is_a?(Array) ? entry.first : entry }.uniq
  @startup_queues = @queues.dup

  dynamic = config[:dynamic]
  if dynamic.is_a?(Hash)
    @dynamic = true
    @dynamic_exclude = dynamic[:exclude] || []
  else
    @dynamic = dynamic
    @dynamic_exclude = []
  end

  @limits = config[:limits] || {}
  @process_limits = config[:process_limits] || {}
  @blocks = config[:blocking] || []

  if config[:strict]
    strict_order!
  else
    weighted_order!
  end

  apply_process_limit_to_queues
  apply_limit_to_queues
  apply_blocks_to_queues
end
startup_queue?(queue) click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 70
# Reports whether +queue+ is contained in +@startup_queues+, the copy of
# +@queues+ that +start+ takes right after building the queue list.
#
# @param queue [Object] a queue name
# @return [Boolean]
def startup_queue?(queue)
  @startup_queues.include?(queue)
end
strict_order!() click to toggle source

rubocop:disable Lint/NestedMethodDefinition

# File lib/sidekiq/limit_fetch/queues.rb, line 111
# Removes duplicate entries from +@queues+ in place and then (re)defines
# +ordered_queues+ so that it returns +@queues+ unchanged.
#
# NOTE(review): a nested +def+ is not scoped to a single receiver, so the
# most recent call to +strict_order!+ or +weighted_order!+ presumably decides
# the behavior of +ordered_queues+ for every user of this module — confirm
# that is intended.
def strict_order!
  @queues.uniq!
  # This definition is executed every time strict_order! runs; being the last
  # expression, its value is also what strict_order! returns.
  def ordered_queues
    @queues
  end
end
weighted_order!() click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 118
# (Re)defines +ordered_queues+ so that each call returns a freshly shuffled
# copy of +@queues+ with duplicates removed.
#
# NOTE(review): a nested +def+ is not scoped to a single receiver, so the
# most recent call to +strict_order!+ or +weighted_order!+ presumably decides
# the behavior of +ordered_queues+ for every user of this module — confirm
# that is intended.
def weighted_order!
  # This definition is executed every time weighted_order! runs; being the
  # last expression, its value is also what weighted_order! returns.
  def ordered_queues
    # +uniq+ keeps the first occurrence of each name in the shuffled order.
    @queues.shuffle.uniq
  end
end

Private Instance Methods

apply_blocks_to_queues() click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 162
# Resets and re-applies blocking on the queues.
#
# First every distinct name in +@queues+ is unblocked. Then each entry of
# +@blocks+ is applied: an Array entry calls +block_except+ with the whole
# array on each of its members; any other entry calls +block+ on that queue.
#
# @return [Array] +@blocks.to_a+
def apply_blocks_to_queues
  @queues.uniq.each { |name| Sidekiq::Queue[name].unblock }

  @blocks.to_a.each do |entry|
    case entry
    when Array
      entry.each { |name| Sidekiq::Queue[name].block_except(entry) }
    else
      Sidekiq::Queue[entry].block
    end
  end
end
apply_limit_to_queue(queue_name) click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 154
# Assigns the configured limit to a single queue unless it reports
# +limit_changed?+.
#
# The limit is looked up in +@limits+ under the string form of +queue_name+
# first and under its symbol form second; +nil+ is assigned when neither key
# yields a truthy value.
#
# @param queue_name [#to_s, #to_sym] the queue name
# @return the assigned limit, or +nil+ when the queue was left untouched
def apply_limit_to_queue(queue_name)
  target = Sidekiq::Queue[queue_name]

  unless target.limit_changed?
    configured = @limits[queue_name.to_s] || @limits[queue_name.to_sym]
    target.limit = configured
  end
end
apply_limit_to_queues() click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 148
# Calls +apply_limit_to_queue+ once for every distinct name in +@queues+.
#
# @return [Array] the de-duplicated copy of +@queues+ that was iterated
def apply_limit_to_queues
  @queues.uniq.each { |name| apply_limit_to_queue(name) }
end
apply_process_limit_to_queue(queue_name) click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 143
# Assigns the configured process limit to a single queue.
#
# The value is looked up in +@process_limits+ under the string form of
# +queue_name+ first and under its symbol form second; +nil+ is assigned when
# neither key yields a truthy value.
#
# @param queue_name [#to_s, #to_sym] the queue name
# @return the assigned process limit
def apply_process_limit_to_queue(queue_name)
  Sidekiq::Queue[queue_name].process_limit =
    @process_limits[queue_name.to_s] || @process_limits[queue_name.to_sym]
end
apply_process_limit_to_queues() click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 137
# Calls +apply_process_limit_to_queue+ once for every distinct name in
# +@queues+.
#
# @return [Array] the de-duplicated copy of +@queues+ that was iterated
def apply_process_limit_to_queues
  @queues.uniq.each { |name| apply_process_limit_to_queue(name) }
end
clear_limits_for_queue(queue_name) click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 176
# Calls +clear_limits+ on the +Sidekiq::Queue+ looked up by +queue_name+.
#
# @param queue_name [Object] the queue name
# @return the result of +clear_limits+
def clear_limits_for_queue(queue_name)
  Sidekiq::Queue[queue_name].clear_limits
end
restore() click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 193
# Returns the list held by +saved+ (or an empty array when there is none)
# and resets the stored value to +nil+.
#
# The reset lives in +ensure+, so it also runs if reading the stored value
# raises.
#
# @return [Array]
def restore
  previous = saved
  previous || []
ensure
  save nil
end
save(queues) click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 189
# Stores +queues+ under +THREAD_KEY+ in +Thread.current+.
#
# NOTE(review): +Thread#[]=+ storage is fiber-local in Ruby, so the value is
# visible only to the current fiber of the current thread — confirm that is
# what callers expect.
#
# @param queues [Array, nil] the list to remember, or +nil+ to clear it
# @return +queues+
def save(queues)
  Thread.current[THREAD_KEY] = queues
end
saved() click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 185
# Reads the value stored under +THREAD_KEY+ in +Thread.current+, i.e. what
# the last +save+ call on this thread (fiber-local storage) wrote, or +nil+
# when nothing has been stored.
def saved
  Thread.current[THREAD_KEY]
end
selector() click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 181
# Returns +Sidekiq::LimitFetch::Global::Selector+, the object whose
# +acquire+ and +release+ methods are called by +acquire+ and
# +release_except+.
def selector
  Sidekiq::LimitFetch::Global::Selector
end