module Sidekiq::LimitFetch::Queues

Constants

THREAD_KEY

Public Instance Methods

acquire() click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 21
# Asks the selector to acquire from the currently ordered queues, remembers
# the acquired names via #save, and returns them prefixed with "queue:".
def acquire
  acquired = selector.acquire(ordered_queues, namespace)
  save(acquired)
  acquired.map { |queue_name| "queue:#{queue_name}" }
end
add(queues) click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 40
# Registers each queue that is not already tracked in @queues.
# For every new queue the process limit and the limit are applied first,
# then the queue is appended to @queues. Returns the +queues+ argument.
def add(queues)
  queues.each do |queue|
    next if @queues.include?(queue)

    apply_process_limit_to_queue(queue)
    apply_limit_to_queue(queue)
    @queues.push(queue)
  end
end
dynamic?() click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 36
# Returns the value of the :dynamic option captured by #start
# (nil when #start has not stored one).
def dynamic?; @dynamic; end
namespace() click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 60
# Memoized key prefix derived from the Redis connection yielded by
# Sidekiq.redis: "<namespace>:" when the connection responds to #namespace
# and reports a truthy one, otherwise an empty string.
def namespace
  @namespace ||= Sidekiq.redis do |connection|
    namespaced = connection.respond_to?(:namespace) && connection.namespace
    namespaced ? "#{connection.namespace}:" : ''
  end
end
ordered_queues() click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 53
# Default ordering: hands back @queues as-is. #strict_order! and
# #weighted_order! replace this definition.
def ordered_queues
  @queues
end
release_except(full_name) click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 27
# Releases the queues remembered for the current thread, except the one
# named by +full_name+ (given in "queue:<name>" form). When +full_name+ is
# nil, every remembered queue is released.
def release_except(full_name)
  held = restore

  if full_name
    kept = full_name[/queue:(.*)/, 1]
    held.delete(kept)
  end

  Sidekiq::LimitFetch.redis_retryable { selector.release(held, namespace) }
end
start(options) click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 6
# Captures configuration from +options+ and applies it to the queues.
#
# Reads :queues, :dynamic, :limits, :process_limits, :blocking and :strict.
# Missing :limits / :process_limits default to {} and missing :blocking
# defaults to []. A truthy :strict selects strict ordering, otherwise
# weighted ordering is used.
def start(options)
  @queues  = options[:queues]
  @dynamic = options[:dynamic]

  @limits = options[:limits] || {}
  @process_limits = options[:process_limits] || {}
  @blocks = options[:blocking] || []

  if options[:strict]
    strict_order!
  else
    weighted_order!
  end

  apply_process_limit_to_queues
  apply_limit_to_queues
  apply_blocks_to_queues
end
strict_order!() click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 51
# Switches to strict ordering: removes duplicates from @queues in place and
# redefines #ordered_queues so it returns @queues unchanged.
def strict_order!
  @queues.uniq!

  def ordered_queues
    @queues
  end
end
weighted_order!() click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 56
# Switches to weighted ordering: redefines #ordered_queues so every call
# shuffles @queues and then drops duplicates. @queues itself is untouched.
def weighted_order!
  def ordered_queues
    @queues.shuffle.uniq
  end
end

Private Instance Methods

apply_blocks_to_queues() click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 97
# Unblocks every distinct queue in @queues, then applies the entries of
# @blocks: an Array entry calls #block_except(entry) on each queue it
# names, any other entry calls #block on the queue of that name.
def apply_blocks_to_queues
  @queues.uniq.each { |queue_name| Sidekiq::Queue[queue_name].unblock }

  @blocks.to_a.each do |entry|
    if entry.is_a?(Array)
      entry.each { |member| Sidekiq::Queue[member].block_except(entry) }
    else
      Sidekiq::Queue[entry].block
    end
  end
end
apply_limit_to_queue(queue_name) click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 89
# Sets the limit of the named queue from @limits, looking the name up
# first as a String key and then as a Symbol key. Does nothing when the
# queue reports limit_changed?.
def apply_limit_to_queue(queue_name)
  target = Sidekiq::Queue[queue_name]
  return if target.limit_changed?

  target.limit = @limits[queue_name.to_s] || @limits[queue_name.to_sym]
end
apply_limit_to_queues() click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 83
# Applies the configured limit once to each distinct queue in @queues.
def apply_limit_to_queues
  @queues.uniq.each { |name| apply_limit_to_queue(name) }
end
apply_process_limit_to_queue(queue_name) click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 78
# Sets the process limit of the named queue from @process_limits, looking
# the name up first as a String key and then as a Symbol key.
def apply_process_limit_to_queue(queue_name)
  target = Sidekiq::Queue[queue_name]
  configured = @process_limits[queue_name.to_s] || @process_limits[queue_name.to_sym]
  target.process_limit = configured
end
apply_process_limit_to_queues() click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 72
# Applies the configured process limit once to each distinct queue in @queues.
def apply_process_limit_to_queues
  @queues.uniq.each { |name| apply_process_limit_to_queue(name) }
end
restore() click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 119
# Returns whatever is stored under THREAD_KEY on the current thread (an
# empty array when nothing is stored) and always clears that slot afterwards.
def restore
  remembered = Thread.current[THREAD_KEY]
  remembered || []
ensure
  Thread.current[THREAD_KEY] = nil
end
save(queues) click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 115
# Stores +queues+ under THREAD_KEY on the current thread so that #restore
# can read it back later. Returns +queues+.
def save(queues)
  current = Thread.current
  current[THREAD_KEY] = queues
end
selector() click to toggle source
# File lib/sidekiq/limit_fetch/queues.rb, line 111
# The object used by this module for acquire/release calls.
def selector; Sidekiq::LimitFetch::Global::Selector; end