class Qless::Workers::BaseWorker

Attributes

interval[RW]
options[RW]
output[RW]
paused[RW]
reserver[RW]
sighup_handler[RW]

Public Class Methods

new(reserver, options = {}) click to toggle source
# File lib/qless/worker/base.rb, line 20
# Build a worker around a job reserver.
#
# reserver - the object jobs are reserved from; stored as given.
# options  - Hash of optional settings read here:
#            :sighup_handler - callable kept for the HUP signal
#                              (default: a no-op lambda)
#            :logger         - logger used as-is; when present, :output,
#                              :log_level and :log_formatter are not
#                              consulted and @output is left unset
#            :output         - stream for the default logger (default $stdout)
#            :log_level      - level of the default logger
#                              (default Logger::WARN)
#            :log_formatter  - formatter of the default logger
#                              (default: "<datetime>: <msg>\n")
#            :interval       - value stored as the interval for checking for
#                              new jobs (default 5.0)
def initialize(reserver, options = {})
  @reserver = reserver
  @options = options

  @sighup_handler = options.fetch(:sighup_handler) { lambda { } }

  # The fetch block only runs when no :logger was supplied.
  @log = options.fetch(:logger) do
    @output = options.fetch(:output, $stdout)
    Logger.new(output).tap do |logger|
      logger.level = options.fetch(:log_level, Logger::WARN)
      logger.formatter = options.fetch(:log_formatter) do
        Proc.new { |_severity, datetime, _progname, msg| "#{datetime}: #{msg}\n" }
      end
    end
  end

  @interval = options.fetch(:interval, 5.0)
  @current_job_mutex = Mutex.new
  @current_job = nil

  # Give the state flags explicit starting values so they are real booleans
  # (and never uninitialised) before shutdown / pause are first called.
  @shutdown = false
  @paused = false

  # Default behavior when a lock is lost: stop after the current job.
  on_current_job_lock_lost { shutdown(false) }
end

Public Instance Methods

deregister() click to toggle source
# File lib/qless/worker/base.rb, line 188
# Ask every distinct client to deregister the worker name it reports for
# itself. Returns the list of clients that was iterated.
def deregister
  uniq_clients.each do |qless_client|
    name = qless_client.worker_name
    qless_client.deregister_workers(name)
  end
end
fail_job(job, error, worker_backtrace) click to toggle source
# File lib/qless/worker/base.rb, line 178
# Mark a job as failed.
#
# The failure is produced by Qless.failure_formatter from the job, the error
# and the worker backtrace; it is logged at :error level and then splatted
# into job.fail.
#
# job              - the job to fail
# error            - the exception that caused the failure
# worker_backtrace - backtrace handed to the formatter
def fail_job(job, error, worker_backtrace)
  formatted = Qless.failure_formatter.format(job, error, worker_backtrace)
  summary = "Got #{formatted.group} failure from #{job.inspect}\n#{formatted.message}"
  log(:error, summary)
  job.fail(*formatted)
rescue Job::CantFailError => cant_fail
  # Nothing more can be done if the job refuses to be failed (it may have
  # been cancelled, or may no longer belong to this worker), so just log it.
  log(:error, "Failed to fail #{job.inspect}: #{cant_fail.message}")
end
jobs() click to toggle source

Return an enumerator to each of the jobs provided by the reserver

# File lib/qless/worker/base.rb, line 84
# Build an Enumerator over the jobs handed out by the reserver.
#
# Every pass asks the reserver for one job. A reserved job is recorded as
# the current job while the consumer holds it and cleared afterwards. When
# nothing was reserved (or reserving raised), no_job_available is called
# instead. @shutdown is checked at the end of each pass and ends the loop.
def jobs
  Enumerator.new do |yielder|
    loop do
      reserved =
        begin
          reserver.reserve
        rescue Exception => failure
          # Workers should stay up, so errors while reserving (e.g. network
          # timeouts) are logged instead of being allowed to kill the worker.
          log(:error,
              "Error reserving job: #{failure.class}: #{failure.message}")
          nil
        end

      if reserved.nil?
        no_job_available
      else
        self.current_job = reserved
        yielder.yield(reserved)
        self.current_job = nil
      end

      break if @shutdown
    end
  end
