class Qless::Workers::BaseWorker
Attributes
interval[RW]
options[RW]
output[RW]
paused[RW]
reserver[RW]
sighup_handler[RW]
Public Class Methods
new(reserver, options = {})
click to toggle source
# File lib/qless/worker/base.rb, line 20
# Build a worker around a job reserver.
#
# Options read here (all optional):
#   :sighup_handler - callable stored as the SIGHUP handler (default: a
#                     lambda that does nothing)
#   :logger         - logger used as-is; when absent, a Logger is built from
#                     :output (default $stdout), :log_level (default
#                     Logger::WARN) and :log_formatter
#   :interval       - stored as the worker's interval (default 5.0)
def initialize(reserver, options = {})
  @reserver = reserver
  @options  = options

  @sighup_handler = options.fetch(:sighup_handler) { -> {} }

  # A logger (and @output) is only set up when the caller did not pass one.
  @log = options.fetch(:logger) do
    @output = options.fetch(:output, $stdout)
    built = Logger.new(output)
    built.level = options.fetch(:log_level, Logger::WARN)
    built.formatter = options.fetch(:log_formatter) do
      proc { |_severity, datetime, _progname, msg| "#{datetime}: #{msg}\n" }
    end
    built
  end

  # The interval for checking for new jobs
  @interval = options.fetch(:interval, 5.0)

  @current_job_mutex = Mutex.new
  @current_job = nil

  # Default reaction to a lost lock: stop after the current job.
  on_current_job_lock_lost { shutdown(false) }
end
Public Instance Methods
deregister()
click to toggle source
# File lib/qless/worker/base.rb, line 188
# Ask each distinct client to deregister the worker known under that
# client's own worker name.
def deregister
  uniq_clients.each { |c| c.deregister_workers(c.worker_name) }
end
fail_job(job, error, worker_backtrace)
click to toggle source
# File lib/qless/worker/base.rb, line 178
# Format the error through Qless.failure_formatter, log it, and mark the job
# as failed.
#
# job              - the job that raised
# error            - the exception that was raised
# worker_backtrace - backtrace handed to the failure formatter
def fail_job(job, error, worker_backtrace)
  failure = Qless.failure_formatter.format(job, error, worker_backtrace)
  details = "Got #{failure.group} failure from #{job.inspect}\n#{failure.message}"
  log(:error, details)
  job.fail(*failure)
rescue Job::CantFailError => e
  # Nothing more can be done at this point (the job may have been cancelled
  # by another worker, or may not be owned by us), so just record it.
  log(:error, "Failed to fail #{job.inspect}: #{e.message}")
end
jobs()
click to toggle source
Return an enumerator to each of the jobs provided by the reserver
# File lib/qless/worker/base.rb, line 84
# Return an Enumerator that yields each job handed out by the reserver.
#
# The loop keeps asking the reserver for work until @shutdown is set. When no
# job comes back (including when reserving raised), no_job_available is
# called instead of yielding.
def jobs
  Enumerator.new do |enum|
    loop do
      begin
        job = reserver.reserve
      rescue Exception => error
        # Deliberately broad: workers should durably stay up, so errors while
        # reserving (e.g. network timeouts) are logged rather than allowed to
        # kill the worker.
        log(:error, "Error reserving job: #{error.class}: #{error.message}")
      end

      if job.nil?
        no_job_available
      else
        self.current_job = job
        begin
          enum.yield(job)
        ensure
          # Always clear the current job, even when the consumer raises or
          # breaks out of the enumeration, so no stale job is left recorded.
          self.current_job = nil
        end
      end

      break if @shutdown
    end
  end
