class Sidekiq::Monitor::Cleaner
Attributes
ignored_queues[R]
Public Class Methods
add_ignored_queue(queue)
click to toggle source
# File lib/sidekiq/monitor/cleaner.rb, line 9 def add_ignored_queue(queue) @ignored_queues << queue.to_s @ignored_queues.uniq! end
Public Instance Methods
clean()
click to toggle source
Cleans up records that are no longer in sync with Sidekiq’s records
# File lib/sidekiq/monitor/cleaner.rb, line 16 def clean clean_queued clean_running end
Private Instance Methods
clean_queued()
click to toggle source
# File lib/sidekiq/monitor/cleaner.rb, line 23 def clean_queued Sidekiq.redis do |conn| queues = conn.smembers('queues') queued_jids = [] queues.each do |queue| workers = conn.lrange("queue:#{queue}", 0, -1) workers.each do |worker| worker = Sidekiq.load_json(worker) queued_jids << worker['jid'] end end Sidekiq::Monitor::Job.where(status: 'queued').each do |job| if !queued_jids.include?(job.jid) && !is_ignored_queue?(job.queue) job.update_attributes( finished_at: DateTime.now, status: 'interrupted' ) end end end end
clean_running()
click to toggle source
# File lib/sidekiq/monitor/cleaner.rb, line 46 def clean_running Sidekiq.redis do |conn| workers = conn.smembers('workers') busy_jids = [] workers.each do |worker| worker = conn.get("worker:#{worker}") next if worker.blank? worker = Sidekiq.load_json(worker) busy_jids << worker['payload']['jid'] end Sidekiq::Monitor::Job.where(status: 'running').each do |job| if !busy_jids.include?(job.jid) && !is_ignored_queue?(job.queue) job.update_attributes( finished_at: DateTime.now, status: 'interrupted' ) end end end end
is_ignored_queue?(queue)
click to toggle source
# File lib/sidekiq/monitor/cleaner.rb, line 68 def is_ignored_queue?(queue) self.class.ignored_queues.include?(queue) end