class ActivePublisher::Async::InMemoryAdapter::AsyncQueue

Constants

BACK_PRESSURE_STRATEGIES

These strategies are used to determine what to do with messages when the queue is full. :raise - Raise an error and drop the message. :drop - Silently drop the message. :wait - Wait for space in the queue to become available.

Attributes

back_pressure_strategy[RW]
consumers[R]
max_queue_size[RW]
queue[R]
supervisor[R]
supervisor_interval[RW]

Public Class Methods

new(back_pressure_strategy, max_queue_size, supervisor_interval) click to toggle source
# File lib/active_publisher/async/in_memory_adapter/async_queue.rb, line 19
def initialize(back_pressure_strategy, max_queue_size, supervisor_interval)
  self.back_pressure_strategy = back_pressure_strategy
  @max_queue_size = max_queue_size
  @supervisor_interval = supervisor_interval
  @queue = ::MultiOpQueue::Queue.new
  @consumers = {}
  create_and_supervise_consumers!
end

Public Instance Methods

back_pressure_strategy=(strategy) click to toggle source
# File lib/active_publisher/async/in_memory_adapter/async_queue.rb, line 28
def back_pressure_strategy=(strategy)
  fail ::ArgumentError, "Invalid back pressure strategy: #{strategy}" unless BACK_PRESSURE_STRATEGIES.include?(strategy)
  @back_pressure_strategy = strategy
end
push(message) click to toggle source
# File lib/active_publisher/async/in_memory_adapter/async_queue.rb, line 33
def push(message)
  if queue.size >= max_queue_size
    case back_pressure_strategy
    when :drop
      ::ActiveSupport::Notifications.instrument "message_dropped.active_publisher"
      return
    when :raise
      ::ActiveSupport::Notifications.instrument "message_dropped.active_publisher"
      fail ::ActivePublisher::Async::InMemoryAdapter::UnableToPersistMessageError, "Queue is full, messages will be dropped."
    when :wait
      ::ActiveSupport::Notifications.instrument "wait_for_async_queue.active_publisher" do
        # This is a really crappy way to wait
        sleep 0.01 until queue.size < max_queue_size
      end
    end
  end

  queue.push(message)
end
size() click to toggle source
# File lib/active_publisher/async/in_memory_adapter/async_queue.rb, line 53
def size
  # Requests might be in flight (out of the queue, but not yet published), so taking the max should be
  # good enough to make sure we're honest about the actual queue size.
  return queue.size if consumers.empty?
  [queue.size, consumer_sampled_queue_size].max
end

Private Instance Methods

consumer_sampled_queue_size() click to toggle source
# File lib/active_publisher/async/in_memory_adapter/async_queue.rb, line 89
def consumer_sampled_queue_size
  consumers.values.map(&:sampled_queue_size).max
end
create_and_supervise_consumers!() click to toggle source
# File lib/active_publisher/async/in_memory_adapter/async_queue.rb, line 62
def create_and_supervise_consumers!
  ::ActivePublisher.configuration.publisher_threads.times do
    consumer_id = ::SecureRandom.uuid
    consumers[consumer_id] = ::ActivePublisher::Async::InMemoryAdapter::ConsumerThread.new(queue)
    supervisor_task = ::Concurrent::TimerTask.new(:execution_interval => supervisor_interval) do
      current_time = ::Time.now
      consumer = consumers[consumer_id]

      # Consumer is lagging if it does not "tick" at least once every 10 seconds.
      seconds_since_last_tick = current_time - consumer.last_tick_at
      consumer_is_lagging = seconds_since_last_tick > ::ActivePublisher.configuration.max_async_publisher_lag_time
      logger.error "ActivePublisher consumer is lagging. Last consumer tick was #{seconds_since_last_tick} seconds ago." if consumer_is_lagging

      # Check to see if we should restart the consumer.
      if !consumer.alive? || consumer_is_lagging
        consumer.kill rescue nil
        consumers[consumer_id] = ::ActivePublisher::Async::InMemoryAdapter::ConsumerThread.new(queue)
        ::ActiveSupport::Notifications.instrument "async_queue.thread_restart"
      end

      # Notify the current queue size.
      ::ActiveSupport::Notifications.instrument "async_queue_size.active_publisher", queue.size
    end
    supervisor_task.execute
  end
end