module Sidekiq::LimitFetch

Constants

RedisBaseConnectionError
RedisCommandError
TIMEOUT

Public Instance Methods

bulk_requeue(*args) click to toggle source

Backwards compatibility for sidekiq v6.1.0 @see github.com/mperham/sidekiq/pull/4602

# File lib/sidekiq/limit_fetch.rb, line 52
def bulk_requeue(*args)
  if Sidekiq::BasicFetch.respond_to?(:bulk_requeue) # < 6.1.0
    Sidekiq::BasicFetch.bulk_requeue(*args)
  else # 6.1.0+
    Sidekiq::BasicFetch.new(post_7? ? Sidekiq.default_configuration.default_capsule : config).bulk_requeue(*args)
  end
end
config() click to toggle source
# File lib/sidekiq/limit_fetch.rb, line 45
def config
  # Post 6.5, Sidekiq.options is deprecated and replaced with passing Sidekiq directly
  post_6_5? ? Sidekiq : Sidekiq.options
end
new(_) click to toggle source
# File lib/sidekiq/limit_fetch.rb, line 35
def new(_)
  self
end
post_6_5?() click to toggle source
# File lib/sidekiq/limit_fetch.rb, line 28
def post_6_5?
  @post_6_5 ||= Gem::Version.new(Sidekiq::VERSION) >= Gem::Version.new('6.5.0')
end
post_7?() click to toggle source
# File lib/sidekiq/limit_fetch.rb, line 24
def post_7?
  @post_7 ||= Gem::Version.new(Sidekiq::VERSION) >= Gem::Version.new('7.0.0')
end
redis_retryable() { || ... } click to toggle source
# File lib/sidekiq/limit_fetch.rb, line 60
def redis_retryable
  yield
rescue RedisBaseConnectionError
  sleep TIMEOUT
  retry
rescue RedisCommandError => e
  # If Redis was restarted and is still loading its snapshot,
  # then we should treat this as a temporary connection error too.
  raise unless e.message =~ /^LOADING/

  sleep TIMEOUT
  retry
end
retrieve_work() click to toggle source
# File lib/sidekiq/limit_fetch.rb, line 39
def retrieve_work
  queue, job = redis_brpop(Queues.acquire)
  Queues.release_except(queue)
  UnitOfWork.new(queue, job) if job
end

Private Instance Methods

redis_brpop(queues) click to toggle source

rubocop:disable Metrics/MethodLength

# File lib/sidekiq/limit_fetch.rb, line 77
def redis_brpop(queues)
  if queues.empty?
    sleep TIMEOUT  # there are no queues to handle, so lets sleep
    []             # and return nothing
  else
    redis_retryable do
      Sidekiq.redis do |it|
        if post_7?
          it.blocking_call(false, 'brpop', *queues, TIMEOUT)
        else
          it.brpop(*queues, timeout: TIMEOUT)
        end
      end
    end
  end
end