class Backburner::Worker

@abstract Subclass and override {#process_tube_names}, {#prepare} and {#start} to implement a custom Worker class.

Attributes

known_queue_classes[W]
connection[RW]

List of tube names to be watched and processed

tube_names[RW]

List of tube names to be watched and processed

Public Class Methods

enqueue(job_class, args=[], opts={}) click to toggle source

Enqueues a job to be processed later by a worker. Options: `pri` (priority), `delay` (delay in secs), `ttr` (time to respond), `queue` (queue name)

@raise [Beaneater::NotConnected] If beanstalk fails to connect. @example

Backburner::Worker.enqueue NewsletterSender, [self.id, user.id], :ttr => 1000
# File lib/backburner/worker.rb, line 26
# Enqueues a job so that a worker can process it later.
#
# The caller's +opts+ hash is never modified: a copy is taken first, so frozen
# or reused option hashes are safe to pass in.
#
# @param job_class the job class; its +name+ is stored in the payload and it is
#   used as the fallback for priority, ttr and tube-name resolution
# @param args [Array] arguments stored in the payload and passed to the enqueue hooks
# @param opts [Hash] options: +:pri+, +:delay+ (secs), +:ttr+, +:queue+ (name or Proc
#   called with the job class), +:shard_key+ (defaults to "X")
# @return the response of the connection's +put+, or nil when the
#   +before_enqueue+ or +after_enqueue+ hooks return a falsy value
def self.enqueue(job_class, args=[], opts={})
  # Work on a copy so the caller's hash (possibly frozen or shared) is left untouched.
  opts = opts.dup
  opts[:shard_key] = opts[:shard_key].nil? ? "X" : opts[:shard_key].to_s
  pri   = resolve_priority(opts[:pri] || job_class)
  delay = [0, opts[:delay].to_i].max # negative delays are clamped to zero
  ttr   = resolve_respond_timeout(opts[:ttr] || job_class)
  res   = Backburner::Hooks.invoke_hook_events(job_class, :before_enqueue, *args)

  return nil unless res # stop if hook is false

  data = { :class => job_class.name, :args => args, :ttr => ttr }
  queue = opts[:queue] && (Proc === opts[:queue] ? opts[:queue].call(job_class) : opts[:queue])

  begin
    response = nil
    connection = Backburner::Connection.new(Backburner.configuration.allq_url)
    connection.retryable do
      tube_name = expand_tube_name(queue || job_class)
      serialized_data = Backburner.configuration.job_serializer_proc.call(data)
      # The resolved values override whatever the caller supplied for these keys.
      put_opts = opts.merge(pri: pri, delay: delay, ttr: ttr)
      response = connection.put(tube_name, serialized_data, put_opts)
    end
    return nil unless Backburner::Hooks.invoke_hook_events(job_class, :after_enqueue, *args)
  ensure
    # connection is nil when Connection.new itself raised.
    connection.close if connection
  end

  response
end
known_queue_classes() click to toggle source
# File lib/backburner/worker.rb, line 16
# Returns the memoized list of known queue classes, creating an empty list on first use.
def known_queue_classes
  @known_queue_classes ||= []
end
new(tube_names=nil) click to toggle source

Constructs a new worker for processing jobs within specified tubes.

@example

Worker.new(['test.job'])
# File lib/backburner/worker.rb, line 80
# Builds a worker: opens a connection, normalizes the tube names through
# process_tube_names, then installs the signal handlers.
#
# @param tube_names tube names to process; nil is handed to process_tube_names as-is
def initialize(tube_names=nil)
  # The connection is created first, before the tube names are processed.
  @connection = new_connection
  @tube_names = process_tube_names(tube_names)
  register_signal_handlers!
end
start(tube_names=nil) click to toggle source

Starts processing jobs with the specified tube_names.

@example

Backburner::Worker.start(["foo.tube.name"])
# File lib/backburner/worker.rb, line 65
# Instantiates a worker for +tube_names+ and runs it.
# A SystemExit raised while constructing or running the worker is swallowed,
# in which case nil is returned.
#
# @param tube_names tube names forwarded to the constructor
def self.start(tube_names=nil)
  worker = new(tube_names)
  worker.start
rescue SystemExit
  # do nothing
end

Public Instance Methods

prepare() click to toggle source

Used to prepare the job queues before job processing is initiated.

@raise [Beaneater::NotConnected] If beanstalk fails to connect. @example

@worker.prepare

@abstract Define this in your worker subclass to be run once before processing. Recommended to watch tubes or print a message to the logs with 'log_info'

# File lib/backburner/worker.rb, line 106
# Hook run before job processing starts; must be overridden by subclasses.
#
# @raise [NotImplementedError] always, naming the class that failed to override it
def prepare
  raise NotImplementedError, "#{self.class} must implement #prepare"
end
process_tube_names(tube_names) click to toggle source

Processes tube_names given tube_names array. Should return normalized tube_names as an array of strings.

@example

process_tube_names([['foo', 'bar']])
=> ['foo', 'bar']

@note This method can be overridden in inherited workers to add more complex tube name processing.

# File lib/backburner/worker.rb, line 127
# Normalizes the given tube names by delegating to compact_tube_names.
#
# @param tube_names the raw tube names
# @return whatever compact_tube_names returns for +tube_names+
def process_tube_names(tube_names)
  normalized_names = compact_tube_names(tube_names)
  normalized_names
end
shutdown() click to toggle source

Triggers this worker to shutdown

# File lib/backburner/worker.rb, line 111
# Logs an exit message and terminates the process via Kernel.exit.
#
# The message is logged from a separate thread, which is joined before exiting
# so the message is not lost when the process ends.
# NOTE(review): the thread is presumably needed because this method is invoked
# from the signal traps in register_signal_handlers! — confirm.
def shutdown
  announcer = Thread.new { log_info 'Worker exiting...' }
  begin
    announcer.join
  rescue StandardError
    # A failing logger must never prevent the process from exiting.
  end
  Kernel.exit
end
start() click to toggle source

Starts processing ready jobs indefinitely. Primary way to consume and process jobs in specified tubes.

@example

@worker.start
# File lib/backburner/worker.rb, line 92
# Entry point for processing jobs; must be overridden by subclasses.
#
# @raise [NotImplementedError] always, naming the class that failed to override it
def start
  raise NotImplementedError, "#{self.class} must implement #start"
end
work_one_job(conn = connection, tube_name = nil) click to toggle source

Performs a job by reserving a job from beanstalk and processing it

@example

@worker.work_one_job

@raise [Beaneater::NotConnected] If beanstalk fails to connect multiple times.

# File lib/backburner/worker.rb, line 136
# Reserves one job from +tube_name+ through +conn+ and processes it.
#
# When +tube_name+ is nil, a random entry of @tube_names is used and an error is logged.
# If reserving raises a StandardError, the error is logged, the worker sleeps for a
# random interval below 3 seconds and nil is returned. Non-StandardError exceptions
# (SystemExit, Interrupt, ...) propagate so a shutdown raised during reserve is honored.
# When no job (or a job without a body) is returned, the worker also sleeps for a random
# interval below 3 seconds.
#
# A job that raises while processing is buried once its release count reaches the
# resolved maximum number of retries; otherwise it is released with the resolved delay.
# handle_error is then called with the exception.
#
# @param conn connection used for reserving (defaults to this worker's connection)
# @param tube_name [String, nil] tube to reserve from
# @return the job returned by reserve_job (possibly nil), or nil when reserving raised
def work_one_job(conn = connection, tube_name = nil)
  if tube_name.nil?
    self.log_error "Sampling tube, this is bad practice for Allq"
    tube_name = @tube_names.sample
  end

  begin
    job = reserve_job(conn, tube_name)
  rescue StandardError => e
    # Only StandardError is rescued so that SystemExit and signals are not swallowed.
    self.log_error "Sleeping"
    self.log_error "Exception: #{e.full_message}"
    sleep(rand*3)
    return
  end

  if job && job.body
    begin
      self.log_job_begin(job.name, job.args)
      job.process
      self.log_job_end(job.name)
    rescue Backburner::Job::JobFormatInvalid => e
      self.log_error self.exception_message(e)
    rescue => e # Error occurred processing job
      self.log_error self.exception_message(e) unless e.is_a?(Backburner::Job::RetryJob)

      # NB: There's a slight chance here that the connection to allq has
      # gone down between the time we reserved / processed the job and here.
      num_retries = job.releases
      max_job_retries = resolve_max_job_retries(job.job_class)
      retry_status = "failed: attempt #{num_retries+1} of #{max_job_retries+1}"
      retry_delay = resolve_retry_delay(job.job_class)
      # Falls back to the plain retry delay if the delay proc raises.
      delay = resolve_retry_delay_proc(job.job_class).call(retry_delay, num_retries) rescue retry_delay

      if num_retries + 1 > max_job_retries
        job.bury
      else
        job.release(delay)
      end
      # NOTE: this message says "retrying" even when the job was buried above.
      self.log_job_end(job.name, "#{retry_status}, retrying in #{delay}s") if job_started_at

      handle_error(e, job.name, job.args, job)
    end
  else
    sleep(rand*3)
  end
  job
end

Protected Instance Methods

all_existing_queues() click to toggle source

Returns a list of all tubes known within the system, filtered for tubes that match the known prefix.

# File lib/backburner/worker.rb, line 206
# Lists the tubes this worker could process: tubes reported by the connection whose
# name starts with the configured tube namespace, followed by the queues of all
# known queue classes and the configured primary queue.
#
# The namespace is matched as a literal prefix, so characters such as "." in the
# namespace are not treated as regex wildcards.
#
# @return [Array] tube names (may contain duplicates)
def all_existing_queues
  namespace       = queue_config.tube_namespace.to_s
  known_queues    = Backburner::Worker.known_queue_classes.map(&:queue)
  existing_tubes  = connection.tubes.all.map(&:name).select { |tube| tube.start_with?(namespace) }
  existing_tubes + known_queues + [queue_config.primary_queue]
end
compact_tube_names(tube_names) click to toggle source

Normalizes tube names given an array of tube_names. Compacts nil items, flattens arrays, and sets tubes to nil if there are no valid names. Loads default tubes when no tubes are given.

# File lib/backburner/worker.rb, line 230
# Normalizes +tube_names+ into a flat array of unique names.
#
# Accepts nil, a single name, an array of names or nested arrays; nested arrays are
# flattened and nil entries dropped. When nothing remains, Backburner.default_queues
# is used if it has any entries, otherwise all_existing_queues.
#
# @param tube_names [nil, String, Array] raw tube names
# @return [Array] unique tube names
def compact_tube_names(tube_names)
  # Array() also wraps a bare String, so single names never hit Array-only methods.
  names = Array(tube_names).flatten.compact
  if names.empty?
    defaults = Backburner.default_queues
    names = Array(defaults.any? ? defaults : all_existing_queues)
  end
  names.uniq
end
handle_error(e, name, args, job) click to toggle source

Handles an error according to the custom definition. Used when processing a job that errors out.

# File lib/backburner/worker.rb, line 215
# Forwards a job failure to the handler configured as Backburner.configuration.on_error.
# Nothing happens (nil is returned) when no handler is configured.
#
# The number of arguments passed depends on the handler's arity:
# arity 1 receives (e); arity 3 receives (e, name, args); any other arity
# receives (e, name, args, job).
#
# @param e the exception raised by the job
# @param name the job name
# @param args the job arguments
# @param job the job object
# @return the handler's return value, or nil without a handler
def handle_error(e, name, args, job)
  handler = Backburner.configuration.on_error
  return unless handler

  case handler.arity
  when 1 then handler.call(e)
  when 3 then handler.call(e, name, args)
  else handler.call(e, name, args, job)
  end
end
new_connection() click to toggle source

Return a new connection instance

# File lib/backburner/worker.rb, line 193
# Creates a Connection to the configured allq URL.
# The block given to the connection invokes the :on_reconnect hook events
# with this worker and the connection it is yielded.
#
# @return the new Connection instance
def new_connection
  allq_url = Backburner.configuration.allq_url
  Connection.new(allq_url) do |conn|
    Backburner::Hooks.invoke_hook_events(self, :on_reconnect, conn)
  end
end
register_signal_handlers!() click to toggle source

Registers signal handlers for TERM and INT that trigger a shutdown.

# File lib/backburner/worker.rb, line 239
# Installs handlers for the TERM and INT signals; each one calls shutdown.
def register_signal_handlers!
  Signal.trap('TERM') { shutdown }
  Signal.trap('INT') { shutdown }
end
reserve_job(conn, tube_name, reserve_timeout = Backburner.configuration.reserve_timeout) click to toggle source

Reserve a job from the watched queues

# File lib/backburner/worker.rb, line 198
# Fetches the next job from +tube_name+ through +conn+ and wraps it in a Backburner::Job.
#
# @param conn connection responding to +get(tube_name)+
# @param tube_name tube to fetch from
# @param reserve_timeout accepted for interface compatibility; not used by this method
# @return [Backburner::Job, nil] nil when no job was returned or the job has a nil body
def reserve_job(conn, tube_name, reserve_timeout = Backburner.configuration.reserve_timeout)
  job = conn.get(tube_name)
  return nil if job.nil? || job.body.nil?
  Backburner::Job.new(job)
end