class Sidekiq::Monitor::Processor
Public Instance Methods
complete(worker, item, queue, return_value)
click to toggle source
# File lib/sidekiq/monitor/processor.rb, line 22 def complete(worker, item, queue, return_value) job = find_or_initialize_job(worker, item, queue) job.update_attributes( finished_at: DateTime.now, status: 'complete', result: (return_value if return_value.is_a?(Hash)) ) end
error(worker, item, queue, exception)
click to toggle source
# File lib/sidekiq/monitor/processor.rb, line 17 def error(worker, item, queue, exception) job = find_or_initialize_job(worker, item, queue) set_error(job, exception) end
queue(worker_class, item, queue)
click to toggle source
# File lib/sidekiq/monitor/processor.rb, line 4 def queue(worker_class, item, queue) job = find_or_initialize_job(worker_class, item, queue) job.save end
start(worker, item, queue)
click to toggle source
# File lib/sidekiq/monitor/processor.rb, line 9 def start(worker, item, queue) job = find_or_initialize_job(worker, item, queue) job.update_attributes( started_at: DateTime.now, status: 'running' ) end
Protected Instance Methods
find_or_initialize_job(worker, item, queue, options={})
click to toggle source
# File lib/sidekiq/monitor/processor.rb, line 33 def find_or_initialize_job(worker, item, queue, options={}) defaults = { set_name: true } options.reverse_merge!(defaults) worker_class = nil if worker.is_a?(String) worker_class = worker.constantize elsif worker.is_a?(Class) worker_class = worker else worker_class = worker.class end job = Sidekiq::Monitor::Job.find_by_jid(item['jid']) if job.blank? attributes = { jid: item['jid'], queue: queue, class_name: worker_class.name, args: item['args'], retry: item['retry'], enqueued_at: DateTime.now, status: 'queued' } if options[:set_name] == true attributes[:name] = job_name(worker_class, item, queue) end job = Sidekiq::Monitor::Job.new(attributes) end job end
job_name(worker_class, item, queue)
click to toggle source
# File lib/sidekiq/monitor/processor.rb, line 67 def job_name(worker_class, item, queue) args = item['args'] begin worker_class.respond_to?(:job_name) ? worker_class.job_name(*args) : nil rescue Exception => exception # If the job doesn't exist yet, we'll need to create it job = find_or_initialize_job(worker_class, item, queue, set_name: false) set_error(job, exception) raise exception end end
set_error(job, exception)
click to toggle source
# File lib/sidekiq/monitor/processor.rb, line 79 def set_error(job, exception) result = job.result.present? ? job.result.symbolize_keys : {} result.merge!({ message: "#{exception.class.name}: #{exception.message}", backtrace: exception.backtrace }) job.update_attributes( finished_at: DateTime.now, status: 'failed', result: result ) end