class AtomicSidekiq::DeadJobCollector

Attributes

expire_op[R]
in_flight_keymaker[R]
queue[R]
recovered_stats[R]

Public Class Methods

collect!(queues, in_flight_keymaker:, skip_recovery_queues: [])
# File lib/atomic_sidekiq/dead_job_collector.rb, line 4
# Runs a collection pass over every queue in +queues+.
#
# A fresh collector is built per queue with the shared keymaker; recovery is
# skipped for any queue that appears in +skip_recovery_queues+.
#
# @param queues [Enumerable] queues to sweep, visited in iteration order
# @param in_flight_keymaker [Object] forwarded unchanged to each collector
# @param skip_recovery_queues [#include?] queues whose jobs must not be recovered
# @return [Enumerable] +queues+ itself (the result of +each+)
def collect!(queues, in_flight_keymaker:, skip_recovery_queues: [])
  queues.each do |queue_name|
    collector = new(queue_name, in_flight_keymaker: in_flight_keymaker)
    skip = skip_recovery_queues.include?(queue_name)
    collector.collect!(skip_recovery: skip)
  end
end
new(queue, in_flight_keymaker:)
# File lib/atomic_sidekiq/dead_job_collector.rb, line 12
# Sets up a collector bound to a single queue.
#
# Besides storing the arguments, it creates a new RecoveredStats and a new
# AtomicOperation::Expire for this instance.
#
# @param queue [Object] the queue this collector works on
# @param in_flight_keymaker [Object] object later asked for the key pattern
def initialize(queue, in_flight_keymaker:)
  @queue = queue
  @in_flight_keymaker = in_flight_keymaker

  # Collaborators owned by this instance.
  @recovered_stats = RecoveredStats.new
  @expire_op = AtomicOperation::Expire.new
end

Public Instance Methods

collect!(skip_recovery: false)
# File lib/atomic_sidekiq/dead_job_collector.rb, line 19
# Passes every key produced by +each_keys+ to +expire!+.
#
# @param skip_recovery [Boolean] forwarded to +expire!+ for each key
# @return [Object] whatever +each_keys+ returns
def collect!(skip_recovery: false)
  each_keys do |key|
    expire!(key, skip_recovery: skip_recovery)
  end
end

Private Instance Methods

each_keys() { |job_key| ... }
# File lib/atomic_sidekiq/dead_job_collector.rb, line 36
# Yields each key returned by scanning the connection with the +keys_prefix+
# match pattern.
#
# Scanning starts at cursor 0 and repeats with the cursor returned by the
# previous call until that cursor comes back as zero.
#
# @yieldparam job_key [Object] one key from the current scan page
# @return [Object] the value of the +Sidekiq.redis+ block
def each_keys
  cursor = 0
  Sidekiq.redis do |redis|
    loop do
      cursor, page = redis.scan(cursor, match: keys_prefix)
      cursor = cursor.to_i # normalise the returned cursor before comparing with zero
      page.each do |job_key|
        yield job_key
      end
      break if cursor.zero?
    end
  end
end
expire!(job_key, skip_recovery:)
# File lib/atomic_sidekiq/dead_job_collector.rb, line 27
# Runs the expire operation for one key and records the job it returned.
#
# +expire_op.perform+ is called with +recover+ set to the negation of
# +skip_recovery+. When it returns nil nothing else happens; otherwise the
# element at index 1 of the result is parsed as JSON and passed to
# +recovered_stats.increment!+.
#
# @param job_key [Object] key handed to the expire operation
# @param skip_recovery [Boolean] true to call the operation with recover: false
# @return [Object, nil] the parsed job, or nil when the operation returned nil
def expire!(job_key, skip_recovery:)
  outcome = expire_op.perform(queue, job_key, recover: !skip_recovery)

  unless outcome.nil?
    JSON.parse(outcome[1]).tap do |parsed_job|
      recovered_stats.increment!(parsed_job)
    end
  end
end
keys_prefix()
# File lib/atomic_sidekiq/dead_job_collector.rb, line 48
# Asks the in-flight keymaker for the matcher belonging to this collector's
# queue; +each_keys+ passes the result as the scan match pattern.
#
# @return [Object] whatever +in_flight_keymaker.queue_matcher+ returns
def keys_prefix
  keymaker = in_flight_keymaker
  keymaker.queue_matcher(queue)
end