class Pallets::Worker

Public Class Methods

new(manager, backend) click to toggle source
# File lib/pallets/worker.rb, line 3
# Builds an idle worker. No thread exists until #start is called.
#
# manager - object notified (remove_worker / replace_worker) when the work
#           loop ends
# backend - object used to pick jobs and to persist their outcome
def initialize(manager, backend)
  # Collaborators
  @backend = backend
  @manager = manager

  # Runtime state
  @needs_to_stop = false
  @current_job = nil
  @thread = nil
end

Public Instance Methods

debug() click to toggle source
# File lib/pallets/worker.rb, line 28
# Returns the backtrace of the worker thread, for diagnostics.
#
# Safe to call before #start: when no thread has been spawned yet this
# returns nil instead of raising NoMethodError, in line with the nil guards
# used by #id and #hard_shutdown.
#
# Returns the value of Thread#backtrace, or nil when there is no thread.
def debug
  @thread.backtrace if @thread
end
graceful_shutdown() click to toggle source
# File lib/pallets/worker.rb, line 15
# Requests a cooperative stop by raising the stop flag. The work loop checks
# the flag before and after picking a job, so a job that is already being
# processed is not interrupted by this call.
def graceful_shutdown
  @needs_to_stop = true
end
hard_shutdown() click to toggle source
# File lib/pallets/worker.rb, line 19
# Interrupts the worker right away by raising Pallets::Shutdown inside its
# thread. A worker that was never started is left untouched.
def hard_shutdown
  @thread.raise(Pallets::Shutdown) if @thread
end
id() click to toggle source
# File lib/pallets/worker.rb, line 32
# Short identifier for the worker: "W" followed by the base-36 form of the
# thread's object_id, upcased. Returns nil while the worker has no thread.
def id
  return unless @thread

  thread_token = @thread.object_id.to_s(36)
  "W#{thread_token}".upcase
end
needs_to_stop?() click to toggle source
# File lib/pallets/worker.rb, line 24
# Reports the stop flag: false after construction, true once
# #graceful_shutdown has been called.
def needs_to_stop?
  @needs_to_stop
end
start() click to toggle source
# File lib/pallets/worker.rb, line 11
# Spawns the thread that runs the work loop, at most once. Calling it again
# hands back the thread created by the first call.
def start
  return @thread if @thread

  @thread = Thread.new { work }
end

Private Instance Methods

backoff_in_seconds(count) click to toggle source
# File lib/pallets/worker.rb, line 123
# Delay before the next retry: the failure count raised to the fourth power
# plus a random 6..10 (inclusive) of jitter.
#
# count - number of failures so far
def backoff_in_seconds(count)
  polynomial_delay = count ** 4
  polynomial_delay + rand(6..10)
end
handle_job_error(ex, job, job_hash) click to toggle source
# File lib/pallets/worker.rb, line 94
# Records a failed attempt on the job and either schedules a retry or gives
# the job up, depending on how the new failure count compares with the
# job's 'max_failures'.
#
# ex       - the exception raised while running the task
# job      - the serialized job as it was picked from the backend
# job_hash - the deserialized job
def handle_job_error(ex, job, job_hash)
  attempt = job_hash.fetch('failures', 0) + 1
  failure_details = {
    'failures' => attempt,
    'given_up_at' => Time.now.to_f,
    'error_class' => ex.class.name,
    'error_message' => ex.message,
    'reason' => 'error'
  }
  updated_job = serializer.dump(job_hash.merge(failure_details))

  # Still under the limit: hand the job back with a backoff-delayed timestamp
  if attempt < job_hash['max_failures']
    return @backend.retry(updated_job, job, Time.now.to_f + backoff_in_seconds(attempt))
  end

  @backend.give_up(job_hash['wfid'], updated_job, job)
end
handle_job_return_false(job, job_hash) click to toggle source
# File lib/pallets/worker.rb, line 111
# Gives up on a job whose task returned false, stamping the stored job with
# the time and the 'returned_false' reason.
#
# job      - the serialized job as it was picked from the backend
# job_hash - the deserialized job
def handle_job_return_false(job, job_hash)
  verdict = { 'given_up_at' => Time.now.to_f, 'reason' => 'returned_false' }
  abandoned_job = serializer.dump(job_hash.merge(verdict))
  @backend.give_up(job_hash['wfid'], abandoned_job, job)
end
handle_job_success(context, job, job_hash) click to toggle source
# File lib/pallets/worker.rb, line 119
# Saves a successfully run job through the backend, together with the
# serialized form of context.buffer.
#
# context  - the context the task ran with
# job      - the serialized job as it was picked from the backend
# job_hash - the deserialized job
def handle_job_success(context, job, job_hash)
  workflow_id = job_hash['wfid']
  job_id = job_hash['jid']
  serialized_buffer = serializer.dump_context(context.buffer)
  @backend.save(workflow_id, job_id, job, serialized_buffer)
end
middleware() click to toggle source
# File lib/pallets/worker.rb, line 131
# Memoizes whatever Pallets.middleware returns; later calls reuse the cached
# object.
def middleware
  return @middleware if @middleware

  @middleware = Pallets.middleware
end
process(job) click to toggle source
# File lib/pallets/worker.rb, line 62
# Runs a single picked job from start to finish: deserializes it, loads the
# workflow context, instantiates the task class, runs it through the
# middleware and dispatches on the outcome (error, false, anything else).
#
# job - the serialized job as it was picked from the backend
def process(job)
  begin
    payload = serializer.load(job)
  rescue
    # Only valid jobs are ever created, so a payload that cannot be read is
    # discarded instead of retried
    @backend.discard(job)
    Pallets.logger.error "Could not deserialize #{job}. Gave up job"
    return
  end

  context = Context[serializer.load_context(@backend.get_context(payload['wfid']))]
  task = Pallets::Util.constantize(payload["task_class"]).new(context)

  begin
    outcome = middleware.invoke(self, payload, context) { task.run }
  rescue => error
    handle_job_error(error, job, payload)
  else
    # This branch is not covered by the rescue above, so a failure while
    # persisting the outcome is not reported as a task error
    return handle_job_return_false(job, payload) if outcome == false

    handle_job_success(context, job, payload)
  end
end
serializer() click to toggle source
# File lib/pallets/worker.rb, line 127
# Memoizes whatever Pallets.serializer returns; later calls reuse the cached
# object.
def serializer
  return @serializer if @serializer

  @serializer = Pallets.serializer
end
work() click to toggle source
# File lib/pallets/worker.rb, line 38
# Body of the worker thread: repeatedly picks a job from the backend and
# processes it until the stop flag is raised.
#
# How the method ends decides what the manager is told:
# * loop left because of the stop flag -> remove_worker
# * Pallets::Shutdown raised           -> remove_worker
# * any other rescued error            -> logged, then replace_worker
def work
  loop do
    break if needs_to_stop?

    @current_job = @backend.pick
    # No need to requeue because of the reliability queue
    # (the flag is re-checked here, after pick, so a job picked while a stop
    # was requested is not processed)
    break if needs_to_stop?
    # pick returned nothing; go around and try again
    next if @current_job.nil?

    process @current_job

    @current_job = nil
  end
  @manager.remove_worker(self)
rescue Pallets::Shutdown
  @manager.remove_worker(self)
rescue => ex
  Pallets.logger.error "#{ex.class.name}: #{ex.message}"
  Pallets.logger.error ex.backtrace.join("\n") unless ex.backtrace.nil?
  # Do not flood the process in case of persisting unforeseen errors
  sleep 1
  @manager.replace_worker(self)
end