class Qyu::Workers::Sync

Qyu::Workers::Sync

Public Instance Methods

work(queue_name) { |task| ... }
Calls superclass method Qyu::Workers::Base#work
# File lib/qyu/workers/sync.rb, line 7
# Processes sync tasks: for each task handed over by the superclass `work`,
# evaluates every sync condition declared for the task names it waits for.
#
# Each condition names a method on this worker (`sync_condition['function']`)
# which is invoked with (job, task, task_name, sync_condition['param']) and is
# expected to raise when the condition is not yet met.
#
# The attached block is executed once per task, and only after ALL sync
# conditions passed without raising. Yielding inside the loop would run the
# block once per condition and before later conditions were verified. When
# there is nothing to wait for, the block is not invoked.
#
# @param queue_name forwarded unchanged to the superclass `work`
# @yield [task] the task whose sync conditions all passed
# @raise [Qyu::Errors::NotImplementedError] if the worker has no method with
#   the name given by a sync condition
def work(queue_name)
  super do |task|
    job = task.job
    task_names_to_wait_for = job.tasks_to_wait_for(task)
    log(:debug, "Task names to wait for: #{task_names_to_wait_for}")

    conditions_passed = 0
    task_names_to_wait_for.each do |task_name|
      sync_condition = job.sync_condition(task, task_name)
      log(:debug, "Task: #{task_name}, Sync condition: #{sync_condition}")
      function = sync_condition['function']
      # NOTE(review): any method of this worker, private ones included, can be
      # selected by the sync condition; confirm the condition data is trusted.
      fail Qyu::Errors::NotImplementedError unless respond_to?(function, true)

      # Raises if the condition is not met, aborting before the block runs.
      __send__(function, job, task, task_name, sync_condition['param'])
      conditions_passed += 1
    end

    # execute attached sync block only if every condition passed (i.e. no errors raised)
    yield(task) if block_given? && conditions_passed > 0
  end
end

Private Instance Methods

check_completion!(task_ids)
# File lib/qyu/workers/sync.rb, line 56
# Verifies that every given task is completed.
#
# Builds a Qyu::Status for each id in order, logs its status, and stops at the
# first task that is not completed.
#
# @param task_ids ids of the tasks to check
# @return the result of `task_ids.each` when every task is completed
# @raise [Qyu::Errors::UnsyncError] at the first task that is not completed
def check_completion!(task_ids)
  task_ids.each do |id|
    task_status = Qyu::Status.new(id)
    log(:debug, "[CHECK_COMPLETION] Task ID: #{id}, Status: #{task_status.status}")
    raise Qyu::Errors::UnsyncError unless task_status.completed?
  end
end
completed(job, task, task_name_to_wait_for, _sync_param_name)
# File lib/qyu/workers/sync.rb, line 44
# Sync condition: passes when at least one task named `task_name_to_wait_for`
# is found under the current task's parent and all found tasks are completed.
#
# @param job object queried for the ids of the awaited tasks
# @param task the sync task; its `parent_task_id` scopes the lookup
# @param task_name_to_wait_for name of the tasks to look up
# @param _sync_param_name unused; accepted so the method can be called with the
#   same four arguments as the other sync condition functions
# @return whatever `check_completion!` returns when tasks were found
# @raise [Qyu::Errors::UnsyncError] when no matching task exists, or when
#   `check_completion!` raises it
def completed(job, task, task_name_to_wait_for, _sync_param_name)
  ancestor_id = task.parent_task_id
  log(:debug, "Task: #{task_name_to_wait_for}, Parent task ID: #{ancestor_id}")
  found_ids = job.find_task_ids_by_name_and_ancestor_task_id(task_name_to_wait_for, ancestor_id)
  log(:debug, "Task: #{task_name_to_wait_for}, Task IDs: #{found_ids}")
  return check_completion!(found_ids) unless found_ids.empty?

  # Nothing found yet: signal the caller that the sync task is not ready.
  log(:debug, 'Re-enqueuing sync task')
  raise Qyu::Errors::UnsyncError
end
eq_completed(job, task, task_name_to_wait_for, sync_param_name)
# File lib/qyu/workers/sync.rb, line 28
# Sync condition: passes when the number of tasks named
# `task_name_to_wait_for` found under the current task's parent is not lower
# than the value stored in the task payload under `sync_param_name`, and all
# of those tasks are completed.
#
# @param job object queried for the ids of the awaited tasks
# @param task the sync task; provides `payload` and `parent_task_id`
# @param task_name_to_wait_for name of the tasks to look up
# @param sync_param_name payload key holding the expected number of tasks
# @return whatever `check_completion!` returns when enough tasks were found
# @raise [Qyu::Errors::UnsyncError] when fewer tasks than expected exist, or
#   when `check_completion!` raises it
def eq_completed(job, task, task_name_to_wait_for, sync_param_name)
  # NOTE(review): if the payload has no value under this key, the `<`
  # comparison below raises ArgumentError - confirm callers always set it.
  expected_count = task.payload[sync_param_name]
  log(:debug, "Task: #{task_name_to_wait_for}, Sync param value: #{expected_count}")
  ancestor_id = task.parent_task_id
  log(:debug, "Task: #{task_name_to_wait_for}, Parent task ID: #{ancestor_id}")
  found_ids = job.find_task_ids_by_name_and_ancestor_task_id(task_name_to_wait_for, ancestor_id)
  log(:debug, "Task: #{task_name_to_wait_for}, Task IDs: #{found_ids}")

  return check_completion!(found_ids) unless found_ids.size < expected_count

  # Not all expected tasks exist yet: signal that the sync task is not ready.
  log(:debug, 'Re-enqueuing sync task')
  raise Qyu::Errors::UnsyncError
end