class Qyu::Workers::Base
Qyu::Workers::Base
A Worker sits on a queue, waiting for tasks to arrive.
Qyu::Worker#work(queue_name)
Worker lifecycle:
- Start an infinite loop: while (true)
- Fetch a message (Task) from its queue: t = Task.fetch(queue_name)
- Check the completion: if t.completed? t.acknowledge_message
- Lock it: t.lock! && t.mark_working
- Do the work: yield(t)
- Create the next steps/tasks: t.job.create_next_tasks(t, t.job.payload (...))
- Finish: t.unlock! && t.mark_finished && t.acknowledge_message
Attributes
id[R]
processed_tasks[RW]
Public Class Methods
new(&block)
click to toggle source
# File lib/qyu/workers/base.rb, line 32
# Sets up a new worker: assigns it an id obtained from Qyu::Utils.uuid and
# starts its processed-task counter at zero.
#
# If a block is supplied, it is evaluated with instance_exec, i.e. with the
# new worker as +self+, so the block can call the worker's own methods.
def initialize(&block)
  @id = Qyu::Utils.uuid
  @processed_tasks = 0
  return unless block_given?

  instance_exec(&block)
end
Public Instance Methods
work(queue_name, blocking: true) { |fetched_task| ... }
click to toggle source
# File lib/qyu/workers/base.rb, line 38
# Consumes tasks from +queue_name+ and yields each one to the given block.
#
# Each loop iteration runs inside the :execute callbacks and:
# - fetches a task and validates its payload,
# - acknowledges the task right away if it reports acknowledgeable?,
# - otherwise tries to lock it, marks it as working, yields it to the block
#   under Timeout.timeout(@timeout) and concludes it when the block returns,
# - passes the task to fail_task when the block (or concluding) raises a
#   StandardError.
#
# A Qyu::Errors::CouldNotFetchTask is retried up to three times per loop
# iteration, sleeping 3, 2 and then 1 seconds before the successive attempts.
# When the budget is used up the error is handed to
# acknowledge_message_with_task_id_not_found_in_store.
#
# @param queue_name name of the queue to fetch tasks from
# @param blocking [Boolean] keep looping when true; run exactly one iteration
#   when false
# @yield [fetched_task] the fetched task, after it was locked and marked working
def work(queue_name, blocking: true)
  log(:info, "worker started for queue '#{queue_name}'")
  repeat = true

  while repeat
    # Every iteration gets a fresh retry budget. It is reset here, outside the
    # begin block, so that `retry` (which re-runs only the begin block) keeps
    # counting down, while an earlier exhausted budget does not disable
    # retries for all later fetch failures.
    remaining_fetch_retries = 3

    run_callbacks(:execute) do
      begin
        fetched_task = fetch_task(queue_name)
        validate_payload!(fetched_task)
        log(:info, "worker processed #{processed_tasks} tasks from queue `#{queue_name}`")
        if fetched_task.acknowledgeable?
          discard_completed_task(fetched_task)
        elsif fetched_task.lock!
          fetched_task.mark_working
          begin
            Timeout.timeout(@timeout) do
              yield(fetched_task)
            end
            conclude_task(fetched_task)
          rescue => ex
            fail_task(fetched_task, ex)
          end
        end
      rescue Qyu::Errors::UnsyncError
        # Nothing to do here: the error is dropped and the loop moves on.
      rescue Qyu::Errors::CouldNotFetchTask => ex
        if remaining_fetch_retries <= 0
          acknowledge_message_with_task_id_not_found_in_store(ex)
        else
          # The wait shrinks with the remaining budget: 3s, then 2s, then 1s.
          sleep(remaining_fetch_retries)
          remaining_fetch_retries -= 1
          retry
        end
      rescue Qyu::Errors::PayloadValidationError => ex
        log("invalid payload: #{ex.class}: #{ex.message}")
        fetched_task.mark_invalid_payload
      rescue => ex
        log("worker error: #{ex.class}: #{ex.message}")
        log("backtrace: #{ex.backtrace.join("\n")}")
      end
    end

    repeat = blocking
    run_garbage_collector
  end
