class MultiBackgroundJob::Adapters::Faktory

This is a Faktory adapter that converts MultiBackgroundJob::Worker object into a faktory readable format and then push the jobs into the service.

Attributes

queue[R]
worker[R]

Public Class Methods

coerce_to_worker(payload, **options) click to toggle source

Coerces the raw payload into an instance of Worker @param payload [Hash] The job as json from redis @options options [Hash] list of options that will be passed along to the Worker instance @return [MultiBackgroundJob::Worker] and instance of MultiBackgroundJob::Worker

# File lib/multi_background_job/adapters/faktory.rb, line 26
def self.coerce_to_worker(payload, **options)
  raise(Error, 'invalid payload') unless payload.is_a?(Hash)
  raise(Error, 'invalid payload') unless payload['jobtype'].is_a?(String)

  options[:retry] ||= payload['retry'] if payload.key?('retry')
  options[:queue] ||= payload['queue'] if payload.key?('queue')

  MultiBackgroundJob[payload['jobtype'], **options].tap do |worker|
    worker.with_args(*Array(payload['args'])) if payload.key?('args')
    worker.with_job_jid(payload['jid']) if payload.key?('jid')
    worker.created_at(payload['created_at']) if payload.key?('created_at')
    worker.enqueued_at(payload['enqueued_at']) if payload.key?('enqueued_at')
    worker.at(payload['at']) if payload.key?('at')
    worker.unique(payload['uniq']) if payload.key?('uniq')
  end
end
new(worker) click to toggle source
# File lib/multi_background_job/adapters/faktory.rb, line 10
def initialize(worker)
  @worker = worker
  @queue = worker.options.fetch(:queue, 'default')

  @payload = worker.payload.merge(
    'jobtype' => worker.worker_class,
    'queue'   => @queue,
    'retry'   => parse_retry(worker.options[:retry]),
  )
  @payload['created_at'] ||= Time.now.to_f
end
push(worker) click to toggle source

Initializes adapter and push job into the faktory service

@param worker [MultiBackgroundJob::Worker] An instance of MultiBackgroundJob::Worker @return [Hash] Job payload @see push method for more details

# File lib/multi_background_job/adapters/faktory.rb, line 48
def self.push(worker)
  new(worker).push
end

Public Instance Methods

push() click to toggle source

Push job to Faktory

* If job has the 'at' key. Then schedule it
* Otherwise enqueue for immediate execution

@raise [MultiBackgroundJob::Error] raise and error when faktory dependency is not loaded @return [Hash] Payload that was sent to server

# File lib/multi_background_job/adapters/faktory.rb, line 58
      def push
        unless Object.const_defined?(:Faktory)
          raise MultiBackgroundJob::Error, <<~ERR
          Faktory client for ruby is not loaded. You must install and require https://github.com/contribsys/faktory_worker_ruby.
          ERR
        end
        @payload['enqueued_at'] ||= Time.now.to_f
        {'created_at' => false, 'enqueued_at' => false, 'at' => true}.each do |field, past_remove|
          # Optimization to enqueue something now that is scheduled to go out now or in the past
          if (time = @payload.delete(field)) &&
              (!past_remove || (past_remove && time > Time.now.to_f))
            @payload[field] = parse_time(time)
          end
        end

        pool = Thread.current[:faktory_via_pool] || ::Faktory.server_pool
        ::Faktory.client_middleware.invoke(@payload, pool) do
          pool.with do |c|
            c.push(@payload)
          end
        end
        @payload
      end

Protected Instance Methods

parse_retry(value) click to toggle source

Convert worker retry value acording to the Go struct datatype.

  • 25 is the default.

  • 0 means the job is completely ephemeral. No matter if it fails or succeeds, it will be discarded.

  • -1 means the job will go straight to the Dead set if it fails, no retries.

# File lib/multi_background_job/adapters/faktory.rb, line 89
def parse_retry(value)
  case value
  when Numeric then value.to_i
  when false then -1
  else
    25
  end
end
parse_time(value) click to toggle source
# File lib/multi_background_job/adapters/faktory.rb, line 98
def parse_time(value)
  case value
  when Numeric then Time.at(value).to_datetime.rfc3339(9)
  when Time then value.to_datetime.rfc3339(9)
  when DateTime then value.rfc3339(9)
  end
end
to_json(value) click to toggle source
# File lib/multi_background_job/adapters/faktory.rb, line 106
def to_json(value)
  MultiJson.dump(value, mode: :compat)
end