end
listen_for_lost_lock() { || ... }
click to toggle source
# File lib/qless/worker/base.rb, line 202
# Start one Subscriber per distinct client on that client's
# "ql:w:<worker_name>" channel, run the given block, and stop every
# subscriber that was started once the block finishes or raises.
#
# When a received message has event 'lock_lost' and its jid matches the job
# currently being worked on, the handler stored by on_current_job_lock_lost
# is called with that job.
#
# Returns the value of the block.
def listen_for_lost_lock
  # Collected incrementally so that, if starting a later subscriber raises,
  # the ones already started are still stopped and the original error is not
  # masked by calling #each on nil in the ensure clause.
  subscribers = []

  uniq_clients.each do |client|
    channel = "ql:w:#{client.worker_name}"
    subscriber = Subscriber.start(client, channel, log: @log) do |_, message|
      next unless message['event'] == 'lock_lost'

      with_current_job do |job|
        @on_current_job_lock_lost.call(job) if job && message['jid'] == job.jid
      end
    end
    subscribers << subscriber
  end

  yield
ensure
  subscribers.each(&:stop)
end
log_level()
click to toggle source
# File lib/qless/worker/base.rb, line 48
# Report the level currently set on this worker's logger.
def log_level
  @log.level
end
on_current_job_lock_lost(&block)
click to toggle source
# File lib/qless/worker/base.rb, line 198
# Store the given block as the handler for a lost lock on the current job,
# replacing any handler stored earlier.
def on_current_job_lock_lost(&handler)
  @on_current_job_lock_lost = handler
end
pause(in_signal_handler=true)
click to toggle source
Pause the worker – take no more new jobs
# File lib/qless/worker/base.rb, line 147
# Mark the worker as paused and update the procline to say so.
#
# in_signal_handler - forwarded to procline unchanged.
def pause(in_signal_handler=true)
  @paused = true
  status = "Paused -- #{reserver.description}"
  procline(status, in_signal_handler)
end
perform(job)
click to toggle source
Actually perform the job
# File lib/qless/worker/base.rb, line 112
# Run the job through around_perform and settle its outcome:
#   - JobLockLost is logged as a warning and nothing else is done;
#   - any other exception fails the job via fail_job;
#   - otherwise the job is completed via try_complete.
# In every case the elapsed time is logged at info level.
def perform(job)
  # Durations use the monotonic clock so that wall-clock adjustments during
  # the job cannot produce negative or inflated timings.
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  around_perform(job)
rescue JobLockLost
  log(:warn, "Lost lock for job #{job.jid}")
rescue Exception => error
  # Deliberately broad: whatever the job raised is recorded as a job failure.
  fail_job(job, error, caller)
else
  try_complete(job)
ensure
  elapsed_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
  log(:info, "Job #{job.description} took #{elapsed_time} seconds")
end
procline(value, in_signal_handler=true)
click to toggle source
Set the procline. Not supported on all systems
# File lib/qless/worker/base.rb, line 158
# Set the process name ($0) to a Qless status string containing the given
# value and the current time. Not supported on all systems.
#
# in_signal_handler - when true, the new procline is not logged.
def procline(value, in_signal_handler=true)
  $0 = "Qless-#{Qless::VERSION}: #{value} at #{Time.now.iso8601}"
  return if in_signal_handler

  log(:debug, $PROGRAM_NAME)
end
register_signal_handlers()
click to toggle source
The meaning of these signals is meant to closely mirror resque
TERM: Shutdown immediately, stop processing jobs.
INT: Shutdown immediately, stop processing jobs.
QUIT: Shutdown after the current job has finished processing.
USR1: Kill the forked children immediately, continue processing jobs.
USR2: Pause after this job.
CONT: Start processing jobs again after a USR2.
HUP: Print current stack to log and continue
# File lib/qless/worker/base.rb, line 69
# Install this worker's signal handlers:
#   TERM, INT - exit! straight away
#   HUP       - call the configured sighup_handler
#   QUIT      - shutdown
#   CONT      - unpause
#   USR2      - pause
# An unsupported HUP or QUIT is reported by safe_trap; an unsupported
# CONT/USR2 is reported with a single warning.
def register_signal_handlers
  %w[TERM INT].each { |sig| trap(sig) { exit! } }

  safe_trap('HUP')  { sighup_handler.call }
  safe_trap('QUIT') { shutdown(true) }

  begin
    trap('CONT') { unpause(true) }
    trap('USR2') { pause(true) }
  rescue ArgumentError
    warn 'Signals USR2, and/or CONT not supported.'
  end
