class MultiBackgroundJob::Adapters::Faktory
This is a Faktory
adapter that converts MultiBackgroundJob::Worker
object into a faktory readable format and then push the jobs into the service.
Attributes
Public Class Methods
Coerces the raw payload into an instance of Worker
@param payload [Hash] The job as json from redis @options options [Hash] list of options that will be passed along to the Worker
instance @return [MultiBackgroundJob::Worker] and instance of MultiBackgroundJob::Worker
# File lib/multi_background_job/adapters/faktory.rb, line 26 def self.coerce_to_worker(payload, **options) raise(Error, 'invalid payload') unless payload.is_a?(Hash) raise(Error, 'invalid payload') unless payload['jobtype'].is_a?(String) options[:retry] ||= payload['retry'] if payload.key?('retry') options[:queue] ||= payload['queue'] if payload.key?('queue') MultiBackgroundJob[payload['jobtype'], **options].tap do |worker| worker.with_args(*Array(payload['args'])) if payload.key?('args') worker.with_job_jid(payload['jid']) if payload.key?('jid') worker.created_at(payload['created_at']) if payload.key?('created_at') worker.enqueued_at(payload['enqueued_at']) if payload.key?('enqueued_at') worker.at(payload['at']) if payload.key?('at') worker.unique(payload['uniq']) if payload.key?('uniq') end end
# File lib/multi_background_job/adapters/faktory.rb, line 10 def initialize(worker) @worker = worker @queue = worker.options.fetch(:queue, 'default') @payload = worker.payload.merge( 'jobtype' => worker.worker_class, 'queue' => @queue, 'retry' => parse_retry(worker.options[:retry]), ) @payload['created_at'] ||= Time.now.to_f end
Initializes adapter and push job into the faktory service
@param worker [MultiBackgroundJob::Worker] An instance of MultiBackgroundJob::Worker
@return [Hash] Job payload @see push method for more details
# File lib/multi_background_job/adapters/faktory.rb, line 48 def self.push(worker) new(worker).push end
Public Instance Methods
Push job to Faktory
* If job has the 'at' key. Then schedule it * Otherwise enqueue for immediate execution
@raise [MultiBackgroundJob::Error] raise and error when faktory dependency is not loaded @return [Hash] Payload that was sent to server
# File lib/multi_background_job/adapters/faktory.rb, line 58 def push unless Object.const_defined?(:Faktory) raise MultiBackgroundJob::Error, <<~ERR Faktory client for ruby is not loaded. You must install and require https://github.com/contribsys/faktory_worker_ruby. ERR end @payload['enqueued_at'] ||= Time.now.to_f {'created_at' => false, 'enqueued_at' => false, 'at' => true}.each do |field, past_remove| # Optimization to enqueue something now that is scheduled to go out now or in the past if (time = @payload.delete(field)) && (!past_remove || (past_remove && time > Time.now.to_f)) @payload[field] = parse_time(time) end end pool = Thread.current[:faktory_via_pool] || ::Faktory.server_pool ::Faktory.client_middleware.invoke(@payload, pool) do pool.with do |c| c.push(@payload) end end @payload end
Protected Instance Methods
Convert worker retry value acording to the Go struct datatype.
-
25 is the default.
-
0 means the job is completely ephemeral. No matter if it fails or succeeds, it will be discarded.
-
-1 means the job will go straight to the Dead set if it fails, no retries.
# File lib/multi_background_job/adapters/faktory.rb, line 89 def parse_retry(value) case value when Numeric then value.to_i when false then -1 else 25 end end
# File lib/multi_background_job/adapters/faktory.rb, line 98 def parse_time(value) case value when Numeric then Time.at(value).to_datetime.rfc3339(9) when Time then value.to_datetime.rfc3339(9) when DateTime then value.rfc3339(9) end end
# File lib/multi_background_job/adapters/faktory.rb, line 106 def to_json(value) MultiJson.dump(value, mode: :compat) end