end
listen_for_lost_lock() { || ... } click to toggle source
# File lib/qless/worker/base.rb, line 202
# Run the given block while listening for "lock_lost" events.
#
# One subscriber is started per distinct client, on the channel
# "ql:w:<client worker name>", with the worker's logger passed as :log.
# When a message has event 'lock_lost' and its 'jid' matches the current
# job, the handler registered through on_current_job_lock_lost is called
# with that job. Every subscriber that was started is stopped when the
# block finishes or raises.
#
# Returns whatever the block returns.
def listen_for_lost_lock
  # Created before any subscriber is started: if starting one raises, the
  # ensure clause still has a list to work with, so the ones already running
  # are stopped and the original error is not masked by a nil.each failure.
  subscribers = []

  uniq_clients.each do |client|
    subscriber = Subscriber.start(client, "ql:w:#{client.worker_name}", log: @log) do |_, message|
      next unless message['event'] == 'lock_lost'

      with_current_job do |job|
        @on_current_job_lock_lost.call(job) if job && message['jid'] == job.jid
      end
    end
    subscribers << subscriber
  end

  yield
ensure
  subscribers.each(&:stop)
end
log_level() click to toggle source
# File lib/qless/worker/base.rb, line 48
# Report the level currently set on this worker's logger.
def log_level
  logger = @log
  logger.level
end
on_current_job_lock_lost(&block) click to toggle source
# File lib/qless/worker/base.rb, line 198
# Register the block to call when the lock on the current job is lost.
# The block replaces any previously registered one; listen_for_lost_lock
# invokes it with the job whose lock was lost. Returns the stored block.
def on_current_job_lock_lost(&block)
  handler = block
  @on_current_job_lock_lost = handler
end
pause(in_signal_handler=true) click to toggle source

Pause the worker – take no more new jobs

# File lib/qless/worker/base.rb, line 147
# Mark the worker as paused and publish that in the process title.
#
# in_signal_handler - forwarded to procline, which skips logging when true.
def pause(in_signal_handler=true)
  @paused = true
  title = "Paused -- #{reserver.description}"
  procline(title, in_signal_handler)
end
perform(job) click to toggle source

Actually perform the job

# File lib/qless/worker/base.rb, line 112
# Run a single job and settle its outcome.
#
# The job is executed through around_perform. Afterwards:
#   - JobLockLost is only logged as a warning;
#   - any other exception fails the job through fail_job, passing the
#     backtrace of this method's caller;
#   - no exception means the job is completed through try_complete.
# In every case the elapsed time is logged at :info level.
#
# job - the job to run.
def perform(job)
  # Durations come from the monotonic clock so that wall-clock adjustments
  # during the job cannot skew the figure or make it negative.
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  around_perform(job)
rescue JobLockLost
  log(:warn, "Lost lock for job #{job.jid}")
rescue Exception => error
  # Deliberately broad: whatever the job raised is recorded as its failure.
  fail_job(job, error, caller)
else
  try_complete(job)
ensure
  elapsed_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
  log(:info, "Job #{job.description} took #{elapsed_time} seconds")
end
procline(value, in_signal_handler=true) click to toggle source

Set the procline. Not supported on all systems

# File lib/qless/worker/base.rb, line 158
# Set the process title to
# "Qless-<version>: <value> at <ISO 8601 time>".
#
# value             - text describing what the worker is doing
# in_signal_handler - when true the new title is not logged
#                     (NOTE(review): presumably because logging is unsafe
#                     inside a trap handler -- confirm)
def procline(value, in_signal_handler=true)
  stamp = Time.now.iso8601
  $PROGRAM_NAME = "Qless-#{Qless::VERSION}: #{value} at #{stamp}"
  return if in_signal_handler

  log(:debug, $0)
end
register_signal_handlers() click to toggle source

The meaning of these signals closely mirrors Resque's:

TERM: Shutdown immediately, stop processing jobs.

INT: Shutdown immediately, stop processing jobs.

QUIT: Shutdown after the current job has finished processing.

USR1: Kill the forked children immediately, continue processing jobs.

USR2: Pause after this job.

CONT: Start processing jobs again after a USR2.

HUP: Print current stack to log and continue
# File lib/qless/worker/base.rb, line 69
# Install this worker's signal handlers:
#   TERM, INT - exit the process immediately (exit!)
#   HUP       - call the configured sighup_handler
#   QUIT      - shutdown
#   CONT      - unpause
#   USR2      - pause
# HUP and QUIT go through safe_trap. If trapping CONT or USR2 raises
# ArgumentError, a single warning is printed for the pair.
def register_signal_handlers
  %w[TERM INT].each do |signal_name|
    trap(signal_name) { exit! }
  end

  safe_trap('HUP') { sighup_handler.call }
  safe_trap('QUIT') { shutdown(true) }

  begin
    trap('CONT') { unpause(true) }
    trap('USR2') { pause(true) }
  rescue ArgumentError
    warn 'Signals USR2, and/or CONT not supported.'
  end
