class Backburner::Worker
@abstract Subclass and override {#process_tube_names}, {#prepare} and {#start} to implement
a custom Worker class.
Attributes
List of tube names to be watched and processed
List of tube names to be watched and processed
Public Class Methods
Enqueues a job to be processed later by a worker. Options: `pri` (priority), `delay` (delay in secs), `ttr` (time to respond), `queue` (queue name)
@raise [Beaneater::NotConnected] If beanstalk fails to connect. @example
Backburner::Worker.enqueue NewsletterSender, [self.id, user.id], :ttr => 1000
# File lib/backburner/worker.rb, line 26 def self.enqueue(job_class, args=[], opts={}) opts[:shard_key] = opts[:shard_key].nil? ? "X" : opts[:shard_key].to_s pri = resolve_priority(opts[:pri] || job_class) delay = [0, opts[:delay].to_i].max ttr = resolve_respond_timeout(opts[:ttr] || job_class) res = Backburner::Hooks.invoke_hook_events(job_class, :before_enqueue, *args) return nil unless res # stop if hook is false data = { :class => job_class.name, :args => args, :ttr => ttr } queue = opts[:queue] && (Proc === opts[:queue] ? opts[:queue].call(job_class) : opts[:queue]) begin response = nil connection = Backburner::Connection.new(Backburner.configuration.allq_url) connection.retryable do tube_name = expand_tube_name(queue || job_class) serialized_data = Backburner.configuration.job_serializer_proc.call(data) send_data = { pri: pri, delay: delay, ttr: ttr } opts.merge!(send_data) response = connection.put(tube_name, serialized_data, opts) end return nil unless Backburner::Hooks.invoke_hook_events(job_class, :after_enqueue, *args) ensure connection.close if connection end response end
# File lib/backburner/worker.rb, line 16 def known_queue_classes; @known_queue_classes ||= []; end
Constructs a new worker for processing jobs within specified tubes.
@example
Worker.new(['test.job'])
# File lib/backburner/worker.rb, line 80 def initialize(tube_names=nil) @connection = new_connection @tube_names = self.process_tube_names(tube_names) register_signal_handlers! end
Starts processing jobs with the specified tube_names.
@example
Backburner::Worker.start(["foo.tube.name"])
# File lib/backburner/worker.rb, line 65 def self.start(tube_names=nil) begin self.new(tube_names).start rescue SystemExit # do nothing end end
Public Instance Methods
Used to prepare the job queues before job processing is initiated.
@raise [Beaneater::NotConnected] If beanstalk fails to connect. @example
@worker.prepare
@abstract Define this in your worker subclass to be run once before processing. Recommended to watch tubes or print a message to the logs with 'log_info'
# File lib/backburner/worker.rb, line 106 def prepare raise NotImplementedError end
Processes tube_names
given tube_names
array. Should return normalized tube_names
as an array of strings.
@example
process_tube_names([['foo'], ['bar']]) => ['foo', 'bar', 'baz']
@note This method can be overridden in inherited workers to add more complex tube name processing.
# File lib/backburner/worker.rb, line 127 def process_tube_names(tube_names) compact_tube_names(tube_names) end
Triggers this worker to shutdown
# File lib/backburner/worker.rb, line 111 def shutdown Thread.new do log_info 'Worker exiting...' end Kernel.exit end
Starts processing ready jobs indefinitely. Primary way to consume and process jobs in specified tubes.
@example
@worker.start
# File lib/backburner/worker.rb, line 92 def start raise NotImplementedError end
Performs a job by reserving a job from beanstalk and processing it
@example
@worker.work_one_job
@raise [Beaneater::NotConnected] If beanstalk fails to connect multiple times.
# File lib/backburner/worker.rb, line 136 def work_one_job(conn = connection, tube_name = nil) if tube_name.nil? self.log_error "Sampling tube, this is bad practice for Allq" tube_name = @tube_names.sample end begin job = reserve_job(conn, tube_name) rescue Exception => e self.log_error "Sleeping" self.log_error "Exception: #{e.full_message}" sleep(rand*3) return end if job && job.body begin self.log_job_begin(job.name, job.args) job.process self.log_job_end(job.name) rescue Backburner::Job::JobFormatInvalid => e self.log_error self.exception_message(e) rescue => e # Error occurred processing job self.log_error self.exception_message(e) unless e.is_a?(Backburner::Job::RetryJob) unless job self.log_error "Error occurred before we were able to assign a job. Giving up without retrying!" return end # NB: There's a slight chance here that the connection to allq has # gone down between the time we reserved / processed the job and here. num_retries = job.releases max_job_retries = resolve_max_job_retries(job.job_class) retry_status = "failed: attempt #{num_retries+1} of #{max_job_retries+1}" retry_delay = resolve_retry_delay(job.job_class) delay = resolve_retry_delay_proc(job.job_class).call(retry_delay, num_retries) rescue retry_delay if num_retries + 1 > max_job_retries job.bury else job.release(delay) end self.log_job_end(job.name, "#{retry_status}, retrying in #{delay}s") if job_started_at handle_error(e, job.name, job.args, job) end else sleep(rand*3) end job end
Protected Instance Methods
Returns a list of all tubes known within the system Filtered for tubes that match the known prefix
# File lib/backburner/worker.rb, line 206 def all_existing_queues known_queues = Backburner::Worker.known_queue_classes.map(&:queue) existing_tubes = self.connection.tubes.all.map(&:name).select { |tube| tube =~ /^#{queue_config.tube_namespace}/ } existing_tubes + known_queues + [queue_config.primary_queue] end
Normalizes tube names given array of tube_names
Compacts nil items, flattens arrays, sets tubes to nil if no valid names Loads default tubes when no tubes given.
# File lib/backburner/worker.rb, line 230 def compact_tube_names(tube_names) tube_names = tube_names.first if tube_names && tube_names.size == 1 && tube_names.first.is_a?(Array) tube_names = Array(tube_names).compact if tube_names && Array(tube_names).compact.size > 0 tube_names = nil if tube_names && tube_names.compact.empty? tube_names ||= Backburner.default_queues.any? ? Backburner.default_queues : all_existing_queues Array(tube_names).uniq end
Handles an error according to custom definition Used when processing a job that errors out
# File lib/backburner/worker.rb, line 215 def handle_error(e, name, args, job) if error_handler = Backburner.configuration.on_error if error_handler.arity == 1 error_handler.call(e) elsif error_handler.arity == 3 error_handler.call(e, name, args) else error_handler.call(e, name, args, job) end end end
Return a new connection instance
# File lib/backburner/worker.rb, line 193 def new_connection Connection.new(Backburner.configuration.allq_url) { |conn| Backburner::Hooks.invoke_hook_events(self, :on_reconnect, conn) } end
Registers signal handlers TERM and INT to trigger
# File lib/backburner/worker.rb, line 239 def register_signal_handlers! trap('TERM') { shutdown } trap('INT') { shutdown } end
Reserve a job from the watched queues
# File lib/backburner/worker.rb, line 198 def reserve_job(conn, tube_name, reserve_timeout = Backburner.configuration.reserve_timeout) job = conn.get(tube_name) return nil if job.nil? || job.body == nil? Backburner::Job.new(job) end