class Gush::Worker
Attributes
client[R]
job[R]
workflow_id[R]
Public Instance Methods
perform(workflow_id, job_id)
click to toggle source
# File lib/gush/worker.rb, line 6 def perform(workflow_id, job_id) setup_job(workflow_id, job_id) job.payloads = incoming_payloads error = nil mark_as_started begin job.perform rescue StandardError => error mark_as_failed raise error else mark_as_finished enqueue_outgoing_jobs end end
Private Instance Methods
elapsed(start)
click to toggle source
# File lib/gush/worker.rb, line 64 def elapsed(start) (Time.now - start).to_f.round(3) end
enqueue_outgoing_jobs()
click to toggle source
# File lib/gush/worker.rb, line 68 def enqueue_outgoing_jobs job.outgoing.each do |job_name| RedisMutex.with_lock("gush_enqueue_outgoing_jobs_#{workflow_id}-#{job_name}", sleep: 0.3, block: 2) do out = client.find_job(workflow_id, job_name) if out.ready_to_start? client.enqueue_job(workflow_id, out) end end end end
incoming_payloads()
click to toggle source
# File lib/gush/worker.rb, line 38 def incoming_payloads job.incoming.map do |job_name| job = client.find_job(workflow_id, job_name) { id: job.name, class: job.klass.to_s, output: job.output_payload } end end
mark_as_failed()
click to toggle source
# File lib/gush/worker.rb, line 54 def mark_as_failed job.fail! client.persist_job(workflow_id, job) end
mark_as_finished()
click to toggle source
# File lib/gush/worker.rb, line 49 def mark_as_finished job.finish! client.persist_job(workflow_id, job) end
mark_as_started()
click to toggle source
# File lib/gush/worker.rb, line 59 def mark_as_started job.start! client.persist_job(workflow_id, job) end
setup_job(workflow_id, job_id)
click to toggle source
# File lib/gush/worker.rb, line 33 def setup_job(workflow_id, job_id) @workflow_id = workflow_id @job ||= client.find_job(workflow_id, job_id) end