class Qyu::Task

Qyu::Task A Task represents a unit of work in a workflow. Conceptually a Task:

Constants

LEASE_PERCENTAGE_THRESHOLD_BEFORE_RENEWAL
POLL_INTERVAL

Attributes

created_at[R]
id[R]
job_id[R]
message_id[R]
name[R]
parent_task_id[R]
payload[R]
queue_name[R]
status[R]
updated_at[R]

Public Class Methods

acknowledge_message(queue_name, message_id) click to toggle source
# File lib/qyu/models/task.rb, line 145
def self.acknowledge_message(queue_name, message_id)
  Qyu.logger.debug "Acknowledging message with ID=#{message_id} from queue `#{queue_name}`"
  Qyu.queue.acknowledge_message(queue_name, message_id)
end
create(queue_name: nil, attributes: nil) click to toggle source

@returns Task by defintion Task.create does 2 things:

We have to make sure that a Task is unique in the Store. Because of this create first looks up if the task has already been persisted. If it exists then there is no need to persist it again, only to enqueue it. Double (or multiple) delivery of messages is allowed and handled at worker level. Possible scenario: A Job failed at some point. A few of its tasks completed successfully, others failed. Because of this, certain tasks haven't even been created. When we restart the job, the tasks will be recreated. If a task has already existed, and completed, then that state will be unchanged, and when the worker picks it up, will notice the completed state, acknowledge the message, and continue the next steps.

# File lib/qyu/models/task.rb, line 31
def self.create(queue_name: nil, attributes: nil)
  fail Qyu::Errors::InvalidTaskAttributes unless valid_attributes?(attributes)
  fail Qyu::Errors::InvalidQueueName unless valid_queue_name?(queue_name)
  Qyu.logger.debug "find_or_persist queue_name=#{queue_name} and attributes=#{attributes}"
  task_id = Qyu.store.find_or_persist_task(
    attributes['name'],
    queue_name,
    attributes['payload'],
    attributes['job_id'],
    attributes['parent_task_id']
  ) do |t_id|
    Qyu.logger.debug "enqueue queue_name=#{queue_name} and task_id=#{t_id}"
    Qyu.queue.enqueue_task(queue_name, t_id)
  end

  new(task_id, attributes, queue_name)
end
enqueue_in_failure_queue(queue_name, id, message_id) click to toggle source
# File lib/qyu/models/task.rb, line 169
def self.enqueue_in_failure_queue(queue_name, id, message_id)
  Qyu.logger.debug "Enqueuing failed message with ID=#{message_id} in #{queue_name} failures queue"
  Qyu.queue.enqueue_task_to_failed_queue(queue_name, id)
end
fetch(queue_name) click to toggle source

@returns Task

# File lib/qyu/models/task.rb, line 50
def self.fetch(queue_name)
  fail Qyu::Errors::InvalidQueueName unless valid_queue_name?(queue_name)
  begin
    message    = Qyu.queue.fetch_next_message(queue_name)
    task_id    = message['task_id']
    task_attrs = Qyu.store.find_task(task_id)
  rescue => ex
    message ||= {}
    raise Qyu::Errors::CouldNotFetchTask.new(queue_name, message['id'], message['task_id'], ex)
  end
  new(task_id, task_attrs, queue_name, message['id'])
end
new(id, attributes, queue_name, message_id = nil) click to toggle source
# File lib/qyu/models/task.rb, line 208
def initialize(id, attributes, queue_name, message_id = nil)
  # puts "task initialized attrs: #{attributes}"
  @status         = Status.new(id)
  @id             = id
  @job_id         = attributes['job_id']
  @parent_task_id = attributes['parent_task_id']
  @payload        = attributes['payload']
  @queue_name     = queue_name
  @message_id     = message_id
  @name           = attributes['name']
  @created_at     = attributes['created_at']
  @updated_at     = attributes['updated_at']

  @locked_until = nil
  @lease_thread = nil
  @lease_token  = nil
end
requeue(queue_name, id, message_id) click to toggle source
# File lib/qyu/models/task.rb, line 157
def self.requeue(queue_name, id, message_id)
  # TODO For FIFO queues (future use)
  Qyu.logger.debug "Re-enqueuing message with ID=#{message_id} in queue `#{queue_name}`"
  Qyu.queue.enqueue_task(queue_name, id)
end
select(job_id:) click to toggle source
# File lib/qyu/models/task.rb, line 63
def self.select(job_id:)
  Qyu.store.select_tasks_by_job_id(job_id).map do |task|
    new(task['id'], task, task['queue_name'])
  end
end
valid_attributes?(_attributes) click to toggle source
# File lib/qyu/models/task.rb, line 69
def self.valid_attributes?(_attributes)
  true
end
valid_queue_name?(queue_name) click to toggle source
# File lib/qyu/models/task.rb, line 73
def self.valid_queue_name?(queue_name)
  !queue_name.nil? && queue_name != ''
end

Public Instance Methods

[](attribute) click to toggle source
# File lib/qyu/models/task.rb, line 202
def [](attribute)
  public_send(attribute)
