class AtomicSidekiq::DeadJobCollector
Attributes
expire_op[R]
in_flight_keymaker[R]
queue[R]
recovered_stats[R]
Public Class Methods
collect!(queues, in_flight_keymaker:, skip_recovery_queues: [])
click to toggle source
# File lib/atomic_sidekiq/dead_job_collector.rb, line 4 def collect!(queues, in_flight_keymaker:, skip_recovery_queues: []) queues.each do |q| new(q, in_flight_keymaker: in_flight_keymaker) .collect!(skip_recovery: skip_recovery_queues.include?(q)) end end
new(queue, in_flight_keymaker:)
click to toggle source
# File lib/atomic_sidekiq/dead_job_collector.rb, line 12 def initialize(queue, in_flight_keymaker:) @recovered_stats = RecoveredStats.new @queue = queue @in_flight_keymaker = in_flight_keymaker @expire_op = AtomicOperation::Expire.new end
Public Instance Methods
collect!(skip_recovery: false)
click to toggle source
# File lib/atomic_sidekiq/dead_job_collector.rb, line 19 def collect!(skip_recovery: false) each_keys { |job_key| expire!(job_key, skip_recovery: skip_recovery) } end
Private Instance Methods
each_keys() { |job_key| ... }
click to toggle source
# File lib/atomic_sidekiq/dead_job_collector.rb, line 36 def each_keys it = 0 Sidekiq.redis do |conn| loop do it, job_keys = conn.scan(it, match: keys_prefix) it = it.to_i job_keys.each { |job_key| yield(job_key) } break if it.zero? end end end
expire!(job_key, skip_recovery:)
click to toggle source
# File lib/atomic_sidekiq/dead_job_collector.rb, line 27 def expire!(job_key, skip_recovery:) recovered = expire_op.perform(queue, job_key, recover: !skip_recovery) return if recovered.nil? job = JSON.parse(recovered[1]) recovered_stats.increment!(job) job end
keys_prefix()
click to toggle source
# File lib/atomic_sidekiq/dead_job_collector.rb, line 48 def keys_prefix in_flight_keymaker.queue_matcher(queue) end