class ActiveJob::QueueAdapters::ShoryukenAdapter

Shoryuken adapter for Active Job

Shoryuken (“sho-ryu-ken”) is a super-efficient, thread-based message processor for AWS SQS.

Read more about Shoryuken at https://github.com/ruby-shoryuken/shoryuken.

To use Shoryuken, set the queue_adapter config to :shoryuken.

Rails.application.config.active_job.queue_adapter = :shoryuken

Constants

MESSAGE_ATTRIBUTES

Public Class Methods

enqueue(job) click to toggle source
# File lib/shoryuken/extensions/active_job_adapter.rb, line 24
# Class-level entry point: hands the job to the shared adapter object
# returned by +instance+ and returns whatever that object's #enqueue returns.
def enqueue(job)
  adapter = instance
  adapter.enqueue(job)
end
enqueue_at(job, timestamp) click to toggle source
# File lib/shoryuken/extensions/active_job_adapter.rb, line 28
# Class-level entry point for scheduled jobs: forwards the job and its
# timestamp, unchanged, to the shared adapter object returned by +instance+.
def enqueue_at(job, timestamp)
  adapter = instance
  adapter.enqueue_at(job, timestamp)
end
instance() click to toggle source
# File lib/shoryuken/extensions/active_job_adapter.rb, line 19
# Returns the memoized adapter object, building it with +new+ on first use.
def instance
  # https://github.com/phstc/shoryuken/pull/174#issuecomment-174555657
  return @instance if @instance

  @instance = new
end

Private Instance Methods

calculate_delay(timestamp) click to toggle source
# File lib/shoryuken/extensions/active_job_adapter.rb, line 50
# Converts an absolute timestamp into a delay, in whole seconds, relative to
# Time.current.
#
# timestamp - numeric value that Time.current.to_f can be subtracted from
#             (presumably epoch seconds as supplied by Active Job; confirm
#             against the caller).
#
# Returns the rounded delay as a non-negative Integer.
# Raises RuntimeError when the delay is greater than 15 minutes.
def calculate_delay(timestamp)
  delay = (timestamp - Time.current.to_f).round
  raise 'The maximum allowed delay is 15 minutes' if delay > 15.minutes

  # A timestamp that is already in the past produces a negative difference.
  # SQS only accepts a delay between 0 and 900 seconds, so treat "overdue"
  # as "deliver immediately" instead of handing back a negative value.
  [delay, 0].max
end
message(queue, job) click to toggle source
# File lib/shoryuken/extensions/active_job_adapter.rb, line 57
# Builds the parameter hash describing the SQS message for +job+.
#
# queue - object responding to #fifo?
# job   - object responding to #serialize and #sqs_send_message_parameters
#
# The result always carries :message_body (the serialized job) and
# :message_attributes (the job's own attributes with MESSAGE_ATTRIBUTES merged
# on top). Every other key from the job's send parameters is merged in last,
# so those values win over anything set here.
def message(queue, job)
  payload = job.serialize
  send_params = job.sqs_send_message_parameters
  custom_attributes = send_params[:message_attributes] || {}

  result = {
    message_body: payload,
    message_attributes: custom_attributes.merge(MESSAGE_ATTRIBUTES)
  }

  if queue.fifo?
    # See https://github.com/ruby-shoryuken/shoryuken/issues/457 and
    # https://github.com/ruby-shoryuken/shoryuken/pull/750#issuecomment-1781317929
    # The digest deliberately leaves out 'job_id' and 'enqueued_at'.
    fingerprint = JSON.dump(payload.except('job_id', 'enqueued_at'))
    result[:message_deduplication_id] = Digest::SHA256.hexdigest(fingerprint)
  end

  overrides = send_params.except(:message_attributes)
  result.merge(overrides)
end
register_worker!(job) click to toggle source
# File lib/shoryuken/extensions/active_job_adapter.rb, line 79
# Registers JobWrapper with Shoryuken as the worker for the job's queue name.
def register_worker!(job)
  queue_name = job.queue_name
  Shoryuken.register_worker(queue_name, JobWrapper)
end