class ActivePublisher::Async::InMemoryAdapter::Adapter
Attributes
async_queue[R]
Public Class Methods
new(back_pressure_strategy = :raise, max_queue_size = 100_000, supervisor_interval = 0.2)
click to toggle source
# File lib/active_publisher/async/in_memory_adapter.rb, line 23
#
# Logs a startup message and creates the queue exposed through +async_queue+.
#
# All three arguments are forwarded, unchanged and in order, to
# ::ActivePublisher::Async::InMemoryAdapter::AsyncQueue.new. Their exact
# meaning is defined by that class, which is not visible from here.
#
# @param back_pressure_strategy [Symbol] defaults to +:raise+
# @param max_queue_size [Integer] defaults to +100_000+
# @param supervisor_interval [Numeric] defaults to +0.2+ (presumably seconds -- confirm against AsyncQueue)
def initialize(back_pressure_strategy = :raise, max_queue_size = 100_000, supervisor_interval = 0.2)
  logger.info("Starting in-memory publisher adapter")

  queue_class = ::ActivePublisher::Async::InMemoryAdapter::AsyncQueue
  @async_queue = queue_class.new(back_pressure_strategy, max_queue_size, supervisor_interval)
end
Public Instance Methods
publish(route, payload, exchange_name, options = {})
click to toggle source
# File lib/active_publisher/async/in_memory_adapter.rb, line 33
#
# Wraps the arguments in an ::ActivePublisher::Message and pushes it onto
# +async_queue+.
#
# @param route [Object] forwarded to ::ActivePublisher::Message.new
# @param payload [Object] forwarded to ::ActivePublisher::Message.new
# @param exchange_name [Object] forwarded to ::ActivePublisher::Message.new
# @param options [Hash] forwarded to ::ActivePublisher::Message.new; defaults to an empty hash
# @return [nil] always nil; the result of the push is deliberately not exposed
def publish(route, payload, exchange_name, options = {})
  async_queue.push(::ActivePublisher::Message.new(route, payload, exchange_name, options))
  nil
end
shutdown!()
click to toggle source
# File lib/active_publisher/async/in_memory_adapter.rb, line 39
#
# Blocks until +async_queue+ reports a size of zero, or until the configured
# graceful shutdown period has elapsed, whichever comes first.
#
# The queue size is polled every 0.1 seconds. The time limit is read once
# from ::ActivePublisher.configuration.seconds_to_wait_for_graceful_shutdown.
# When the limit is exceeded a message is logged and the method returns even
# though the queue still holds messages.
#
# Elapsed time is measured with the monotonic clock rather than the wall
# clock, so a system clock adjustment (NTP step, manual change) during the
# drain can neither cut the wait short nor prolong it.
#
# @return [nil]
def shutdown!
  max_wait_time = ::ActivePublisher.configuration.seconds_to_wait_for_graceful_shutdown
  started_shutting_down_at = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)

  logger.info "Draining async publisher in-memory adapter queue before shutdown. current queue size: #{async_queue.size}."
  while async_queue.size > 0
    elapsed = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) - started_shutting_down_at
    if elapsed > max_wait_time
      logger.info "Forcing async publisher adapter shutdown because graceful shutdown period of #{max_wait_time} seconds was exceeded. Current queue size: #{async_queue.size}."
      break
    end
    sleep 0.1
  end
end