class Pallets::Worker
Public Class Methods
new(manager, backend)
click to toggle source
# File lib/pallets/worker.rb, line 3 def initialize(manager, backend) @manager = manager @backend = backend @current_job = nil @needs_to_stop = false @thread = nil end
Public Instance Methods
debug()
click to toggle source
# File lib/pallets/worker.rb, line 28 def debug @thread.backtrace end
graceful_shutdown()
click to toggle source
# File lib/pallets/worker.rb, line 15 def graceful_shutdown @needs_to_stop = true end
hard_shutdown()
click to toggle source
# File lib/pallets/worker.rb, line 19 def hard_shutdown return unless @thread @thread.raise Pallets::Shutdown end
id()
click to toggle source
# File lib/pallets/worker.rb, line 32 def id "W#{@thread.object_id.to_s(36)}".upcase if @thread end
needs_to_stop?()
click to toggle source
# File lib/pallets/worker.rb, line 24 def needs_to_stop? @needs_to_stop end
start()
click to toggle source
# File lib/pallets/worker.rb, line 11 def start @thread ||= Thread.new { work } end
Private Instance Methods
backoff_in_seconds(count)
click to toggle source
# File lib/pallets/worker.rb, line 123 def backoff_in_seconds(count) count ** 4 + rand(6..10) end
handle_job_error(ex, job, job_hash)
click to toggle source
# File lib/pallets/worker.rb, line 94 def handle_job_error(ex, job, job_hash) failures = job_hash.fetch('failures', 0) + 1 new_job = serializer.dump(job_hash.merge( 'failures' => failures, 'given_up_at' => Time.now.to_f, 'error_class' => ex.class.name, 'error_message' => ex.message, 'reason' => 'error' )) if failures < job_hash['max_failures'] retry_at = Time.now.to_f + backoff_in_seconds(failures) @backend.retry(new_job, job, retry_at) else @backend.give_up(job_hash['wfid'], new_job, job) end end
handle_job_return_false(job, job_hash)
click to toggle source
# File lib/pallets/worker.rb, line 111 def handle_job_return_false(job, job_hash) new_job = serializer.dump(job_hash.merge( 'given_up_at' => Time.now.to_f, 'reason' => 'returned_false' )) @backend.give_up(job_hash['wfid'], new_job, job) end
handle_job_success(context, job, job_hash)
click to toggle source
# File lib/pallets/worker.rb, line 119 def handle_job_success(context, job, job_hash) @backend.save(job_hash['wfid'], job_hash['jid'], job, serializer.dump_context(context.buffer)) end
middleware()
click to toggle source
# File lib/pallets/worker.rb, line 131 def middleware @middleware ||= Pallets.middleware end
process(job)
click to toggle source
# File lib/pallets/worker.rb, line 62 def process(job) begin job_hash = serializer.load(job) rescue # We ensure only valid jobs are created. If something fishy reaches this # point, just give up on it @backend.discard(job) Pallets.logger.error "Could not deserialize #{job}. Gave up job" return end context = Context[ serializer.load_context(@backend.get_context(job_hash['wfid'])) ] task_class = Pallets::Util.constantize(job_hash["task_class"]) task = task_class.new(context) begin task_result = middleware.invoke(self, job_hash, context) do task.run end rescue => ex handle_job_error(ex, job, job_hash) else if task_result == false handle_job_return_false(job, job_hash) else handle_job_success(context, job, job_hash) end end end
serializer()
click to toggle source
# File lib/pallets/worker.rb, line 127 def serializer @serializer ||= Pallets.serializer end
work()
click to toggle source
# File lib/pallets/worker.rb, line 38 def work loop do break if needs_to_stop? @current_job = @backend.pick # No need to requeue because of the reliability queue break if needs_to_stop? next if @current_job.nil? process @current_job @current_job = nil end @manager.remove_worker(self) rescue Pallets::Shutdown @manager.remove_worker(self) rescue => ex Pallets.logger.error "#{ex.class.name}: #{ex.message}" Pallets.logger.error ex.backtrace.join("\n") unless ex.backtrace.nil? # Do not flood the process in case of persisting unforeseen errors sleep 1 @manager.replace_worker(self) end