end
acknowledge_message() click to toggle source
# File lib/qyu/models/task.rb, line 140
def acknowledge_message
  fail Qyu::Errors::MessageNotReceived if message_id.nil?
  self.class.acknowledge_message(queue_name, message_id)
end
acknowledgeable?() click to toggle source
# File lib/qyu/models/task.rb, line 77
def acknowledgeable?
  @status.completed? || @status.invalid_payload?
end
completed?() click to toggle source
# File lib/qyu/models/task.rb, line 81
def completed?
  @status.completed?
end
descriptor() click to toggle source

Returns task descriptor

@return [Hash] task descriptor

# File lib/qyu/models/task.rb, line 198
def descriptor
  workflow_descriptor['tasks'][name]
end
enqueue_in_failure_queue() click to toggle source
# File lib/qyu/models/task.rb, line 163
def enqueue_in_failure_queue
  fail Qyu::Errors::MessageNotReceived if message_id.nil?
  self.class.acknowledge_message(queue_name, message_id)
  self.class.enqueue_in_failure_queue(queue_name, id, message_id)
end
job() click to toggle source

Returns parent job

@return [Qyu::Job] parent job

# File lib/qyu/models/task.rb, line 191
def job
  @job ||= Qyu::Job.find(job_id)
end
lock!() click to toggle source
# File lib/qyu/models/task.rb, line 89
def lock!
  fail Qyu::Errors::LockAlreadyAcquired if locked?
  Qyu.logger.debug "Task with ID=#{id} lock!"

  @lease_token, @locked_until = Qyu.store.lock_task!(id, Qyu.config.store[:lease_period])
  Qyu.logger.debug "lease_token = #{@lease_token} | locked_until = #{@locked_until}"
  return false if @lease_token.nil?

  schedule_renewal
  true
end
locked?() click to toggle source
# File lib/qyu/models/task.rb, line 85
def locked?
  !@lease_token.nil? && !@locked_until.nil? && Time.now < @locked_until
end
mark_completed() click to toggle source
# File lib/qyu/models/task.rb, line 125
def mark_completed
  Qyu.store.update_status(id, Status::COMPLETED)
  Qyu.logger.info "Task with ID=#{id} marked completed."
end
mark_failed() click to toggle source
# File lib/qyu/models/task.rb, line 130
def mark_failed
  Qyu.store.update_status(id, Status::FAILED)
  Qyu.logger.debug "Task with ID=#{id} marked failed."
end
mark_invalid_payload() click to toggle source
# File lib/qyu/models/task.rb, line 135
def mark_invalid_payload
  Qyu.store.update_status(id, Status::INVALID_PAYLOAD)
  Qyu.logger.debug "Task with ID=#{id} has invalid payload."
end
mark_queued() click to toggle source
# File lib/qyu/models/task.rb, line 115
def mark_queued
  Qyu.store.update_status(id, Status::QUEUED)
  Qyu.logger.debug "Task with ID=#{id} marked queued."
end
mark_working() click to toggle source
# File lib/qyu/models/task.rb, line 120
def mark_working
  Qyu.store.update_status(id, Status::WORKING)
  Qyu.logger.debug "Task with ID=#{id} marked working."
end
requeue() click to toggle source
# File lib/qyu/models/task.rb, line 150
def requeue
  # TODO For FIFO queues (future use)
  fail Qyu::Errors::MessageNotReceived if message_id.nil?
  self.class.acknowledge_message(queue_name, message_id)
  self.class.requeue(queue_name, id, message_id)
end
unlock!() click to toggle source
# File lib/qyu/models/task.rb, line 101
def unlock!
  fail Qyu::Errors::LockNotAcquired unless locked?
  Qyu.logger.debug "Task with ID=#{id} unlocking!"

  @lease_thread&.kill
  success = Qyu.store.unlock_task!(id, @lease_token)
  if success
    @lease_token  = nil
    @locked_until = nil
  end

  success
end
workflow() click to toggle source

Returns workflow specified in parent job

@return [Qyu::Workflow] full workflow

# File lib/qyu/models/task.rb, line 177
def workflow
  job.workflow
end
workflow_descriptor() click to toggle source

Returns workflow descriptor from parent job

@return [Hash] workflow descriptor

# File lib/qyu/models/task.rb, line 184
def workflow_descriptor
  job.descriptor
end

Private Instance Methods

schedule_renewal() click to toggle source
# File lib/qyu/models/task.rb, line 226
def schedule_renewal
  Qyu.logger.debug 'scheduling renewal'
  renewal_moment = Qyu::Utils.seconds_after_time(-1 * LEASE_PERCENTAGE_THRESHOLD_BEFORE_RENEWAL * Qyu.config.store[:lease_period], @locked_until)
  Qyu.logger.debug "renewal moment: #{renewal_moment}"
  @lease_thread = Thread.new do
    Qyu.logger.debug 'lease thread entered'
    while Time.now < renewal_moment
      sleep(POLL_INTERVAL)
      Qyu.logger.debug 'lease thread sleep'
    end
    Qyu.logger.debug 'lease thread time has come'
    @locked_until = Qyu.store.renew_lock_lease(id, Qyu.config.store[:lease_period], @lease_token)
    Qyu.logger.debug "lease thread locked until = #{@locked_until}"
    schedule_renewal
  end
end