class Qyu::Task
Qyu::Task
A Task
represents a unit of work in a workflow. Conceptually a Task:
-
may not exist outside the context of a queue.
-
it is created ON the queue
-
it remains on the queue until it was successfully processed (or failed “enough” times)
Constants
- LEASE_PERCENTAGE_THRESHOLD_BEFORE_RENEWAL
- POLL_INTERVAL
Attributes
Public Class Methods
# File lib/qyu/models/task.rb, line 145 def self.acknowledge_message(queue_name, message_id) Qyu.logger.debug "Acknowledging message with ID=#{message_id} from queue `#{queue_name}`" Qyu.queue.acknowledge_message(queue_name, message_id) end
@returns Task
by defintion Task.create
does 2 things:
We have to make sure that a Task
is unique in the Store
. Because of this create first looks up if the task has already been persisted. If it exists then there is no need to persist it again, only to enqueue it. Double (or multiple) delivery of messages is allowed and handled at worker level. Possible scenario: A Job
failed at some point. A few of its tasks completed successfully, others failed. Because of this, certain tasks haven't even been created. When we restart the job, the tasks will be recreated. If a task has already existed, and completed, then that state will be unchanged, and when the worker picks it up, will notice the completed state, acknowledge the message, and continue the next steps.
# File lib/qyu/models/task.rb, line 31 def self.create(queue_name: nil, attributes: nil) fail Qyu::Errors::InvalidTaskAttributes unless valid_attributes?(attributes) fail Qyu::Errors::InvalidQueueName unless valid_queue_name?(queue_name) Qyu.logger.debug "find_or_persist queue_name=#{queue_name} and attributes=#{attributes}" task_id = Qyu.store.find_or_persist_task( attributes['name'], queue_name, attributes['payload'], attributes['job_id'], attributes['parent_task_id'] ) do |t_id| Qyu.logger.debug "enqueue queue_name=#{queue_name} and task_id=#{t_id}" Qyu.queue.enqueue_task(queue_name, t_id) end new(task_id, attributes, queue_name) end
# File lib/qyu/models/task.rb, line 169 def self.enqueue_in_failure_queue(queue_name, id, message_id) Qyu.logger.debug "Enqueuing failed message with ID=#{message_id} in #{queue_name} failures queue" Qyu.queue.enqueue_task_to_failed_queue(queue_name, id) end
@returns Task
# File lib/qyu/models/task.rb, line 50 def self.fetch(queue_name) fail Qyu::Errors::InvalidQueueName unless valid_queue_name?(queue_name) begin message = Qyu.queue.fetch_next_message(queue_name) task_id = message['task_id'] task_attrs = Qyu.store.find_task(task_id) rescue => ex message ||= {} raise Qyu::Errors::CouldNotFetchTask.new(queue_name, message['id'], message['task_id'], ex) end new(task_id, task_attrs, queue_name, message['id']) end
# File lib/qyu/models/task.rb, line 208 def initialize(id, attributes, queue_name, message_id = nil) # puts "task initialized attrs: #{attributes}" @status = Status.new(id) @id = id @job_id = attributes['job_id'] @parent_task_id = attributes['parent_task_id'] @payload = attributes['payload'] @queue_name = queue_name @message_id = message_id @name = attributes['name'] @created_at = attributes['created_at'] @updated_at = attributes['updated_at'] @locked_until = nil @lease_thread = nil @lease_token = nil end
# File lib/qyu/models/task.rb, line 157 def self.requeue(queue_name, id, message_id) # TODO For FIFO queues (future use) Qyu.logger.debug "Re-enqueuing message with ID=#{message_id} in queue `#{queue_name}`" Qyu.queue.enqueue_task(queue_name, id) end
# File lib/qyu/models/task.rb, line 63 def self.select(job_id:) Qyu.store.select_tasks_by_job_id(job_id).map do |task| new(task['id'], task, task['queue_name']) end end
# File lib/qyu/models/task.rb, line 69 def self.valid_attributes?(_attributes) true end
# File lib/qyu/models/task.rb, line 73 def self.valid_queue_name?(queue_name) !queue_name.nil? && queue_name != '' end
Public Instance Methods
# File lib/qyu/models/task.rb, line 202 def [](attribute) public_send(attribute) end
# File lib/qyu/models/task.rb, line 140 def acknowledge_message fail Qyu::Errors::MessageNotReceived if message_id.nil? self.class.acknowledge_message(queue_name, message_id) end
# File lib/qyu/models/task.rb, line 77 def acknowledgeable? @status.completed? || @status.invalid_payload? end
# File lib/qyu/models/task.rb, line 81 def completed? @status.completed? end
Returns task descriptor
@return [Hash] task descriptor
# File lib/qyu/models/task.rb, line 198 def descriptor workflow_descriptor['tasks'][name] end
# File lib/qyu/models/task.rb, line 163 def enqueue_in_failure_queue fail Qyu::Errors::MessageNotReceived if message_id.nil? self.class.acknowledge_message(queue_name, message_id) self.class.enqueue_in_failure_queue(queue_name, id, message_id) end
Returns parent job
@return [Qyu::Job] parent job
# File lib/qyu/models/task.rb, line 191 def job @job ||= Qyu::Job.find(job_id) end
# File lib/qyu/models/task.rb, line 89 def lock! fail Qyu::Errors::LockAlreadyAcquired if locked? Qyu.logger.debug "Task with ID=#{id} lock!" @lease_token, @locked_until = Qyu.store.lock_task!(id, Qyu.config.store[:lease_period]) Qyu.logger.debug "lease_token = #{@lease_token} | locked_until = #{@locked_until}" return false if @lease_token.nil? schedule_renewal true end
# File lib/qyu/models/task.rb, line 85 def locked? !@lease_token.nil? && !@locked_until.nil? && Time.now < @locked_until end
# File lib/qyu/models/task.rb, line 125 def mark_completed Qyu.store.update_status(id, Status::COMPLETED) Qyu.logger.info "Task with ID=#{id} marked completed." end
# File lib/qyu/models/task.rb, line 130 def mark_failed Qyu.store.update_status(id, Status::FAILED) Qyu.logger.debug "Task with ID=#{id} marked failed." end
# File lib/qyu/models/task.rb, line 135 def mark_invalid_payload Qyu.store.update_status(id, Status::INVALID_PAYLOAD) Qyu.logger.debug "Task with ID=#{id} has invalid payload." end
# File lib/qyu/models/task.rb, line 115 def mark_queued Qyu.store.update_status(id, Status::QUEUED) Qyu.logger.debug "Task with ID=#{id} marked queued." end
# File lib/qyu/models/task.rb, line 120 def mark_working Qyu.store.update_status(id, Status::WORKING) Qyu.logger.debug "Task with ID=#{id} marked working." end
# File lib/qyu/models/task.rb, line 150 def requeue # TODO For FIFO queues (future use) fail Qyu::Errors::MessageNotReceived if message_id.nil? self.class.acknowledge_message(queue_name, message_id) self.class.requeue(queue_name, id, message_id) end
# File lib/qyu/models/task.rb, line 101 def unlock! fail Qyu::Errors::LockNotAcquired unless locked? Qyu.logger.debug "Task with ID=#{id} unlocking!" @lease_thread&.kill success = Qyu.store.unlock_task!(id, @lease_token) if success @lease_token = nil @locked_until = nil end success end
Returns workflow specified in parent job
@return [Qyu::Workflow] full workflow
# File lib/qyu/models/task.rb, line 177 def workflow job.workflow end
Returns workflow descriptor from parent job
@return [Hash] workflow descriptor
# File lib/qyu/models/task.rb, line 184 def workflow_descriptor job.descriptor end
Private Instance Methods
# File lib/qyu/models/task.rb, line 226 def schedule_renewal Qyu.logger.debug 'scheduling renewal' renewal_moment = Qyu::Utils.seconds_after_time(-1 * LEASE_PERCENTAGE_THRESHOLD_BEFORE_RENEWAL * Qyu.config.store[:lease_period], @locked_until) Qyu.logger.debug "renewal moment: #{renewal_moment}" @lease_thread = Thread.new do Qyu.logger.debug 'lease thread entered' while Time.now < renewal_moment sleep(POLL_INTERVAL) Qyu.logger.debug 'lease thread sleep' end Qyu.logger.debug 'lease thread time has come' @locked_until = Qyu.store.renew_lock_lease(id, Qyu.config.store[:lease_period], @lease_token) Qyu.logger.debug "lease thread locked until = #{@locked_until}" schedule_renewal end end