class Kafka::AsyncProducer::Worker
Public Class Methods
new(queue:, producer:, delivery_threshold:, max_retries: -1, retry_backoff: 0, instrumenter:, logger:)
click to toggle source
# File lib/kafka/async_producer.rb, line 189 def initialize(queue:, producer:, delivery_threshold:, max_retries: -1, retry_backoff: 0, instrumenter:, logger:) @queue = queue @producer = producer @delivery_threshold = delivery_threshold @max_retries = max_retries @retry_backoff = retry_backoff @instrumenter = instrumenter @logger = TaggedLogger.new(logger) end
Public Instance Methods
run()
click to toggle source
# File lib/kafka/async_producer.rb, line 199 def run @logger.push_tags(@producer.to_s) @logger.info "Starting async producer in the background..." loop do operation, payload = @queue.pop case operation when :produce produce(*payload) deliver_messages if threshold_reached? when :deliver_messages deliver_messages when :shutdown begin # Deliver any pending messages first. @producer.deliver_messages rescue Error => e @logger.error("Failed to deliver messages during shutdown: #{e.message}") @instrumenter.instrument("drop_messages.async_producer", { message_count: @producer.buffer_size + @queue.size, }) end # Stop the run loop. break else raise "Unknown operation #{operation.inspect}" end end rescue Kafka::Error => e @logger.error "Unexpected Kafka error #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}" @logger.info "Restarting in 10 seconds..." sleep 10 retry rescue Exception => e @logger.error "Unexpected Kafka error #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}" @logger.error "Async producer crashed!" ensure @producer.shutdown @logger.pop_tags end
Private Instance Methods
deliver_messages()
click to toggle source
# File lib/kafka/async_producer.rb, line 265 def deliver_messages @producer.deliver_messages rescue DeliveryFailed, ConnectionError => e # Failed to deliver messages -- nothing to do but log and try again later. @logger.error("Failed to asynchronously deliver messages: #{e.message}") @instrumenter.instrument("error.async_producer", { error: e }) end
produce(*args)
click to toggle source
# File lib/kafka/async_producer.rb, line 246 def produce(*args) retries = 0 begin @producer.produce(*args) rescue BufferOverflow => e deliver_messages if @max_retries == -1 retry elsif retries < @max_retries retries += 1 sleep @retry_backoff**retries retry else @logger.error("Failed to asynchronously produce messages due to BufferOverflow") @instrumenter.instrument("error.async_producer", { error: e }) end end end
threshold_reached?()
click to toggle source
# File lib/kafka/async_producer.rb, line 273 def threshold_reached? @delivery_threshold > 0 && @producer.buffer_size >= @delivery_threshold end