class Gush::Worker

Attributes

client[R]
job[R]
workflow_id[R]

Public Instance Methods

perform(workflow_id, job_id) click to toggle source
# File lib/gush/worker.rb, line 6
# Runs a single workflow job and records its lifecycle.
#
# Loads the job, hands it the payloads collected from its incoming jobs,
# marks it as started and calls +job.perform+. If that raises a
# StandardError the job is marked as failed and the same exception is
# re-raised to the caller; otherwise the job is marked as finished and its
# outgoing jobs are considered for enqueueing.
#
# @param workflow_id value stored as the worker's workflow id and passed to
#   the client when looking up jobs
# @param job_id value passed to the client to look up the job to run
# @raise [StandardError] whatever +job.perform+ raised
def perform(workflow_id, job_id)
  setup_job(workflow_id, job_id)

  job.payloads = incoming_payloads

  mark_as_started
  begin
    job.perform
  rescue StandardError
    mark_as_failed
    # A bare raise re-raises the exception currently being handled.
    raise
  else
    # Only reached when job.perform did not raise.
    mark_as_finished
    enqueue_outgoing_jobs
  end
end

Private Instance Methods

elapsed(start) click to toggle source
# File lib/gush/worker.rb, line 64
# Time passed since +start+, measured against Time.now and rounded to three
# decimal places.
#
# @param start value subtracted from Time.now (presumably a Time — confirm
#   against callers)
# @return [Float]
def elapsed(start)
  difference = Time.now - start
  difference.to_f.round(3)
end
enqueue_outgoing_jobs() click to toggle source
# File lib/gush/worker.rb, line 68
# Walks the names in +job.outgoing+ and, for each one, looks the job up
# through the client and enqueues it when it reports +ready_to_start?+.
#
# Each lookup-and-enqueue runs inside a RedisMutex lock keyed by workflow id
# and job name (presumably so concurrent workers do not enqueue the same job
# twice — confirm).
def enqueue_outgoing_jobs
  job.outgoing.each do |job_name|
    lock_key = "gush_enqueue_outgoing_jobs_#{workflow_id}-#{job_name}"

    RedisMutex.with_lock(lock_key, sleep: 0.3, block: 2) do
      successor = client.find_job(workflow_id, job_name)
      client.enqueue_job(workflow_id, successor) if successor.ready_to_start?
    end
  end
end
incoming_payloads() click to toggle source
# File lib/gush/worker.rb, line 38
# Builds the payload list handed to the current job: one hash per name in
# +job.incoming+, each looked up through the client.
#
# @return [Array<Hash>] hashes with +:id+ (the looked-up job's name),
#   +:class+ (its +klass+ as a string) and +:output+ (its output payload)
def incoming_payloads
  job.incoming.map do |job_name|
    # Named distinctly from the +job+ reader so the current job is not
    # shadowed inside the block.
    incoming_job = client.find_job(workflow_id, job_name)
    {
      id: incoming_job.name,
      class: incoming_job.klass.to_s,
      output: incoming_job.output_payload
    }
  end
end
mark_as_failed() click to toggle source
# File lib/gush/worker.rb, line 54
# Calls +fail!+ on the current job, then persists it through the client.
def mark_as_failed
  failed_job = job
  failed_job.fail!
  client.persist_job(workflow_id, failed_job)
end
mark_as_finished() click to toggle source
# File lib/gush/worker.rb, line 49
# Calls +finish!+ on the current job, then persists it through the client.
def mark_as_finished
  finished_job = job
  finished_job.finish!
  client.persist_job(workflow_id, finished_job)
end
mark_as_started() click to toggle source
# File lib/gush/worker.rb, line 59
# Calls +start!+ on the current job, then persists it through the client.
def mark_as_started
  started_job = job
  started_job.start!
  client.persist_job(workflow_id, started_job)
end
setup_job(workflow_id, job_id) click to toggle source
# File lib/gush/worker.rb, line 33
# Stores the workflow id and makes sure +@job+ is populated.
#
# +@workflow_id+ is overwritten on every call. +@job+ is looked up through
# the client only when it is not already set, so an existing job is reused.
#
# @param workflow_id value stored in +@workflow_id+ and used for the lookup
# @param job_id value passed to the client to find the job
# @return the job held in +@job+
def setup_job(workflow_id, job_id)
  @workflow_id = workflow_id
  return @job if @job

  @job = client.find_job(workflow_id, job_id)
end