end
safe_trap(signal_name, &cblock)
click to toggle source
# File lib/qless/worker/base.rb, line 52
# Install the given block as the handler for signal_name. If trap raises
# ArgumentError, emit a warning instead of letting the error propagate.
def safe_trap(signal_name, &cblock)
  trap(signal_name, cblock)
rescue ArgumentError
  warn "Signal #{signal_name} not supported."
end
shutdown(in_signal_handler=true)
click to toggle source
Stop processing after this job
# File lib/qless/worker/base.rb, line 141
# Set the shutdown flag so processing stops after the current job.
#
# in_signal_handler - accepted but not used by this method.
def shutdown(in_signal_handler=true)
  @shutdown = true
end
Also aliased as: stop!
try_complete(job)
click to toggle source
Complete the job unless the worker has already put it into another state by completing / failing / etc. the job
# File lib/qless/worker/base.rb, line 165
# Complete the job, unless its state was already changed (completed, failed,
# etc.) while it was being worked on.
def try_complete(job)
  return if job.state_changed?

  job.complete
rescue Job::CantCompleteError => e
  # Completion can be refused when the job was already failed (e.g. by
  # another worker), is being worked on by another worker, or was cancelled.
  # Failing the job with this error is neither wanted nor possible in those
  # cases, so the refusal is only logged.
  log(:warn, "Failed to complete #{job.inspect}: #{e.message}")
end
uniq_clients()
click to toggle source
# File lib/qless/worker/base.rb, line 194
# The distinct clients behind the reserver's queues. The list is computed on
# first use and memoized in @uniq_clients.
def uniq_clients
  @uniq_clients ||= reserver.queues.map { |queue| queue.client }.uniq
end
unpause(in_signal_handler=true)
click to toggle source
Continue taking new jobs
# File lib/qless/worker/base.rb, line 153
# Clear the paused flag so the worker continues taking new jobs.
#
# in_signal_handler - accepted but not used by this method.
def unpause(in_signal_handler=true)
  @paused = false
end
Private Instance Methods
current_job=(job)
click to toggle source
# File lib/qless/worker/base.rb, line 240
# Record the job currently being worked on (or nil), holding
# @current_job_mutex while the value is written.
def current_job=(job)
  @current_job_mutex.synchronize { @current_job = job }
end
log(type, msg)
click to toggle source
# File lib/qless/worker/base.rb, line 222
# Send msg to the logger at the given level, prefixed with this process's
# pid.
#
# type - name of the logger method to call (e.g. :info, :error)
# msg  - message text
def log(type, msg)
  line = "#{Process.pid}: #{msg}"
  @log.public_send(type, line)
end
no_job_available()
click to toggle source
# File lib/qless/worker/base.rb, line 226
# Called when the reserver returned no job: unless the interval is zero,
# update the procline, log the wait, and sleep for the interval.
def no_job_available
  return if interval.zero?

  procline("Waiting for #{reserver.description}", false)
  log(:debug, "Sleeping for #{interval} seconds")
  sleep interval
end
reconnect_each_client()
click to toggle source
# File lib/qless/worker/base.rb, line 246
# Reconnect the underlying redis client of every distinct client.
def reconnect_each_client
  uniq_clients.each do |client|
    client.redis.client.reconnect
  end
end
with_current_job() { |current_job| ... }
click to toggle source
# File lib/qless/worker/base.rb, line 234
# Yield the job currently being worked on (possibly nil) while holding
# @current_job_mutex. Returns the value of the block.
def with_current_job
  @current_job_mutex.synchronize { yield @current_job }
end