class Qyu::Workers::Base

Qyu::Workers::Base A Worker is sitting on a queue, waiting for something.

Qyu::Worker#work(queue_name)

Worker lifecycle:

- Start an infinte loop:
      while (true)
- Fetch a message (Task) from its queue:
      t = Task.fetch(queue_name)
- Check the completion:
      if t.completed? t.acknowledge_message
- Lock it:
      t.lock! && t.mark_working
- Works: yield(t)
- Create the next steps/tasks:
      t.job.create_next_tasks(t, t.job.payload (...))
- Finish:
      t.unlock! && t.mark_finished && t.acknowledge_message

Attributes

id[R]
processed_tasks[RW]

Public Class Methods

new(&block) click to toggle source
# File lib/qyu/workers/base.rb, line 32
def initialize(&block)
  @id = Qyu::Utils.uuid
  @processed_tasks = 0
  instance_exec(&block) if block_given?
end

Public Instance Methods

work(queue_name, blocking: true) { |fetched_task| ... } click to toggle source
# File lib/qyu/workers/base.rb, line 38
def work(queue_name, blocking: true)
  log(:info, "worker started for queue '#{queue_name}'")
  repeat = true

  remaining_fetch_retries = 3

  while repeat
    run_callbacks(:execute) do
      begin
        fetched_task = fetch_task(queue_name)
        validate_payload!(fetched_task)
        log(:info, "worker processed #{processed_tasks} tasks from queue `#{queue_name}`")
        if fetched_task.acknowledgeable?
          discard_completed_task(fetched_task)
        elsif fetched_task.lock!
          fetched_task.mark_working
          begin
            Timeout::timeout(@timeout) do
              yield(fetched_task)
            end
            conclude_task(fetched_task)
          rescue => ex
            fail_task(fetched_task, ex)
          end
        end
      rescue Qyu::Errors::UnsyncError
      rescue Qyu::Errors::CouldNotFetchTask => ex
        if remaining_fetch_retries <= 0
          acknowledge_message_with_task_id_not_found_in_store(ex)
        else
          sleep(remaining_fetch_retries)
          remaining_fetch_retries -= 1
          retry
        end
      rescue Qyu::Errors::PayloadValidationError => ex
        log("invalid payload: #{ex.class}: #{ex.message}")
        fetched_task.mark_invalid_payload
      rescue => ex
        log("worker error: #{ex.class}: #{ex.message}")
        log("backtrace: #{ex.backtrace.join("\n")}")
      end
    end

    repeat = blocking
    run_garbage_collector
  end
end

Private Instance Methods

acknowledge_message_with_task_id_not_found_in_store(exception) click to toggle source
# File lib/qyu/workers/base.rb, line 124
def acknowledge_message_with_task_id_not_found_in_store(exception)
  # If a task is not found in the Store then there is no point attempting
  # to fetch the message over and over again.
  log("worker error: #{exception.class}: #{exception.message}")
  log("backtrace: #{exception.backtrace.join("\n")}")
  log("original error: #{exception.original_error.class}: #{exception.original_error.message}")
  log("backtrace: #{exception.original_error.backtrace.join("\n")}")
  if exception.original_error.class == Qyu::Errors::TaskNotFound &&
     exception.queue_name &&
     exception.message_id
    Qyu::Task.acknowledge_message(exception.queue_name, exception.message_id)
  end
end
conclude_task(fetched_task) click to toggle source
# File lib/qyu/workers/base.rb, line 99
def conclude_task(fetched_task)
  Qyu.store.transaction do
    log(:debug, 'task finished and creating next tasks.')
    fetched_task.job.create_next_tasks(
      fetched_task,
      fetched_task.job.payload.merge(fetched_task.payload)
    )
    fetched_task.unlock!
    fetched_task.mark_completed
  end
  fetched_task.acknowledge_message
end
discard_completed_task(fetched_task) click to toggle source
# File lib/qyu/workers/base.rb, line 94
def discard_completed_task(fetched_task)
  log(:debug, 'fetched completed task and discarding it...')
  fetched_task.acknowledge_message
end
fail_task(fetched_task, exception) click to toggle source
# File lib/qyu/workers/base.rb, line 112
def fail_task(fetched_task, exception)
  unless exception.class == Qyu::Errors::UnsyncError
    log("worker error: #{exception.class}: #{exception.message}")
    log("backtrace: #{exception.backtrace.join("\n")}")
  end
  Qyu.store.transaction do
    fetched_task.enqueue_in_failure_queue if @failure_queue
    fetched_task.unlock!
    fetched_task.mark_queued
  end
end
fetch_task(queue_name) click to toggle source
# File lib/qyu/workers/base.rb, line 88
def fetch_task(queue_name)
  fetched_task = Qyu::Task.fetch(queue_name)
  @processed_tasks += 1
  fetched_task
end
log(level = :error, message) click to toggle source
# File lib/qyu/workers/base.rb, line 138
def log(level = :error, message)
  Qyu.logger.public_send(level, "[#{id}] #{message}")
end
run_garbage_collector() click to toggle source
# File lib/qyu/workers/base.rb, line 142
def run_garbage_collector
  log(:debug, 'running garbage collector')
  GC.start
end