class ActiveJob::QueueAdapters::ShoryukenConcurrentSendAdapter
Shoryuken concurrent adapter for Active Job
This adapter sends messages asynchronously (i.e., non-blocking) and allows the caller to set up handlers for both success and failure.
To use this adapter, set up as:
success_handler = ->(response, job, options) { StatsD.increment("#{job.class.name}.success") }
error_handler = ->(err, job, options) { StatsD.increment("#{job.class.name}.failure") }
adapter = ActiveJob::QueueAdapters::ShoryukenConcurrentSendAdapter.new(success_handler, error_handler)
config.active_job.queue_adapter = adapter
Public Class Methods
new(success_handler = nil, error_handler = nil)
click to toggle source
# File lib/shoryuken/extensions/active_job_concurrent_send_adapter.rb, line 19
#
# Builds the adapter with optional callbacks for the outcome of each send.
#
# @param success_handler [#call, nil] called with (send_message_response, job, options)
#   after a successful send; when nil, #success_handler falls back to a no-op lambda
# @param error_handler [#call, nil] called with (error, job, options) when the send
#   fails; when nil, #error_handler falls back to a lambda that logs a warning
def initialize(success_handler = nil, error_handler = nil)
  @success_handler = success_handler
  @error_handler = error_handler
end
Public Instance Methods
enqueue(job, options = {})
click to toggle source
Calls superclass method
ActiveJob::QueueAdapters::ShoryukenAdapter::enqueue
# File lib/shoryuken/extensions/active_job_concurrent_send_adapter.rb, line 24
#
# Enqueues +job+ by handing the superclass +enqueue+ call to #send_concurrently,
# which runs it inside a future instead of on the calling thread.
#
# @param job [Object] the job to enqueue
# @param options [Hash] passed through unchanged to the superclass +enqueue+
# @return whatever #send_concurrently returns
def enqueue(job, options = {})
  send_concurrently(job, options) do |f_job, f_options|
    super(f_job, f_options)
  end
end
error_handler()
click to toggle source
# File lib/shoryuken/extensions/active_job_concurrent_send_adapter.rb, line 32
#
# Returns the failure callback, memoizing a default on first use.
#
# When no handler is set, the default lambda writes a warning through
# +Shoryuken.logger+ naming the job and the error; its third argument
# (the options) is ignored.
#
# @return [#call] callable taking (error, job, options)
def error_handler
  @error_handler ||= lambda do |error, job, _options|
    Shoryuken.logger.warn("Failed to enqueue job: #{job.inspect} due to error: #{error}")
  end
end
success_handler()
click to toggle source
# File lib/shoryuken/extensions/active_job_concurrent_send_adapter.rb, line 28
#
# Returns the success callback, memoizing a default on first use.
#
# When no handler is set, the default is a lambda that ignores its three
# arguments and returns nil.
#
# @return [#call] callable taking (send_message_response, job, options)
def success_handler
  @success_handler ||= ->(_send_message_response, _job, _options) { nil }
end
Private Instance Methods
send_concurrently(job, options) { |f_job, f_options| ... }
click to toggle source
# File lib/shoryuken/extensions/active_job_concurrent_send_adapter.rb, line 42
#
# Runs the given block inside a +Concurrent::Promises+ future and routes the
# outcome to the configured callbacks.
#
# The future yields (job, options) to the caller's block and packs the block's
# result together with both arguments, so the +then+ stage can pass all three
# to #success_handler. The +rescue+ stage passes the error plus the original
# job and options to #error_handler.
#
# NOTE(review): +rescue+ is chained after +then+, so an exception raised by the
# success handler itself presumably also reaches #error_handler -- confirm
# against the concurrent-ruby documentation.
#
# @param job [Object] the job being sent
# @param options [Hash] the options passed along with the job
# @yieldparam f_job [Object] the job, as handed to the future
# @yieldparam f_options [Hash] the options, as handed to the future
# @return the object returned by the final +rescue+ call in the chain
def send_concurrently(job, options)
  Concurrent::Promises
    .future(job, options) { |f_job, f_options| [yield(f_job, f_options), f_job, f_options] }
    .then { |send_message_response, f_job, f_options| success_handler.call(send_message_response, f_job, f_options) }
    .rescue(job, options) { |err, f_job, f_options| error_handler.call(err, f_job, f_options) }
end