class ActivePublisher::Async::InMemoryAdapter::Adapter

Attributes

async_queue[R]

Public Class Methods

new(back_pressure_strategy = :raise, max_queue_size = 100_000, supervisor_interval = 0.2)
# File lib/active_publisher/async/in_memory_adapter.rb, line 23
# Logs adapter start-up and builds the in-memory queue exposed via +async_queue+.
#
# All three arguments are handed, unchanged and in this order, to
# ::ActivePublisher::Async::InMemoryAdapter::AsyncQueue.new; their meaning is
# defined by that class.
#
# @param back_pressure_strategy [Symbol] defaults to :raise
#   (presumably what to do when the queue is full — confirm in AsyncQueue)
# @param max_queue_size [Integer] defaults to 100_000
# @param supervisor_interval [Numeric] defaults to 0.2
#   (presumably seconds — confirm in AsyncQueue)
def initialize(back_pressure_strategy = :raise, max_queue_size = 100_000, supervisor_interval = 0.2)
  logger.info("Starting in-memory publisher adapter")

  queue_settings = [back_pressure_strategy, max_queue_size, supervisor_interval]
  @async_queue = ::ActivePublisher::Async::InMemoryAdapter::AsyncQueue.new(*queue_settings)
end

Public Instance Methods

publish(route, payload, exchange_name, options = {})
# File lib/active_publisher/async/in_memory_adapter.rb, line 33
# Wraps the arguments in an ::ActivePublisher::Message and pushes it onto
# +async_queue+.
#
# @param route [Object] passed through to Message.new
# @param payload [Object] passed through to Message.new
# @param exchange_name [Object] passed through to Message.new
# @param options [Hash] passed through to Message.new (defaults to {})
# @return [nil] always; the result of the push is not exposed
def publish(route, payload, exchange_name, options = {})
  envelope = ::ActivePublisher::Message.new(route, payload, exchange_name, options)
  async_queue.push(envelope)

  # Deliberately hide whatever #push returned from the caller.
  nil
end
shutdown!()
# File lib/active_publisher/async/in_memory_adapter.rb, line 39
# Waits for +async_queue+ to drain before shutdown.
#
# Polls +async_queue.size+ every 0.1 seconds until it reaches zero or until
# more than ::ActivePublisher.configuration.seconds_to_wait_for_graceful_shutdown
# seconds have elapsed, whichever comes first. When the grace period is
# exceeded, the wait is abandoned and the remaining queue size is logged.
# This method only waits; it does not itself stop or clear the queue.
#
# Elapsed time is taken from the monotonic clock rather than the wall clock,
# so a system clock adjustment during shutdown can neither cut the grace
# period short nor extend it indefinitely.
#
# @return [nil]
def shutdown!
  max_wait_time = ::ActivePublisher.configuration.seconds_to_wait_for_graceful_shutdown
  started_shutting_down_at = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)

  logger.info "Draining async publisher in-memory adapter queue before shutdown. current queue size: #{async_queue.size}."
  while async_queue.size > 0
    elapsed = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) - started_shutting_down_at
    if elapsed > max_wait_time
      logger.info "Forcing async publisher adapter shutdown because graceful shutdown period of #{max_wait_time} seconds was exceeded. Current queue size: #{async_queue.size}."
      break
    end

    # Short pause between polls so the wait does not spin the CPU.
    sleep 0.1
  end
end