class Resque::Worker

A Resque Worker processes jobs. On platforms that support fork(2), the worker will fork off a child to process each job. This ensures a clean slate when beginning the next job and cuts down on gradual memory growth as well as low-level failures.

It also ensures workers are always listening to signals from you, their master, and can react accordingly.

Attributes

cant_fork[RW]

Boolean indicating whether this worker is unable to fork. Automatically set to true if a fork(2) fails.

run_at_exit_hooks[RW]

When set to true, forked workers will exit with `exit`, calling any `at_exit` code handlers that have been registered in the application. Otherwise, forked workers exit with `exit!`.

term_child[RW]

Decides whether to use the new_kill_child logic (TERM first, then KILL) instead of kill_child when stopping the forked child.

term_timeout[RW]
to_s[W]

Public Class Methods

all()

Returns an array of all worker objects.

# File lib/resque/worker.rb, line 64
def self.all
  Array(redis.smembers(:workers)).map { |id| find(id) }.compact
end
attach(worker_id)

Alias of `find`.

# File lib/resque/worker.rb, line 107
def self.attach(worker_id)
  find(worker_id)
end
exists?(worker_id)

Given a string worker id, returns a boolean indicating whether the worker exists.

# File lib/resque/worker.rb, line 113
def self.exists?(worker_id)
  redis.sismember(:workers, worker_id)
end
find(worker_id)

Returns a single worker object. Accepts a string id.

# File lib/resque/worker.rb, line 95
def self.find(worker_id)
  if exists? worker_id
    queues = worker_id.split(':')[-1].split(',')
    worker = new(*queues)
    worker.to_s = worker_id
    worker
  else
    nil
  end
end
new(*queues)

Workers should be initialized with an array of string queue names. The order is important: a Worker will check the first queue given for a job. If none is found, it will check the second queue name given. If a job is found, it will be processed. Upon completion, the Worker will again check the first queue given, and so forth. In this way the queue list passed to a Worker on startup defines the priorities of queues.

If passed a single “*”, this Worker will operate on all queues in alphabetical order. This also allows queues to be added or removed dynamically without restarting workers.
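The priority rule can be illustrated with a small stand-alone sketch. It assumes plain in-memory arrays in place of the Redis-backed queues, and `next_job` is a hypothetical helper, not part of Resque:

```ruby
# In-memory stand-in for the Redis-backed queues (hypothetical data).
queues = {
  "critical" => [],
  "default"  => ["send_email"],
  "low"      => ["rebuild_index"]
}

# Walk the queue names in order and take the first job found,
# so earlier names always win over later ones.
def next_job(queue_names, queues)
  queue_names.each do |name|
    job = queues.fetch(name, []).shift
    return [name, job] if job
  end
  nil
end

order = %w[critical default low]

first = next_job(order, queues)      # "critical" is empty, so "default" is used
queues["critical"] << "charge_card"  # a higher-priority job arrives
second = next_job(order, queues)     # the scan starts again from "critical"
third  = next_job(order, queues)     # only "low" has work left
```

Because every lookup starts again from the first name, a job arriving on an earlier queue is picked up before older jobs on later queues.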

# File lib/resque/worker.rb, line 128
def initialize(*queues)
  @queues = queues.map { |queue| queue.to_s.strip }
  @shutdown = nil
  @paused = nil
  validate_queues
end
redis()
# File lib/resque/worker.rb, line 19
def self.redis
  Resque.redis
end
working()

Returns an array of all worker objects currently processing jobs.

# File lib/resque/worker.rb, line 70
def self.working
  names = all
  return [] unless names.any?

  names.map! { |name| "worker:#{name}" }

  reportedly_working = {}

  begin
    reportedly_working = redis.mapped_mget(*names).reject do |key, value|
      value.nil? || value.empty?
    end
  rescue Redis::Distributed::CannotDistribute
    names.each do |name|
      value = redis.get name
      reportedly_working[name] = value unless value.nil? || value.empty?
    end
  end

  reportedly_working.keys.map do |key|
    find key.sub("worker:", '')
  end.compact
end

Public Instance Methods

==(other)

Is this worker the same as another worker?

# File lib/resque/worker.rb, line 646
def ==(other)
  to_s == other.to_s
end
decode(object)

Given a string, returns a Ruby object.

# File lib/resque/worker.rb, line 34
def decode(object)
  return unless object

  begin
    if MultiJson.respond_to?(:dump) && MultiJson.respond_to?(:load)
      MultiJson.load object
    else
      MultiJson.decode object
    end
  rescue ::MultiJson::DecodeError => e
    raise DecodeException, e.message, e.backtrace
  end
end
done_working()

Called when we are done working - clears our `working_on` state and tells Redis we processed a job.

# File lib/resque/worker.rb, line 576
def done_working
  redis.pipelined do
    processed!
    redis.del("worker:#{self}")
  end
end
enable_gc_optimizations()

Enables GC optimizations if you're running REE. See www.rubyenterpriseedition.com/faq.html#adapt_apps_for_cow

# File lib/resque/worker.rb, line 354
def enable_gc_optimizations
  if GC.respond_to?(:copy_on_write_friendly=)
    GC.copy_on_write_friendly = true
  end
end
encode(object)

Given a Ruby object, returns a string suitable for storage in a queue.

# File lib/resque/worker.rb, line 25
def encode(object)
  if MultiJson.respond_to?(:dump) && MultiJson.respond_to?(:load)
    MultiJson.dump object
  else
    MultiJson.encode object
  end
end
failed()

How many failed jobs has this worker seen? Returns an int.

# File lib/resque/worker.rb, line 595
def failed
  Stat["failed:#{self}"]
end
failed!()

Tells Redis we've failed a job.

# File lib/resque/worker.rb, line 600
def failed!
  Stat << "failed"
  Stat << "failed:#{self}"
end
fork(job)

Not every platform supports fork. Here we do our magic to determine if yours does.
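As a rough illustration of the fork-per-job idea, the sketch below runs a block in a child process when fork is available and inline otherwise. `run_isolated` is a hypothetical helper written for this example and is not Resque's implementation:

```ruby
# Run a block in a forked child when fork is available, otherwise inline.
# Returns the child's exit status (0 when run inline).
def run_isolated
  unless Process.respond_to?(:fork)
    yield
    return 0
  end

  pid = fork do
    yield
    exit!(0) # leave the child without running at_exit handlers
  end
  Process.waitpid(pid)
  $?.exitstatus
end

leaked = []
status = run_isolated { leaked << ("x" * 1_000_000) }
# When the block ran in a child, the parent's `leaked` array is untouched:
# whatever the job allocated disappeared together with the child process.
```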

# File lib/resque/worker.rb, line 318
def fork(job)
  return if @cant_fork

  # Only run before_fork hooks if we're actually going to fork
  # (after checking @cant_fork)
  run_hook :before_fork, job

  begin
    # IronRuby doesn't support `Kernel.fork` yet
    if Kernel.respond_to?(:fork)
      Kernel.fork if will_fork?
    else
      raise NotImplementedError
    end
  rescue NotImplementedError
    @cant_fork = true
    nil
  end
end
fork_per_job?()
# File lib/resque/worker.rb, line 635
def fork_per_job?
  ENV["FORK_PER_JOB"] != 'false'
end
glob_match(pattern)
# File lib/resque/worker.rb, line 310
def glob_match(pattern)
  Resque.queues.select do |queue|
    File.fnmatch?(pattern, queue)
  end.sort
end
hostname()

Hostname of this machine, as returned by Socket.gethostname.

# File lib/resque/worker.rb, line 662
def hostname
  Socket.gethostname
end
id()
Alias for: to_s
idle?()

Boolean - true if idle, false if not

# File lib/resque/worker.rb, line 627
def idle?
  state == :idle
end
inspect()
# File lib/resque/worker.rb, line 650
def inspect
  "#<Worker #{to_s}>"
end
job()

Returns a hash explaining the Job we're currently processing, if any.

# File lib/resque/worker.rb, line 616
def job
  decode(redis.get("worker:#{self}")) || {}
end
Also aliased as: processing
kill_child()

Kills the forked child immediately, without remorse. The job it is processing will not be completed.

# File lib/resque/worker.rb, line 438
def kill_child
  if @child
    log! "Killing child at #{@child}"
    if `ps -o pid,state -p #{@child}`
      Process.kill("KILL", @child) rescue nil
    else
      log! "Child #{@child} not found, restarting."
      shutdown
    end
  end
end
linux_worker_pids()

Find Resque worker pids on Linux and OS X.

# File lib/resque/worker.rb, line 692
def linux_worker_pids
  `ps -A -o pid,command | grep "[r]esque" | grep -v "resque-web"`.split("\n").map do |line|
    line.split(' ')[0]
  end
end
log(message)

Logs a message to Resque.logger. We can't use alias_method since info/debug are private methods.

# File lib/resque/worker.rb, line 722
def log(message)
  info(message)
end
log!(message)
# File lib/resque/worker.rb, line 726
def log!(message)
  debug(message)
end
logger_severity_deprecation_warning()
# File lib/resque/worker.rb, line 770
def logger_severity_deprecation_warning
  return if $TESTING
  return if $warned_logger_severity_deprecation
  Kernel.warn "*** DEPRECATION WARNING: Resque::Worker#verbose and #very_verbose are deprecated. Please set Resque.logger.level instead"
  Kernel.warn "Called from: #{caller[0..5].join("\n\t")}"
  $warned_logger_severity_deprecation = true
  nil
end
new_kill_child()

Kills the forked child immediately with minimal remorse. The job it is processing will not be completed. Sends the child a TERM signal, waits up to term_timeout seconds (polling every 0.1 seconds), and then sends a KILL signal if it has not quit.
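The TERM-then-KILL sequence can be tried out in isolation. The following is a minimal sketch on a platform with fork(2); `stop_child` and the two demo children are hypothetical and only mirror the pattern, they are not Resque code:

```ruby
# Ask a child to stop with TERM, poll for up to `timeout` seconds,
# then fall back to KILL. Returns a symbol describing what happened.
def stop_child(pid, timeout)
  Process.kill("TERM", pid)
  (timeout.to_f * 10).round.times do
    sleep 0.1
    return :terminated if Process.waitpid(pid, Process::WNOHANG)
  end
  Process.kill("KILL", pid)
  Process.waitpid(pid)
  :killed
rescue SystemCallError
  :already_gone
end

# A child that exits on TERM (the default behaviour).
polite = fork { sleep 30 }

# A child that ignores TERM. The pipe lets the parent wait until
# the child has changed its signal disposition before signalling.
reader, writer = IO.pipe
stubborn = fork do
  reader.close
  trap("TERM", "IGNORE")
  writer.puts "ready"
  writer.close
  sleep 30
end
writer.close
reader.gets
reader.close

polite_result   = stop_child(polite, 2)
stubborn_result = stop_child(stubborn, 0.3)
```

The first child goes away on TERM within the grace period, while the second only stops once KILL is sent.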

# File lib/resque/worker.rb, line 453
def new_kill_child
  if @child
    unless Process.waitpid(@child, Process::WNOHANG)
      log! "Sending TERM signal to child #{@child}"
      Process.kill("TERM", @child)
      (term_timeout.to_f * 10).round.times do |i|
        sleep(0.1)
        return if Process.waitpid(@child, Process::WNOHANG)
      end
      log! "Sending KILL signal to child #{@child}"
      Process.kill("KILL", @child)
    else
      log! "Child #{@child} already quit."
    end
  end
rescue SystemCallError
  log! "Child #{@child} already quit and reaped."
end
pause_processing()

Stop processing jobs after the current one has completed (if we're currently running one).

# File lib/resque/worker.rb, line 479
def pause_processing
  log "USR2 received; pausing job processing"
  @paused = true
end
paused?()

Are we paused?

# File lib/resque/worker.rb, line 473
def paused?
  @paused
end
perform(job) { |job| ... }

Processes a given job in the child.

# File lib/resque/worker.rb, line 247
def perform(job)
  begin
    run_hook :after_fork, job if will_fork?
    job.perform
  rescue Object => e
    report_failed_job(job,e)
  else
    log "done: #{job.inspect}"
  ensure
    yield job if block_given?
  end
end
pid()

Returns Integer PID of running worker

# File lib/resque/worker.rb, line 667
def pid
  @pid ||= Process.pid
end
process(job = nil, &block)

DEPRECATED. Processes a single job. If none is given, it will try to produce one. Usually run in the child.

# File lib/resque/worker.rb, line 221
def process(job = nil, &block)
  return unless job ||= reserve

  job.worker = self
  working_on job
  perform(job, &block)
ensure
  done_working
end
processed()

How many jobs has this worker processed? Returns an int.

# File lib/resque/worker.rb, line 584
def processed
  Stat["processed:#{self}"]
end
processed!()

Tell Redis we've processed a job.

# File lib/resque/worker.rb, line 589
def processed!
  Stat << "processed"
  Stat << "processed:#{self}"
end
processing()
Alias for: job
procline(string)

Given a string, sets the procline ($0) and logs. Procline is always in the format of:

resque-VERSION: STRING
# File lib/resque/worker.rb, line 715
def procline(string)
  $0 = "resque-#{Resque::Version}: #{string}"
  log! $0
end
prune_dead_workers()

Looks for any workers which should be running on this server and, if they're not, removes them from Redis.

This is a form of garbage collection. If a server is killed by a hard shutdown, power failure, or something else beyond our control, the Resque workers will not die gracefully and therefore will leave stale state information in Redis.

By checking the current Redis state against the actual environment, we can determine if the state in Redis is stale and clean it up a bit.
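The selection rule can be sketched as a pure function over worker ids. This assumes ids in the `hostname:pid:queues` format; `stale_worker_ids` and the sample data are hypothetical and stand in for the Redis set and the `ps` output:

```ruby
require "set"

# Pick the registered worker ids that look dead: same host, same queue
# set (or we listen on "*"), and no live process with that pid.
def stale_worker_ids(registered_ids, this_host, live_pids, my_queues)
  registered_ids.select do |id|
    host, pid, queues_raw = id.split(":")
    same_queues = my_queues.include?("*") ||
                  queues_raw.split(",").to_set == my_queues.to_set
    same_queues && host == this_host && !live_pids.include?(pid)
  end
end

registered = [
  "web01:100:mail",    # alive on this host
  "web01:200:mail",    # no such process any more: stale
  "web02:300:mail",    # another host: left alone
  "web01:400:reports"  # different queues: left alone
]

stale     = stale_worker_ids(registered, "web01", ["100"], ["mail"])
stale_all = stale_worker_ids(registered, "web01", ["100"], ["*"])
```

Note that the pids are compared as strings, matching the string pids returned by the pid-listing helpers.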

# File lib/resque/worker.rb, line 500
def prune_dead_workers
  all_workers = Worker.all
  known_workers = worker_pids unless all_workers.empty?
  all_workers.each do |worker|
    host, pid, worker_queues_raw = worker.id.split(':')
    worker_queues = worker_queues_raw.split(",")
    unless @queues.include?("*") || (worker_queues.to_set == @queues.to_set)
      # If the worker we are trying to prune does not belong to the queues
      # we are listening to, we should not touch it.
      # Attempting to prune a worker from different queues may easily result
      # in an unknown class exception, since that worker could even be
      # written in a different language.
      next
    end
    next unless host == hostname
    next if known_workers.include?(pid)
    log! "Pruning dead worker: #{worker}"
    worker.unregister_worker
  end
end
queues()

Returns a list of queues to use when searching for a job. A splat (“*”) means you want every queue (in alpha order) - this can be useful for dynamically adding new queues.
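The expansion of patterns into queue names can be illustrated without Redis. In this sketch `expand_queues` is a hypothetical helper and `known` is made-up data standing in for the list of existing queues:

```ruby
# Expand each pattern against the known queue names using File.fnmatch?.
# A pattern that matches nothing is kept as a literal queue name.
def expand_queues(patterns, known_queues)
  patterns.map do |pattern|
    matched = known_queues.select { |queue| File.fnmatch?(pattern, queue) }.sort
    matched.empty? ? pattern : matched
  end.flatten.uniq
end

known = %w[reports mail_low mail_high]

everything = expand_queues(["*"], known)
prioritized = expand_queues(["reports", "mail_*"], known)
not_yet_created = expand_queues(["imports"], known)
```

Here `everything` is `["mail_high", "mail_low", "reports"]`, `prioritized` is `["reports", "mail_high", "mail_low"]`, and `not_yet_created` is `["imports"]`. Since the expansion is evaluated on each call, a queue created later is picked up without restarting anything.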

# File lib/resque/worker.rb, line 299
def queues
  @queues.map do |queue|
    queue.strip!
    if (matched_queues = glob_match(queue)).empty?
      queue
    else
      matched_queues
    end
  end.flatten.uniq
end
reconnect()

Reconnects to Redis to avoid sharing a connection with the parent. Retries up to 3 times with increasing delay before giving up.
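The retry pattern itself can be sketched independently of Redis. The example assumes `IOError` as a stand-in for the Redis connection error, and `with_retries` with its injectable `sleeper` is a hypothetical helper so the example runs without real delays:

```ruby
# Retry a block on connection-style errors, waiting 1, 2, then 3 seconds
# between attempts. `sleeper` is injectable so the demo runs instantly.
def with_retries(max_retries: 3, sleeper: ->(seconds) { sleep(seconds) })
  tries = 0
  begin
    yield
  rescue IOError
    if (tries += 1) <= max_retries
      sleeper.call(tries)
      retry
    else
      raise
    end
  end
end

delays = []
attempts = 0
result = with_retries(sleeper: ->(seconds) { delays << seconds }) do
  attempts += 1
  raise IOError, "connection refused" if attempts < 3
  :connected
end
```

With two failures before success, the recorded delays are 1 and 2 seconds. A block that never succeeds is attempted four times in total before the error is re-raised.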

# File lib/resque/worker.rb, line 280
def reconnect
  tries = 0
  begin
    redis.client.reconnect
  rescue Redis::BaseConnectionError
    if (tries += 1) <= 3
      log "Error reconnecting to Redis; retrying"
      sleep(tries)
      retry
    else
      log "Error reconnecting to Redis; quitting"
      raise
    end
  end
end
redis()
# File lib/resque/worker.rb, line 15
def redis
  Resque.redis
end
register_signal_handlers()

Registers the various signal handlers a worker responds to.

TERM: Shutdown immediately, stop processing jobs.

INT: Shutdown immediately, stop processing jobs.

QUIT: Shutdown after the current job has finished processing.

USR1: Kill the forked child immediately, continue processing jobs.

USR2: Don't process any new jobs.

CONT: Start processing jobs again after a USR2.
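The USR2/CONT pair can be demonstrated with a tiny stand-alone sketch on a platform that supports these signals. The `state` hash is a hypothetical stand-in for the worker's pause flag, and the process signals itself only for the sake of the demo:

```ruby
# Minimal stand-in for the worker's pause flag (not Resque's own object).
state = { paused: false }

trap("USR2") { state[:paused] = true }   # stop picking up new jobs
trap("CONT") { state[:paused] = false }  # resume after a USR2

Process.kill("USR2", Process.pid)
sleep 0.1 # give the handler a moment to run
paused_after_usr2 = state[:paused]

Process.kill("CONT", Process.pid)
sleep 0.1
paused_after_cont = state[:paused]
```

In practice the signals would be sent from outside, for example with `Process.kill("USR2", worker_pid)` or the shell's `kill -USR2`, using the worker's pid.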

# File lib/resque/worker.rb, line 368
def register_signal_handlers
  trap('TERM') { shutdown!  }
  trap('INT')  { shutdown!  }

  begin
    trap('QUIT') { shutdown   }
    if term_child
      trap('USR1') { new_kill_child }
    else
      trap('USR1') { kill_child }
    end
    trap('USR2') { pause_processing }
    trap('CONT') { unpause_processing }
  rescue ArgumentError
    warn "Signals QUIT, USR1, USR2, and/or CONT not supported."
  end

  log! "Registered signals"
end
register_worker()

Registers ourself as a worker. Useful when entering the worker lifecycle on startup.

# File lib/resque/worker.rb, line 523
def register_worker
  redis.pipelined do
    redis.sadd(:workers, self)
    started!
  end
end
report_failed_job(job,exception)

Reports the exception and marks the job as failed.

# File lib/resque/worker.rb, line 232
def report_failed_job(job,exception)
  log "#{job.inspect} failed: #{exception.inspect}"
  begin
    job.fail(exception)
  rescue Object => exception
    log "Received exception when reporting failure: #{exception.inspect}"
  end
  begin
    failed!
  rescue Object => exception
    log "Received exception when increasing failed jobs counter (redis issue) : #{exception.inspect}"
  end
end
reserve()

Attempts to grab a job off one of the provided queues. Returns nil if no job can be found.

# File lib/resque/worker.rb, line 262
def reserve
  queues.each do |queue|
    log! "Checking #{queue}"
    if job = Resque.reserve(queue)
      log! "Found job on #{queue}"
      return job
    end
  end

  nil
rescue Exception => e
  log "Error reserving job: #{e.inspect}"
  log e.backtrace.join("\n")
  raise e
end
run_hook(name, *args)

Runs a named hook, passing along any arguments.

# File lib/resque/worker.rb, line 531
def run_hook(name, *args)
  return unless hooks = Resque.send(name)
  msg = "Running #{name} hooks"
  msg << " with #{args.inspect}" if args.any?
  log msg

  hooks.each do |hook|
    args.any? ? hook.call(*args) : hook.call
  end
end
shutdown()

Schedule this worker for shutdown. Will finish processing the current job.

# File lib/resque/worker.rb, line 407
def shutdown
  log 'Exiting...'
  @shutdown = true
end
shutdown!()

Kill the child and shutdown immediately. If not forking, abort this process.

# File lib/resque/worker.rb, line 414
def shutdown!
  shutdown
  if term_child
    if fork_per_job?
      new_kill_child
    else
      # Raise TermException in the same process
      trap('TERM') do
        # ignore subsequent terms
      end
      raise TermException.new("SIGTERM")
    end
  else
    kill_child
  end
end
shutdown?()

Should this worker shut down as soon as the current job is finished?

# File lib/resque/worker.rb, line 432
def shutdown?
  @shutdown
end
solaris_worker_pids()

Find Resque worker pids on Solaris.

Returns an Array of string pids of all the other workers on this machine. Useful when pruning dead workers on startup.

# File lib/resque/worker.rb, line 702
def solaris_worker_pids
  `ps -A -o pid,comm | grep "[r]uby" | grep -v "resque-web"`.split("\n").map do |line|
    real_pid = line.split(' ')[0]
    pargs_command = `pargs -a #{real_pid} 2>/dev/null | grep [r]esque | grep -v "resque-web"`
    if pargs_command.split(':')[1] == " resque-#{Resque::Version}"
      real_pid
    end
  end.compact
end
started()

What time did this worker start? Returns the value stored by `started!`, which is a string produced by `Time.now.to_s` rather than an instance of `Time`.

# File lib/resque/worker.rb, line 606
def started
  redis.get "worker:#{self}:started"
end
started!()

Tell Redis we've started.

# File lib/resque/worker.rb, line 611
def started!
  redis.set("worker:#{self}:started", Time.now.to_s)
end
startup()

Runs all the methods needed when a worker begins its lifecycle.

# File lib/resque/worker.rb, line 339
def startup
  Kernel.warn "WARNING: This way of doing signal handling is now deprecated. Please see http://hone.heroku.com/resque/2012/08/21/resque-signals.html for more info." unless term_child or $TESTING
  enable_gc_optimizations
  register_signal_handlers
  prune_dead_workers
  run_hook :before_first_fork
  register_worker

  # Fix buffering so we can `rake resque:work > resque.log` and
  # get output from the child in there.
  $stdout.sync = true
end
state()

Returns a symbol representing the current worker state, which can be either :working or :idle.

# File lib/resque/worker.rb, line 641
def state
  redis.exists("worker:#{self}") ? :working : :idle
end
to_s()

The string representation is the same as the id for this worker instance. Can be used with `Worker.find`.
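For illustration, the id can be taken apart and rebuilt with plain string operations. The id below is a made-up example in the `hostname:pid:queues` format:

```ruby
# Hypothetical id in the "hostname:pid:queues" format.
worker_id = "web01:4242:critical,default"

host, pid, queues_raw = worker_id.split(":")
queues = queues_raw.split(",")

# Rebuilding the string from its parts gives back the same id.
rebuilt = "#{host}:#{pid}:#{queues.join(',')}"
```

Here `host` is `"web01"`, `pid` is the string `"4242"`, and `queues` is `["critical", "default"]`.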

# File lib/resque/worker.rb, line 656
def to_s
  @to_s ||= "#{hostname}:#{pid}:#{@queues.join(',')}"
end
Also aliased as: id
unpause_processing()

Start processing jobs again after a pause.

# File lib/resque/worker.rb, line 485
def unpause_processing
  log "CONT received; resuming job processing"
  @paused = false
end
unregister_signal_handlers()
# File lib/resque/worker.rb, line 388
def unregister_signal_handlers
  trap('TERM') do
    trap('TERM') do
      # ignore subsequent terms
    end
    raise TermException.new("SIGTERM")
  end
  trap('INT', 'DEFAULT')

  begin
    trap('QUIT', 'DEFAULT')
    trap('USR1', 'DEFAULT')
    trap('USR2', 'DEFAULT')
  rescue ArgumentError
  end
end
unregister_worker(exception = nil)

Unregisters ourself as a worker. Useful when shutting down.

# File lib/resque/worker.rb, line 543
def unregister_worker(exception = nil)
  # If we're still processing a job, make sure it gets logged as a
  # failure.
  if (hash = processing) && !hash.empty?
    job = Job.new(hash['queue'], hash['payload'])
    # Ensure the proper worker is attached to this job, even if
    # it's not the precise instance that died.
    job.worker = self
    job.fail(exception || DirtyExit.new)
  end

  redis.pipelined do
    redis.srem(:workers, self)
    redis.del("worker:#{self}")
    redis.del("worker:#{self}:started")

    Stat.clear("processed:#{self}")
    Stat.clear("failed:#{self}")
  end
end
validate_queues()

A worker must be given a queue, otherwise it won't know what to do with itself.

You probably never need to call this.

# File lib/resque/worker.rb, line 139
def validate_queues
  if @queues.nil? || @queues.empty?
    raise NoQueueError.new("Please give each worker at least one queue.")
  end
end
verbose()

Deprecated legacy methods for controlling the logging threshold. Use Resque.logger.level now, e.g.:

Resque.logger.level = Logger::DEBUG
# File lib/resque/worker.rb, line 735
def verbose
  logger_severity_deprecation_warning
  @verbose
end
verbose=(value)
# File lib/resque/worker.rb, line 745
def verbose=(value)
  logger_severity_deprecation_warning

  if value && !very_verbose
    Resque.logger.formatter = VerboseFormatter.new
  elsif !value
    Resque.logger.formatter = QuietFormatter.new
  end

  @verbose = value
end
very_verbose()
# File lib/resque/worker.rb, line 740
def very_verbose
  logger_severity_deprecation_warning
  @very_verbose
end
very_verbose=(value)
# File lib/resque/worker.rb, line 757
def very_verbose=(value)
  logger_severity_deprecation_warning
  if value
    Resque.logger.formatter = VeryVerboseFormatter.new
  elsif !value && verbose
    Resque.logger.formatter = VerboseFormatter.new
  else
    Resque.logger.formatter = QuietFormatter.new
  end

  @very_verbose = value
end
will_fork?()
# File lib/resque/worker.rb, line 631
def will_fork?
  !@cant_fork && !$TESTING && fork_per_job?
end
windows_worker_pids()

Returns an Array of string pids of all the other workers on this machine. Useful when pruning dead workers on startup.

# File lib/resque/worker.rb, line 685
def windows_worker_pids
  tasklist_output = `tasklist /FI "IMAGENAME eq ruby.exe" /FO list`.encode("UTF-8", Encoding.locale_charmap)
  tasklist_output.split($/).select { |line| line =~ /^PID:/}.collect{ |line| line.gsub /PID:\s+/, '' }
end
work(interval = 5.0, &block)

This is the main workhorse method. Called on a Worker instance, it begins the worker life cycle.

The following events occur during a worker's life cycle:

  1. Startup: Signals are registered, dead workers are pruned,
     and this worker is registered.
  2. Work loop: Jobs are pulled from a queue and processed.
  3. Teardown: This worker is unregistered.

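Can be passed a float representing the polling interval in seconds. The default is 5 seconds, but for a semi-active site you may want to use a smaller value. With an interval of zero, the loop exits as soon as no job is available instead of sleeping.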

Also accepts a block which will be passed the job as soon as it has completed processing. Useful for testing.
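The shape of the work loop, including the block callback, can be sketched with an in-memory job list. `run_loop` and the lambda jobs are hypothetical; the sketch leaves out forking, signals, and Redis entirely:

```ruby
# Simplified polling loop over an in-memory job list (hypothetical data).
# An interval of 0 means "stop as soon as no job is available".
def run_loop(jobs, interval = 5.0)
  interval = Float(interval)
  loop do
    if (job = jobs.shift)
      job.call
      yield job if block_given? # hand the finished job to the caller
    else
      break if interval.zero?
      sleep interval
    end
  end
end

log = []
jobs = [-> { log << "job 1" }, -> { log << "job 2" }]
completed = 0

run_loop(jobs, 0) { |_job| completed += 1 }
```

Passing an interval of 0 together with a block is what makes this pattern convenient in tests: the loop drains whatever is queued, reports each finished job, and returns.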

# File lib/resque/worker.rb, line 161
def work(interval = 5.0, &block)
  interval = Float(interval)
  $0 = "resque: Starting"
  startup

  loop do
    break if shutdown?

    if not paused? and job = reserve
      log "got: #{job.inspect}"
      job.worker = self
      working_on job

      procline "Processing #{job.queue} since #{Time.now.to_i} [#{job.payload_class_name}]"
      if @child = fork(job)
        srand # Reseeding
        procline "Forked #{@child} at #{Time.now.to_i}"
        begin
          Process.waitpid(@child)
        rescue SystemCallError
          nil
        end
        job.fail(DirtyExit.new($?.to_s)) if $?.signaled?
      else
        unregister_signal_handlers if will_fork? && term_child
        begin

          reconnect
          perform(job, &block)

        rescue Exception => exception
          report_failed_job(job,exception)
        end

        if will_fork?
          run_at_exit_hooks ? exit : exit!
        end
      end

      done_working
      @child = nil
    else
      break if interval.zero?
      log! "Sleeping for #{interval} seconds"
      procline paused? ? "Paused" : "Waiting for #{@queues.join(',')}"
      sleep interval
    end
  end

  unregister_worker
rescue Exception => exception
  unless exception.class == SystemExit && !@child && run_at_exit_hooks
    log "Failed to start worker : #{exception.inspect}"

    unregister_worker(exception)
  end
end
worker_pids()

Returns an Array of string pids of all the other workers on this machine. Useful when pruning dead workers on startup.

# File lib/resque/worker.rb, line 673
def worker_pids
  if RUBY_PLATFORM =~ /solaris/
    solaris_worker_pids
  elsif RUBY_PLATFORM =~ /mingw32/
    windows_worker_pids
  else
    linux_worker_pids
  end
end
working?()

Boolean - true if working, false if not

# File lib/resque/worker.rb, line 622
def working?
  state == :working
end
working_on(job)

Given a job, tells Redis we're working on it. Useful for seeing what workers are doing and when.
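The stored value can be pictured as a small JSON document. This sketch assumes the standard library's JSON in place of MultiJson and uses a made-up payload, so it only approximates what ends up in Redis:

```ruby
require "json"
require "time"

# Hypothetical job data; the real method receives a Resque job object.
payload = { "class" => "Archive", "args" => [42] }

data = JSON.generate(
  :queue   => "default",
  :run_at  => Time.now.utc.iso8601,
  :payload => payload
)

# Reading the value back: symbol keys come back as strings.
decoded = JSON.parse(data)
run_at  = Time.iso8601(decoded["run_at"])
```

Because the keys come back as strings after decoding, code that reads this value accesses it with `hash['queue']` and `hash['payload']`.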

# File lib/resque/worker.rb, line 566
def working_on(job)
  data = encode \
    :queue   => job.queue,
    :run_at  => Time.now.utc.iso8601,
    :payload => job.payload
  redis.set("worker:#{self}", data)
end