class Kafka::AsyncProducer::Worker

Public Class Methods

new(queue:, producer:, delivery_threshold:, max_retries: -1, retry_backoff: 0, instrumenter:, logger:)
# File lib/kafka/async_producer.rb, line 189
# Sets up a worker with its collaborators and tuning values.
#
# @param queue the queue that #run pops `[operation, payload]` pairs from.
# @param producer the producer that messages are buffered in and delivered by.
# @param delivery_threshold buffer size at which #run triggers a delivery
#   after a produce; a value of zero or less disables that check.
# @param max_retries how many times a produce is retried after a
#   BufferOverflow; -1 (the default) retries without limit.
# @param retry_backoff base of the exponential pause between those retries.
# @param instrumenter receives the error / dropped-message events.
# @param logger the underlying logger; it is wrapped in a TaggedLogger.
def initialize(queue:, producer:, delivery_threshold:, max_retries: -1, retry_backoff: 0, instrumenter:, logger:)
  # Collaborators.
  @logger = TaggedLogger.new(logger)
  @instrumenter = instrumenter
  @producer = producer
  @queue = queue

  # Delivery and retry tuning.
  @delivery_threshold = delivery_threshold
  @retry_backoff = retry_backoff
  @max_retries = max_retries
end

Public Instance Methods

run()
# File lib/kafka/async_producer.rb, line 199
# Runs the worker loop: pops `[operation, payload]` pairs off the queue and
# dispatches them until a `:shutdown` operation arrives.
#
# Operations:
# * `:produce`          - buffer the message, then deliver if the threshold
#                         has been reached.
# * `:deliver_messages` - deliver the buffered messages now.
# * `:shutdown`         - deliver whatever is pending, then leave the loop.
#
# Any other operation raises a RuntimeError, which the outer rescue logs as
# a crash.
#
# A Kafka::Error escaping the loop is logged and the loop is restarted after
# a 10 second pause. Only the loop is restarted: the logger tags are pushed
# exactly once per call and popped exactly once in `ensure`, so restarts do
# not pile up duplicate tags.
#
# Whatever happens, the producer is shut down and the tags are popped before
# this method returns.
def run
  @logger.push_tags(@producer.to_s)
  @logger.info "Starting async producer in the background..."

  begin
    loop do
      operation, payload = @queue.pop

      case operation
      when :produce
        produce(*payload)
        deliver_messages if threshold_reached?
      when :deliver_messages
        deliver_messages
      when :shutdown
        begin
          # Deliver any pending messages first.
          @producer.deliver_messages
        rescue Error => e
          @logger.error("Failed to deliver messages during shutdown: #{e.message}")

          # Report how many messages are being abandoned: those still in the
          # producer's buffer plus the operations still waiting in the queue.
          @instrumenter.instrument("drop_messages.async_producer", {
            message_count: @producer.buffer_size + @queue.size,
          })
        end

        # Stop the run loop.
        break
      else
        raise "Unknown operation #{operation.inspect}"
      end
    end
  rescue Kafka::Error => e
    @logger.error "Unexpected Kafka error #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}"
    @logger.info "Restarting in 10 seconds..."

    sleep 10
    # Re-enters only this inner `begin`, not the whole method, so the tag
    # push above is not repeated.
    retry
  end
rescue Exception => e
  # Deliberately broad: this is the last chance to log why the worker died.
  # Note that it also catches exceptions outside StandardError.
  @logger.error "Unexpected Kafka error #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}"
  @logger.error "Async producer crashed!"
ensure
  @producer.shutdown
  @logger.pop_tags
end

Private Instance Methods

deliver_messages()
# File lib/kafka/async_producer.rb, line 265
# Asks the producer to deliver its buffered messages.
#
# DeliveryFailed and ConnectionError are treated as recoverable: they are
# logged and reported as an "error.async_producer" event instead of being
# raised, leaving a later delivery attempt to try again. Any other exception
# propagates to the caller.
def deliver_messages
  @producer.deliver_messages
rescue DeliveryFailed, ConnectionError => failure
  reason = failure.message
  @logger.error("Failed to asynchronously deliver messages: #{reason}")

  @instrumenter.instrument("error.async_producer", { error: failure })
end
produce(*args)
# File lib/kafka/async_producer.rb, line 246
# Hands a message to the producer, retrying when its buffer is full.
#
# On BufferOverflow the buffered messages are delivered to make room and the
# produce is attempted again:
# * with @max_retries == -1 it retries immediately and without limit;
# * otherwise it retries up to @max_retries times, pausing for
#   @retry_backoff ** attempt seconds before each retry.
#
# Once the retries are used up the overflow is logged and reported as an
# "error.async_producer" event rather than raised.
#
# @param args arguments forwarded unchanged to the producer's #produce.
def produce(*args)
  attempt = 0

  begin
    @producer.produce(*args)
  rescue BufferOverflow => overflow
    # Flush first so the next attempt has a chance of fitting in the buffer.
    deliver_messages

    retry if @max_retries == -1

    if attempt >= @max_retries
      @logger.error("Failed to asynchronously produce messages due to BufferOverflow")
      @instrumenter.instrument("error.async_producer", { error: overflow })
    else
      attempt += 1
      sleep @retry_backoff**attempt
      retry
    end
  end
end
threshold_reached?()
# File lib/kafka/async_producer.rb, line 273
# Tells whether the producer's buffer has grown to the delivery threshold.
#
# A threshold of zero or less switches the check off: the method returns
# false without consulting the producer.
#
# @return [Boolean]
def threshold_reached?
  return false unless @delivery_threshold > 0

  @producer.buffer_size >= @delivery_threshold
end