module Sidekiq::ResqueStatus
Constants
- VERSION
Public Instance Methods
enqueue_job(worker, msg, queue)
> This method is called by the client middleware before enqueuing a job.
> It stores information on the current job.
> This information is displayed by the resque web UI whenever a user browses:
- the statuses page (provided by resque-status)
- the job_stats page (provided by resque-job-stats)
    # File lib/sidekiq-resque_status.rb, line 19
    def enqueue_job(worker, msg, queue)
      now = Time.now.utc + 1
      # Resque Job Stats equivalent
      increment_stat("stats:jobs:#{worker.name}:enqueued", now)

      # Status set to queued
      status_hash = { :time => now.to_i, :class => worker.name, :retry => false, :name => "#{worker.name}(#{msg['args']})", :status => "queued", :uuid => msg['jid'], :args => msg['args']}
      update_status("status:#{msg['jid']}", "queued", status_hash)

      # Add the job id to the _statuses key
      redis.zadd("_statuses", now.to_i, msg['jid'])
    end
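To make the stored payload concrete, here is a minimal sketch that builds the same fields as the status hash in `enqueue_job` and serializes them. `HardWorker`, the `queued_status` helper and the message values are hypothetical, and the standard-library `JSON` module is used here only so the sketch runs without extra gems.

```ruby
require "json"

# Hypothetical worker class standing in for a real Sidekiq worker.
class HardWorker; end

# Illustrative helper (not part of the module): builds the same fields
# that enqueue_job puts into its status hash.
def queued_status(worker, msg, now)
  {
    :time   => now.to_i,
    :class  => worker.name,
    :retry  => false,
    :name   => "#{worker.name}(#{msg['args']})",
    :status => "queued",
    :uuid   => msg['jid'],
    :args   => msg['args']
  }
end

msg = { "jid" => "abc123", "args" => [1, "two"] } # illustrative values
now = Time.utc(2024, 1, 1, 12, 0, 0) + 1          # same one-second offset as the listing
status = queued_status(HardWorker, msg, now)

puts JSON.generate(status)
```

Note that `enqueue_job` calls `worker.name`, so this sketch passes the worker class itself rather than an instance.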
job_completed(worker, msg, queue, duration = 0)
> This method is called by the server middleware after processing a job.
> It adds a description of the processed jobs.
> It updates information on the current job.
> This information is displayed by the resque web UI whenever a user browses:
- the statuses page (provided by resque-status)
- the job_stats page (provided by resque-job-stats)
- the queues page (provided by resque-web)
    # File lib/sidekiq-resque_status.rb, line 59
    def job_completed(worker, msg, queue, duration = 0)
      status_hash = complete_options(worker.to_process || 1, worker.processed || 1, duration, worker.description)
      status_hash = set_missing_values(worker, msg, queue, status_hash)

      # Status set to completed
      hash = update_status("status:#{msg['jid']}", "completed", status_hash) || {}
      time = hash["time"] || hash["run_at"]
      now = time ? Time.at(time.to_i) : Time.now.utc

      # Resque job Stats equivalent
      increment_stat("stats:jobs:#{msg['jid']}:timeseries:performed", now)
      increment_stat("stats:jobs:#{worker.class.name}:performed", now)

      # Set duration
      redis.rpush("stats:jobs:#{worker.class.name}:duration", duration)
      redis.rpush("stats:jobs:#{msg['jid']}:duration", duration)

      # remove job from the queue tab
      redis.lpop("queue:#{queue}")
    end
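To illustrate where `job_completed` sits relative to the other server-side hooks in this module, the sketch below wires three hooks into a server-style middleware. Both classes are hypothetical: `RecordingStatus` only records which hook ran instead of writing to Redis, and `StatusServerMiddleware` is not the gem's actual middleware. Measuring `duration` in seconds with a monotonic clock is also an assumption.

```ruby
# Hypothetical recorder: logs which hook was called instead of writing to Redis.
class RecordingStatus
  attr_reader :calls

  def initialize
    @calls = []
  end

  def job_in_progress(worker, msg, queue)
    @calls << [:in_progress, msg['jid']]
  end

  def job_completed(worker, msg, queue, duration = 0)
    @calls << [:completed, msg['jid']]
  end

  def job_failed(worker, msg, queue, error)
    @calls << [:failed, msg['jid'], error.message]
  end
end

# Hypothetical middleware using the call(worker, msg, queue) { ... } shape
# of Sidekiq server middleware.
class StatusServerMiddleware
  def initialize(status)
    @status = status
  end

  def call(worker, msg, queue)
    @status.job_in_progress(worker, msg, queue)
    started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    yield
    duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
    @status.job_completed(worker, msg, queue, duration)
  rescue StandardError => e
    @status.job_failed(worker, msg, queue, e)
    raise # let the caller see the failure as well
  end
end

status = RecordingStatus.new
middleware = StatusServerMiddleware.new(status)
msg = { "jid" => "abc123", "args" => [] } # illustrative message

middleware.call(Object.new, msg, "default") { :work_done }
begin
  middleware.call(Object.new, msg, "default") { raise "boom" }
rescue RuntimeError
  # expected: the middleware re-raises after recording the failure
end

p status.calls
```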
job_failed(worker, msg, queue, error)
> This method is called by the server middleware each time a job fails.
> It adds a complete description of the failure.
> It updates information on the current job.
> It makes sure the job can be replayed.
> This information is displayed by the resque web UI whenever a user browses:
- the failed page (provided by resque-web)
    # File lib/sidekiq-resque_status.rb, line 88
    def job_failed(worker, msg, queue, error)
      hash = merge_value("status:#{msg['jid']}", {"status" => "failed", "message" => error.message})
      update_status("status:#{msg['jid']}", "failed", hash)

      # pass the jid into args hash to replay the job
      args = msg['args'].is_a?(Array) && msg['args'].first.is_a?(Hash) ? [msg['args'].first.merge({'jid' => msg['jid']})] : msg['args']
      failed_message = {
        :failed_at => Time.now.rfc2822,
        :payload => {"class" => worker.class.name, "args" => args},
        :class => worker.class.name,
        :exception => error.class.name,
        :error => error.message,
        :backtrace => error.backtrace,
        :worker => queue,
        :queue => queue,
        :args => args,
        :jid => msg['jid']
      }

      # Push the failed information into redis
      redis.rpush('failed', MultiJson.dump(failed_message))

      # Increment failed statistics for job Stats
      increment_stat("stats:jobs:#{worker.class.name}:failed", Time.now)
      increment_expire_key("stat:failed")
    end
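The replay step in `job_failed` depends on the shape of the job arguments. The sketch below isolates that ternary in a hypothetical helper, `replay_args`, with illustrative messages, to show both branches.

```ruby
# Illustrative helper (not part of the module) mirroring the ternary in
# job_failed: when the first argument is a Hash, fold the jid into it;
# otherwise return the arguments unchanged.
def replay_args(msg)
  args = msg['args']
  if args.is_a?(Array) && args.first.is_a?(Hash)
    [args.first.merge('jid' => msg['jid'])]
  else
    args
  end
end

hash_first  = replay_args({ "jid" => "abc123", "args" => [{ "user_id" => 7 }, "extra"] })
plain_first = replay_args({ "jid" => "abc123", "args" => [7, "extra"] })

p hash_first
p plain_first
```

In the Hash branch only the first argument is kept, so any later arguments are not carried into the failed payload. In the other branch the jid is not embedded in the arguments at all.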
job_in_progress(worker, msg, queue)
> This method is called by the server middleware before processing a job.
> It makes sure we do not lose any information at the beginning of the process.
> It updates information on the current job.
> This information is displayed by the resque web UI whenever a user browses:
- the statuses page (provided by resque-status)
- the job_stats page (provided by resque-job-stats)
    # File lib/sidekiq-resque_status.rb, line 41
    def job_in_progress(worker, msg, queue)
      # When resque web re-enqueues a job we need to make sure worker.jid and msg[jid] are defined
      worker.jid ||= msg['jid'] ||= msg['args'].first['jid'] if msg['args'] && msg['args'].is_a?(Array)

      # Set status to working
      status_hash = set_missing_values(worker, msg, queue)
      status_hash = update_status("status:#{msg['jid']}", "working", status_hash)
    end
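The chained `||=` in `job_in_progress` is easy to misread, so here is a small sketch of how the jid is recovered for a replayed job. `FakeWorker` and `recover_jid` are hypothetical, and the guard is simplified to `is_a?(Array)`, which is equivalent to the original condition.

```ruby
# Hypothetical worker stand-in exposing only the jid accessor used here.
FakeWorker = Struct.new(:jid)

# Illustrative helper: same assignment chain as job_in_progress.
# The chain is right-associative, so msg['jid'] is filled in first (if
# missing) and its value is then used for worker.jid (if missing).
def recover_jid(worker, msg)
  if msg['args'].is_a?(Array)
    worker.jid ||= msg['jid'] ||= msg['args'].first['jid']
  end
  worker.jid
end

# Replayed job: the jid only survives inside the first argument.
replayed = { "args" => [{ "user_id" => 7, "jid" => "abc123" }] }
replayed_worker = FakeWorker.new(nil)
recover_jid(replayed_worker, replayed)

# Regular job: msg['jid'] is present, so the arguments are never inspected.
regular = { "jid" => "xyz789", "args" => [42] }
regular_worker = FakeWorker.new(nil)
recover_jid(regular_worker, regular)

puts replayed_worker.jid
puts replayed["jid"]
puts regular_worker.jid
```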
Private Instance Methods
complete_options(total_to_process, total_processed, duration = 0, message = nil)
> Returns a hash containing some statistics about the processed job.
    # File lib/sidekiq-resque_status.rb, line 127
    def complete_options(total_to_process, total_processed, duration = 0, message = nil)
      average = (duration/total_processed rescue 0).round(1)
      time = duration.round(1)
      message ||= "processed #{total_processed} in A: #{average} T: #{time}"
      {"status" => "completed", :total => total_to_process, :num => total_processed, :message => message}
    end
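A worked example may help in reading the generated message. The method body below is copied from the listing so that it runs standalone, and the input values are illustrative.

```ruby
# Copied from the listing above so the example is self-contained.
def complete_options(total_to_process, total_processed, duration = 0, message = nil)
  average = (duration/total_processed rescue 0).round(1)
  time = duration.round(1)
  message ||= "processed #{total_processed} in A: #{average} T: #{time}"
  {"status" => "completed", :total => total_to_process, :num => total_processed, :message => message}
end

float_run = complete_options(10, 4, 6.0) # float duration: average is 6.0 / 4
int_run   = complete_options(10, 3, 7)   # integer duration: 7 / 3 is integer division
zero_run  = complete_options(10, 0, 6)   # integer division by zero is rescued to 0

puts float_run[:message]
puts int_run[:message]
puts zero_run[:message]
```

Two details are worth noting: the returned hash mixes a string key (`"status"`) with symbol keys, and an integer `duration` makes the average an integer division.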
increment_expire_key(key, duration = nil)
increment_stat(key, now)
> Builds hourly, daily and global statistics that will be used by resque-job-stats.
    # File lib/sidekiq-resque_status.rb, line 146
    def increment_stat(key, now)
      # Increment global stats
      increment_expire_key(key)
      # Increment hourly stats
      increment_expire_key(key + ":#{now.hour}:#{now.min}:#{now.sec}", 3660)
      # Increment daily stats
      increment_expire_key(key + ":#{now.hour}:#{now.min}", 900000)
    end
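The key names produced by `increment_stat` can be previewed without Redis. The sketch below uses a hypothetical helper, `stat_keys`, that returns each key together with the second argument passed to `increment_expire_key`. Treating that argument as an expiry in seconds is an assumption, since the body of `increment_expire_key` is not shown here.

```ruby
# Illustrative helper (not part of the module): lists the keys that
# increment_stat would touch, paired with the value passed as `duration`.
def stat_keys(key, now)
  [
    [key, nil],                                              # global counter
    [key + ":#{now.hour}:#{now.min}:#{now.sec}", 3660],      # per-second key
    [key + ":#{now.hour}:#{now.min}", 900000]                # per-minute key
  ]
end

now = Time.utc(2024, 1, 1, 9, 5, 3) # illustrative timestamp
keys = stat_keys("stats:jobs:HardWorker:performed", now)

keys.each { |name, duration| puts "#{name} (duration: #{duration.inspect})" }
```

The time components are interpolated without zero padding, so 09:05:03 yields the suffix `:9:5:3`.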
merge_value(key,hash)
redis()
set_missing_values(worker, msg, queue, status_hash = {})
> Ensures we always keep track of the important information concerning the worker.
    # File lib/sidekiq-resque_status.rb, line 180
    def set_missing_values(worker, msg, queue, status_hash = {})
      status_hash['jid'] = msg['jid'] if status_hash['jid'].nil?
      status_hash['queue'] = queue if status_hash['queue'].nil?
      status_hash['class'] = worker.class.name if status_hash['class'].nil?
      status_hash
    end
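As a quick illustration of the fill-in behaviour, the sketch below runs `set_missing_values` (copied from the listing) against a hash that already has a queue. `HardWorker` and the message are hypothetical.

```ruby
# Hypothetical worker class; only its class name is used.
class HardWorker; end

# Copied from the listing above so the example is self-contained.
def set_missing_values(worker, msg, queue, status_hash = {})
  status_hash['jid'] = msg['jid'] if status_hash['jid'].nil?
  status_hash['queue'] = queue if status_hash['queue'].nil?
  status_hash['class'] = worker.class.name if status_hash['class'].nil?
  status_hash
end

msg = { "jid" => "abc123", "args" => [] }    # illustrative message
existing = { "queue" => "critical" }         # value that must not be overwritten
filled = set_missing_values(HardWorker.new, msg, "default", existing)

p filled
```

Only string keys are checked, and the hash passed in is modified in place and returned.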
update_status(key, status, hash = nil)
> Updates the status of a job and adds more information to it if requested.
    # File lib/sidekiq-resque_status.rb, line 160
    def update_status(key, status, hash = nil)
      status_hash = merge_value(key, {"status" => status}) || {}
      status_hash.merge!(hash) if hash
      redis.set(key, MultiJson.dump(status_hash))
      redis.expire(key, 260000)
      status_hash
    end
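The way successive calls accumulate fields can be sketched with an in-memory stand-in for Redis. Several parts are assumptions rather than the module's actual code: `FakeRedis` and `StatusStore` are hypothetical, the body of `merge_value` is not shown in this page and is guessed here as "load the stored JSON and merge the new values into it", and the standard-library `JSON` module replaces `MultiJson` so the sketch has no gem dependencies.

```ruby
require "json"

# Hypothetical in-memory stand-in for the Redis client.
class FakeRedis
  attr_reader :ttl

  def initialize
    @data = {}
    @ttl = {}
  end

  def get(key)
    @data[key]
  end

  def set(key, value)
    @data[key] = value
  end

  def expire(key, seconds)
    @ttl[key] = seconds
  end
end

# Hypothetical wrapper holding the two helpers.
class StatusStore
  attr_reader :redis

  def initialize(redis)
    @redis = redis
  end

  # ASSUMPTION: guessed behaviour, the real merge_value is not shown.
  def merge_value(key, hash)
    stored = redis.get(key)
    current = stored ? JSON.parse(stored) : {}
    current.merge(hash)
  end

  # Same steps as the listing, with JSON.generate in place of MultiJson.dump.
  def update_status(key, status, hash = nil)
    status_hash = merge_value(key, {"status" => status}) || {}
    status_hash.merge!(hash) if hash
    redis.set(key, JSON.generate(status_hash))
    redis.expire(key, 260000)
    status_hash
  end
end

store = StatusStore.new(FakeRedis.new)
store.update_status("status:abc123", "queued", { "class" => "HardWorker" })
result = store.update_status("status:abc123", "working", { "queue" => "default" })

p result
p store.redis.ttl["status:abc123"]
```

Under these assumptions, the second call keeps the `class` field written by the first one and only replaces `status`.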