end
safe_trap(signal_name, &cblock) click to toggle source
# File lib/qless/worker/base.rb, line 52
# Trap a signal with the given block, tolerating signals that cannot be
# trapped: an ArgumentError from trap is turned into a warning.
#
# signal_name - name of the signal to trap
# cblock      - handler to run when the signal arrives
def safe_trap(signal_name, &cblock)
  trap(signal_name, cblock)
rescue ArgumentError
  warn "Signal #{signal_name} not supported."
end
shutdown(in_signal_handler=true) click to toggle source

Stop processing after this job

# File lib/qless/worker/base.rb, line 141
# Request that the worker stop: sets the @shutdown flag, which the jobs
# loop checks at the end of each pass.
#
# in_signal_handler - accepted for call-site symmetry with pause / unpause;
#                     not used by this method.
def shutdown(in_signal_handler=true)
  @shutdown = true
end
Also aliased as: stop!
stop!(in_signal_handler=true)
Alias for: shutdown
try_complete(job) click to toggle source

Complete the job unless the worker has already put it into another state (by completing it, failing it, etc.).

# File lib/qless/worker/base.rb, line 165
# Complete the job, unless its state has already been changed.
#
# job - the job to complete.
def try_complete(job)
  return if job.state_changed?

  job.complete
rescue Job::CantCompleteError => cant_complete
  # Completion can be refused when, for example, the job was already failed
  # by another worker, is being worked on by another worker, or has been
  # cancelled. Failing the job with this error is neither wanted nor
  # possible in those cases, so the refusal is only logged.
  log(:warn, "Failed to complete #{job.inspect}: #{cant_complete.message}")
end
uniq_clients() click to toggle source
# File lib/qless/worker/base.rb, line 194
# The distinct clients behind the reserver's queues. The list is computed
# on first use and then reused.
def uniq_clients
  @uniq_clients ||= begin
    clients = reserver.queues.map { |queue| queue.client }
    clients.uniq
  end
end
unpause(in_signal_handler=true) click to toggle source

Continue taking new jobs

# File lib/qless/worker/base.rb, line 153
# Clear the paused flag set by pause.
#
# in_signal_handler - accepted for call-site symmetry with pause / shutdown;
#                     not used by this method.
def unpause(in_signal_handler=true)
  @paused = false
end

Private Instance Methods

current_job=(job) click to toggle source
# File lib/qless/worker/base.rb, line 240
# Record the job being worked on (or nil), holding the current-job mutex
# while the value is swapped.
def current_job=(job)
  @current_job_mutex.synchronize { @current_job = job }
end
log(type, msg) click to toggle source
# File lib/qless/worker/base.rb, line 222
# Send a message to the worker's logger, prefixed with this process's pid.
#
# type - name of the logger method to call (e.g. :debug, :warn, :error)
# msg  - the message text
def log(type, msg)
  line = "#{Process.pid}: #{msg}"
  @log.public_send(type, line)
end
no_job_available() click to toggle source
# File lib/qless/worker/base.rb, line 226
# Called when no job could be reserved. With a zero interval this does
# nothing; otherwise it updates the process title, logs the upcoming nap
# and sleeps for the configured interval.
def no_job_available
  return if interval.zero?

  procline("Waiting for #{reserver.description}", false)
  log(:debug, "Sleeping for #{interval} seconds")
  sleep interval
end
reconnect_each_client() click to toggle source
# File lib/qless/worker/base.rb, line 246
# Reconnect the underlying redis connection of every distinct client.
# Returns the list of clients that was iterated.
def reconnect_each_client
  uniq_clients.each do |qless_client|
    connection = qless_client.redis.client
    connection.reconnect
  end
end
with_current_job() { |current_job| ... } click to toggle source
# File lib/qless/worker/base.rb, line 234
# Yield the current job (nil when there is none) while holding the
# current-job mutex. Returns the block's value.
def with_current_job
  @current_job_mutex.synchronize do
    job = @current_job
    yield job
  end
end