class Kafka::AsyncProducer

A Kafka producer that does all its work in the background so as to not block the calling thread. Calls to {#deliver_messages} are asynchronous and return immediately.

In addition, you can define automatic delivery policies. These place an upper bound on the number of buffered messages and on the time between message deliveries.

By default, automatic delivery is disabled and you'll have to call {#deliver_messages} manually.

## Buffer Overflow and Backpressure

The calling thread communicates with the background thread that does the actual work through a thread-safe queue. While the background thread is busy delivering messages, new messages are buffered in the queue. To keep the queue from growing uncontrollably when the background thread gets stuck or cannot keep pace with the calling thread, the number of buffered messages is capped. You can configure the cap by setting `max_queue_size`.
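The hand-off described above can be sketched with Ruby's standard `Queue`. This is a minimal illustration rather than the library's worker: the `run_worker` helper and the in-memory `delivered` array are hypothetical stand-ins, and only the `[operation, payload]` tuple shape mirrors what `#produce`, `#deliver_messages`, and `#shutdown` push onto the queue.

```ruby
# Hypothetical worker loop: pops [operation, payload] commands off a
# thread-safe queue, buffering messages until told to deliver them.
def run_worker(queue, delivered)
  buffer = []

  loop do
    operation, payload = queue.pop # blocks until a command arrives

    case operation
    when :produce
      buffer << payload
    when :deliver_messages
      delivered.concat(buffer)
      buffer.clear
    when :shutdown
      # Flush whatever is still buffered, then stop the loop.
      delivered.concat(buffer)
      break
    end
  end
end

queue = Queue.new
delivered = []
worker = Thread.new { run_worker(queue, delivered) }

# The calling thread only enqueues commands and never waits on delivery.
queue << [:produce, "hello"]
queue << [:produce, "world"]
queue << [:deliver_messages, nil]
queue << [:shutdown, nil]

worker.join
puts delivered.inspect
```

Because every command travels through the same queue, the worker sees them in the order the caller issued them, which is why a shutdown command placed last still lets earlier messages through.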

If you produce messages faster than the background producer thread can deliver them to Kafka, you will eventually fill the producer's buffer. At that point the background thread stops popping messages off the queue until it has successfully delivered the buffered messages, so the queue grows and may hit the `max_queue_size` limit. Once the limit is reached, calls to {#produce} raise a {BufferOverflow} error.

Depending on your use case, you may want to slow down the rate at which messages are produced, or halt your application completely until the producer can deliver the buffered messages and clear the message queue.
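One way to slow the caller down is to catch the overflow and retry after a pause. The sketch below is self-contained, so it relies on stand-ins: `BufferOverflow` is a local error class in place of the library's, `FullThenDrainingProducer` is a hypothetical fake whose queue frees up after a few rejected calls, and `produce_with_backpressure` with its `max_attempts` and `wait` parameters is an illustrative helper, not part of the API.

```ruby
# Stand-in for the library's BufferOverflow error so the sketch runs alone.
class BufferOverflow < StandardError; end

# Hypothetical fake producer: rejects the first `rejections` calls to
# #produce, as if the queue were full, then accepts messages.
class FullThenDrainingProducer
  attr_reader :accepted

  def initialize(rejections:)
    @rejections = rejections
    @accepted = []
  end

  def produce(value, topic:)
    if @rejections > 0
      @rejections -= 1
      raise BufferOverflow, "Cannot produce to #{topic}, max queue size reached"
    end

    @accepted << [value, topic]
    nil
  end
end

# Retries #produce after a pause whenever the queue is full, giving the
# background thread time to drain it. Re-raises once attempts run out.
# Returns the number of attempts that were needed.
def produce_with_backpressure(producer, value, topic:, max_attempts: 5, wait: 0.01)
  attempts = 0

  begin
    attempts += 1
    producer.produce(value, topic: topic)
  rescue BufferOverflow
    raise if attempts >= max_attempts
    sleep(wait)
    retry
  end

  attempts
end

producer = FullThenDrainingProducer.new(rejections: 2)
attempts = produce_with_backpressure(producer, "hello", topic: "greetings")
puts "accepted after #{attempts} attempts"
```

A fixed pause keeps the sketch short. Whether to retry, drop the message, or stop the application is a policy decision that depends on how much data loss and latency the caller can tolerate.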

## Example

producer = kafka.async_producer(
  # Keep at most 1,000 messages in the buffer before delivering:
  delivery_threshold: 1000,

  # Deliver messages every 30 seconds:
  delivery_interval: 30,
)

# There's no need to manually call #deliver_messages; it will happen
# automatically in the background.
producer.produce("hello", topic: "greetings")

# Remember to shut down the producer when you're done with it.
producer.shutdown

Public Class Methods

new(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, delivery_interval: 0, max_retries: -1, retry_backoff: 0, instrumenter:, logger:)

Initializes a new AsyncProducer.

@param sync_producer [Kafka::Producer] the synchronous producer that should be used in the background.

@param max_queue_size [Integer] the maximum number of messages allowed in the queue.

@param delivery_threshold [Integer] if greater than zero, the number of buffered messages that will automatically trigger a delivery.

@param delivery_interval [Integer] if greater than zero, the number of seconds between automatic message deliveries.

# File lib/kafka/async_producer.rb, line 73
def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, delivery_interval: 0, max_retries: -1, retry_backoff: 0, instrumenter:, logger:)
  raise ArgumentError unless max_queue_size > 0
  raise ArgumentError unless delivery_threshold >= 0
  raise ArgumentError unless delivery_interval >= 0

  @queue = Queue.new
  @max_queue_size = max_queue_size
  @instrumenter = instrumenter
  @logger = TaggedLogger.new(logger)

  @worker = Worker.new(
    queue: @queue,
    producer: sync_producer,
    delivery_threshold: delivery_threshold,
    max_retries: max_retries,
    retry_backoff: retry_backoff,
    instrumenter: instrumenter,
    logger: logger
  )

  # The timer will no-op if the delivery interval is zero.
  @timer = Timer.new(queue: @queue, interval: delivery_interval)

  @thread_mutex = Mutex.new
end
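
The `Timer` class itself is not shown in this section. As a rough illustration of the behaviour the comment in the constructor describes, the sketch below enqueues a `[:deliver_messages, nil]` command every `interval` seconds and does nothing when the interval is zero. `SketchTimer` and its `ticks` limit (added so the example terminates) are hypothetical, and how the real timer idles is an open detail here.

```ruby
# Hypothetical timer: periodically asks the worker to deliver by pushing
# a command onto the shared queue. Not the library's implementation.
class SketchTimer
  def initialize(queue:, interval:, ticks:)
    @queue = queue
    @interval = interval
    @ticks = ticks # illustrative only: caps the number of ticks
  end

  def run
    # A zero interval disables automatic, time-based delivery.
    return if @interval.zero?

    @ticks.times do
      sleep(@interval)
      @queue << [:deliver_messages, nil]
    end
  end
end

queue = Queue.new

SketchTimer.new(queue: queue, interval: 0, ticks: 3).run
puts queue.size # nothing was enqueued

SketchTimer.new(queue: queue, interval: 0.01, ticks: 3).run
puts queue.size # one command per tick
```

Routing timed deliveries through the same queue as `#produce` means the worker needs no extra synchronization: a timer tick is just another command.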

Public Instance Methods

deliver_messages()

Asynchronously delivers the buffered messages. This method will return immediately and the actual work will be done in the background.

@see Kafka::Producer#deliver_messages

@return [nil]

# File lib/kafka/async_producer.rb, line 133
def deliver_messages
  ensure_threads_running!

  @queue << [:deliver_messages, nil]

  nil
end

produce(value, topic:, **options)

Produces a message to the specified topic.

@see Kafka::Producer#produce

@param (see Kafka::Producer#produce)

@raise [BufferOverflow] if the message queue is full.

@return [nil]

# File lib/kafka/async_producer.rb, line 105
def produce(value, topic:, **options)
  # We want to fail fast if `topic` isn't a String
  topic = topic.to_str

  ensure_threads_running!

  if @queue.size >= @max_queue_size
    buffer_overflow topic,
      "Cannot produce to #{topic}, max queue size (#{@max_queue_size} messages) reached"
  end

  args = [value, **options.merge(topic: topic)]
  @queue << [:produce, args]

  @instrumenter.instrument("enqueue_message.async_producer", {
    topic: topic,
    queue_size: @queue.size,
    max_queue_size: @max_queue_size,
  })

  nil
end
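
The `topic.to_str` call at the top of `#produce` makes a non-String topic fail in the calling thread rather than later in the background. A quick illustration using only core Ruby, where `checked_topic` is a hypothetical helper name:

```ruby
# Mirrors the fail-fast check: #to_str exists on String (and String-like
# objects) but not on Symbol, Integer, or nil.
def checked_topic(topic)
  topic.to_str
end

puts checked_topic("greetings")

begin
  checked_topic(:greetings)
rescue NoMethodError => e
  puts "rejected: #{e.class}"
end
```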

shutdown()

Shuts down the producer, releasing the network resources used. This method will block until the buffered messages have been delivered.

@see Kafka::Producer#shutdown

@return [nil]

# File lib/kafka/async_producer.rb, line 146
def shutdown
  ensure_threads_running!

  @timer_thread && @timer_thread.exit
  @queue << [:shutdown, nil]
  @worker_thread && @worker_thread.join

  nil
end

Private Instance Methods

buffer_overflow(topic, message)

# File lib/kafka/async_producer.rb, line 175
def buffer_overflow(topic, message)
  @instrumenter.instrument("buffer_overflow.async_producer", {
    topic: topic,
  })

  raise BufferOverflow, message
end

ensure_threads_running!()

# File lib/kafka/async_producer.rb, line 158
def ensure_threads_running!
  return if worker_thread_alive? && timer_thread_alive?

  @thread_mutex.synchronize do
    @worker_thread = Thread.new { @worker.run } unless worker_thread_alive?
    @timer_thread = Thread.new { @timer.run } unless timer_thread_alive?
  end
end

timer_thread_alive?()

# File lib/kafka/async_producer.rb, line 171
def timer_thread_alive?
  !!@timer_thread && @timer_thread.alive?
end

worker_thread_alive?()

# File lib/kafka/async_producer.rb, line 167
def worker_thread_alive?
  !!@worker_thread && @worker_thread.alive?
end
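
Taken together, the private helpers above start the background threads lazily and restart them if they have died. The pattern can be sketched in isolation. `LazyWorker`, its `join` method, and its `starts` counter are hypothetical and exist only to make the behaviour observable.

```ruby
# Hypothetical class demonstrating lazy, mutex-guarded thread start-up.
class LazyWorker
  attr_reader :starts

  def initialize
    @mutex = Mutex.new
    @thread = nil
    @starts = 0
  end

  # Starts the background thread on first use, or again if it has died.
  def ensure_thread_running!(&job)
    return if thread_alive? # fast path: no lock needed

    @mutex.synchronize do
      # Re-check under the lock so two callers cannot both start a thread.
      unless thread_alive?
        @starts += 1
        @thread = Thread.new(&job)
      end
    end
  end

  def join
    @thread && @thread.join
  end

  private

  def thread_alive?
    !!@thread && @thread.alive?
  end
end

worker = LazyWorker.new
gate = Queue.new

worker.ensure_thread_running! { gate.pop } # starts the thread
worker.ensure_thread_running! { gate.pop } # no-op: thread is still alive

gate << :done # let the thread finish
worker.join

worker.ensure_thread_running! { :noop } # thread died, so it is restarted
worker.join
puts worker.starts
```

Checking liveness on every public call, as `#produce`, `#deliver_messages`, and `#shutdown` do, means a worker that crashed is replaced the next time the producer is used.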