class ActiveJob::QueueAdapters::ShoryukenConcurrentSendAdapter

Shoryuken concurrent adapter for Active Job

This adapter sends messages asynchronously (ie non-blocking) and allows the caller to set up handlers for both success and failure

To use this adapter, set up as:

success_handler = ->(response, job, options) { StatsD.increment(“#{job.class.name}.success”) } error_handler = ->(err, job, options) { StatsD.increment(“#{job.class.name}.failure”) }

adapter = ActiveJob::QueueAdapters::ShoryukenConcurrentSendAdapter.new(success_handler, error_handler)

config.active_job.queue_adapter = adapter

Public Class Methods

new(success_handler = nil, error_handler = nil) click to toggle source
# File lib/shoryuken/extensions/active_job_concurrent_send_adapter.rb, line 19
def initialize(success_handler = nil, error_handler = nil)
  @success_handler = success_handler
  @error_handler = error_handler
end

Public Instance Methods

enqueue(job, options = {}) click to toggle source
# File lib/shoryuken/extensions/active_job_concurrent_send_adapter.rb, line 24
def enqueue(job, options = {})
  send_concurrently(job, options) { |f_job, f_options| super(f_job, f_options) }
end
error_handler() click to toggle source
# File lib/shoryuken/extensions/active_job_concurrent_send_adapter.rb, line 32
def error_handler
  @error_handler ||= begin
    lambda { |error, job, _options|
      Shoryuken.logger.warn("Failed to enqueue job: #{job.inspect} due to error: #{error}")
    }
  end
end
success_handler() click to toggle source
# File lib/shoryuken/extensions/active_job_concurrent_send_adapter.rb, line 28
def success_handler
  @success_handler ||= ->(_send_message_response, _job, _options) { nil }
end

Private Instance Methods

send_concurrently(job, options) { |f_job, f_options| ... } click to toggle source
# File lib/shoryuken/extensions/active_job_concurrent_send_adapter.rb, line 42
def send_concurrently(job, options)
  Concurrent::Promises
    .future(job, options) { |f_job, f_options| [yield(f_job, f_options), f_job, f_options] }
    .then { |send_message_response, f_job, f_options| success_handler.call(send_message_response, f_job, f_options) }
    .rescue(job, options) { |err, f_job, f_options| error_handler.call(err, f_job, f_options) }
end