end
Private Instance Methods
acknowledge_message_with_task_id_not_found_in_store(exception)
click to toggle source
# File lib/qyu/workers/base.rb, line 124
# Logs a fetch failure and acknowledges the offending message when the
# underlying cause is a task missing from the store.
#
# The message is acknowledged only if +exception.original_error+ is exactly a
# Qyu::Errors::TaskNotFound and both +exception.queue_name+ and
# +exception.message_id+ are present.
#
# @param exception the fetch error; must respond to message, backtrace,
#   original_error, queue_name and message_id
# @return the result of Qyu::Task.acknowledge_message, or nil when nothing was
#   acknowledged
def acknowledge_message_with_task_id_not_found_in_store(exception)
  # If a task is not found in the Store then there is no point attempting
  # to fetch the message over and over again.
  log("worker error: #{exception.class}: #{exception.message}")
  # Array() tolerates a nil backtrace (an exception that was never raised).
  log("backtrace: #{Array(exception.backtrace).join("\n")}")

  original_error = exception.original_error
  # This method is called from a rescue handler in #work; raising from here
  # would escape the worker loop, so a missing original error is tolerated.
  return if original_error.nil?

  log("original error: #{original_error.class}: #{original_error.message}")
  log("backtrace: #{Array(original_error.backtrace).join("\n")}")

  return unless original_error.instance_of?(Qyu::Errors::TaskNotFound)
  return unless exception.queue_name && exception.message_id

  Qyu::Task.acknowledge_message(exception.queue_name, exception.message_id)
end
conclude_task(fetched_task)
click to toggle source
# File lib/qyu/workers/base.rb, line 99
# Wraps up a task after its work has been done.
#
# Inside a single store transaction it asks the task's job to create the next
# tasks (passing the job payload merged with the task payload), unlocks the
# task and marks it completed. The message is acknowledged only after the
# transaction block has returned.
def conclude_task(fetched_task)
  Qyu.store.transaction do
    log(:debug, 'task finished and creating next tasks.')

    fetched_task.job.create_next_tasks(
      fetched_task,
      fetched_task.job.payload.merge(fetched_task.payload)
    )

    fetched_task.unlock!
    fetched_task.mark_completed
  end

  fetched_task.acknowledge_message
end
discard_completed_task(fetched_task)
click to toggle source
# File lib/qyu/workers/base.rb, line 94
# Drops a task that needs no further processing: logs the fact at debug
# level and acknowledges the task's message.
def discard_completed_task(fetched_task)
  notice = 'fetched completed task and discarding it...'
  log(:debug, notice)

  fetched_task.acknowledge_message
end
fail_task(fetched_task, exception)
click to toggle source
# File lib/qyu/workers/base.rb, line 112
# Handles a task whose processing raised +exception+.
#
# The error and its backtrace are logged unless the exception's class is
# exactly Qyu::Errors::UnsyncError. Then, in one store transaction, the task
# is enqueued in the failure queue (only when @failure_queue is set),
# unlocked and marked as queued again.
def fail_task(fetched_task, exception)
  if exception.class != Qyu::Errors::UnsyncError
    log("worker error: #{exception.class}: #{exception.message}")
    log("backtrace: #{exception.backtrace.join("\n")}")
  end

  Qyu.store.transaction do
    if @failure_queue
      fetched_task.enqueue_in_failure_queue
    end

    fetched_task.unlock!
    fetched_task.mark_queued
  end
end
fetch_task(queue_name)
click to toggle source
# File lib/qyu/workers/base.rb, line 88
# Fetches one task from +queue_name+ via Qyu::Task.fetch and returns it.
#
# The processed-task counter is bumped only after the fetch has returned, so
# a fetch that raises leaves the counter untouched.
def fetch_task(queue_name)
  Qyu::Task.fetch(queue_name).tap { @processed_tasks += 1 }
end
log(level = :error, message)
click to toggle source
# File lib/qyu/workers/base.rb, line 138
# Sends +message+ to Qyu.logger, prefixed with this worker's id in brackets.
#
# +level+ is the name of the logger method to call and defaults to :error,
# so log('text') logs an error while log(:info, 'text') logs at info level.
def log(level = :error, message)
  logger = Qyu.logger
  logger.public_send(level, "[#{id}] #{message}")
end
run_garbage_collector()
click to toggle source
# File lib/qyu/workers/base.rb, line 142
# Logs a debug notice and then triggers a garbage collection with GC.start.
def run_garbage_collector
  notice = 'running garbage collector'
  log(:debug, notice)

  GC.start
end