class Kafka::TopicProducer

Public Class Methods

new(topic, cluster:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:)

Creates a producer that sends messages to a single topic. A required_acks value of :all is stored as -1.
# File lib/fluent/plugin/kafka_producer_ext.rb, line 59
# Builds a producer bound to a single topic.
#
# A required_acks of :all is normalised to -1; any other value is stored
# unchanged. The topic is registered with the cluster as a target topic,
# then an empty partitioned buffer and an empty pending queue are created.
def initialize(topic, cluster:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:)
  # Collaborators.
  @cluster      = cluster
  @logger       = logger
  @instrumenter = instrumenter
  @compressor   = compressor

  # Delivery settings.
  @required_acks = if required_acks == :all
                     -1
                   else
                     required_acks
                   end
  @ack_timeout   = ack_timeout
  @max_retries   = max_retries
  @retry_backoff = retry_backoff

  # Buffer size limits.
  @max_buffer_size     = max_buffer_size
  @max_buffer_bytesize = max_buffer_bytesize

  @topic = topic
  @cluster.add_target_topics(Set[topic])

  # Messages that already have a partition, organized by topic/partition.
  @buffer = MessageBuffer.new
  # Messages accepted by #produce that still need a partition.
  @pending_message_queue = PendingMessageQueue.new
end

Public Instance Methods

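buffer_bytesize()

Returns the total size in bytes of the messages currently held in the buffer, including messages not yet assigned a partition.

@return [Integer] buffer size in bytes.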
# File lib/fluent/plugin/kafka_producer_ext.rb, line 112
# Byte size of everything held by this producer: messages still awaiting
# a partition plus messages already placed in the partitioned buffer.
def buffer_bytesize
  pending_bytes = @pending_message_queue.bytesize
  pending_bytes + @buffer.bytesize
end
buffer_size()

Returns the number of messages currently held in the buffer.

@return [Integer] buffer size.

# File lib/fluent/plugin/kafka_producer_ext.rb, line 108
# Message count across both stages: pending (unpartitioned) messages and
# messages already in the partitioned buffer.
def buffer_size
  queued = @pending_message_queue.size
  partitioned = @buffer.size
  queued + partitioned
end
clear_buffer()

Deletes all buffered messages.

@return [nil]

# File lib/fluent/plugin/kafka_producer_ext.rb, line 119
# Drops every buffered message: both those already assigned a partition and
# those still waiting in the pending queue.
#
# Returns nil explicitly, as documented, instead of leaking whatever the
# last #clear call happens to return.
def clear_buffer
  @buffer.clear
  @pending_message_queue.clear
  nil
end
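deliver_messages()

Delivers all buffered messages, retrying failed sends. Does nothing if the buffer is empty.

@raise [DeliveryFailed] if some messages could not be assigned a partition or are still undelivered after the configured retries.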
# File lib/fluent/plugin/kafka_producer_ext.rb, line 98
# Sends everything currently buffered (pending and partitioned messages).
# An empty buffer short-circuits to nil before any cluster work happens;
# otherwise the result of deliver_messages_with_retries is returned.
def deliver_messages
  deliver_messages_with_retries unless buffer_size.zero?
end
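produce(value, key, partition, partition_key)

Queues a message for this producer's topic. The message is only written to the pending queue; it is sent by a later call to deliver_messages.

@return [nil]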
# File lib/fluent/plugin/kafka_producer_ext.rb, line 81
# Queues one message for this producer's topic without sending it.
#
# partition may be nil, in which case a partition is assigned later (via
# Partitioner.partition_for_key) when pending messages are moved into the
# buffer. The creation timestamp is taken now. Always returns nil.
def produce(value, key, partition, partition_key)
  pending = PendingMessage.new(value, key, @topic, partition, partition_key, Time.now)
  @pending_message_queue.write(pending)
  nil
end
shutdown()

Closes all connections to the brokers.

@return [nil]

# File lib/fluent/plugin/kafka_producer_ext.rb, line 127
# Closes all connections to the brokers by disconnecting the cluster.
#
# Returns nil explicitly, as documented, instead of leaking the return
# value of the cluster's #disconnect.
def shutdown
  @cluster.disconnect
  nil
end

Private Instance Methods

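assign_partitions!()

Moves pending messages into the partitioned buffer, choosing a partition for messages that have none. Messages whose assignment raises a Kafka::Error stay in the pending queue, and the cluster is marked stale.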
# File lib/fluent/plugin/kafka_producer_ext.rb, line 188
# Moves every pending message into the partitioned buffer.
#
# Messages without an explicit partition get one from
# Partitioner.partition_for_key, using the topic's partition count. Any
# message whose partitioning or buffering raises a Kafka::Error is kept in
# the pending queue; each affected topic is logged and the cluster is marked
# stale.
def assign_partitions!
  failed_messages = []
  # The partition count is looked up lazily, inside the rescue below. A
  # metadata failure (e.g. an unknown topic) then leaves the affected
  # messages pending instead of escaping as an unrescued error. No lookup
  # happens when every message already carries a partition.
  partition_count = nil

  @pending_message_queue.each do |message|
    partition = message.partition

    begin
      if partition.nil?
        partition_count ||= @cluster.partitions_for(@topic).count
        partition = Partitioner.partition_for_key(partition_count, message)
      end

      @buffer.write(
        value: message.value,
        key: message.key,
        topic: message.topic,
        partition: partition,
        create_time: message.create_time,
      )
    rescue Kafka::Error
      # Keep the message so a later attempt can retry it.
      failed_messages << message
    end
  end

  if failed_messages.any?
    failed_messages.group_by(&:topic).each do |topic, messages|
      @logger.error "Failed to assign partitions to #{messages.count} messages in #{topic}"
    end

    @cluster.mark_as_stale!
  end

  @pending_message_queue.replace(failed_messages)
end
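deliver_messages_with_retries()

Repeatedly assigns partitions and sends the buffer until it is empty or max_retries retries have been used, then raises DeliveryFailed for any messages left unassigned or undelivered.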
# File lib/fluent/plugin/kafka_producer_ext.rb, line 133
# Runs the produce/retry loop over the pending queue and the buffer.
#
# Each attempt refreshes cluster metadata if needed, assigns partitions to
# pending messages and executes one ProduceOperation over the buffer.
# Attempts stop once the buffer is empty or @max_retries retries have been
# used; between attempts it sleeps @retry_backoff seconds.
#
# @raise [DeliveryFailed] if messages are still unassigned (the cluster is
#   marked stale first) or still buffered after the final attempt.
# @return [nil]
def deliver_messages_with_retries
  produce_op = ProduceOperation.new(
    cluster: @cluster,
    buffer: @buffer,
    required_acks: @required_acks,
    ack_timeout: @ack_timeout,
    compressor: @compressor,
    logger: @logger,
    instrumenter: @instrumenter,
  )
  tries = 0

  loop do
    tries += 1

    @cluster.refresh_metadata_if_necessary!
    assign_partitions!
    produce_op.execute

    # With acks=0 the brokers send no response, so per-message success
    # cannot be observed; treat everything buffered as delivered.
    @buffer.clear if @required_acks.zero?

    break if buffer_size.zero?

    if tries > @max_retries
      @logger.error "Failed to send all messages; keeping remaining messages in buffer"
      break
    end

    @logger.warn "Failed to send all messages; attempting retry #{tries} of #{@max_retries} after #{@retry_backoff}s"
    sleep @retry_backoff
  end

  unless @pending_message_queue.empty?
    # Mark the cluster as stale to force a cluster metadata refresh.
    @cluster.mark_as_stale!
    raise DeliveryFailed, "Failed to assign partitions to #{@pending_message_queue.size} messages"
  end

  return if @buffer.empty?

  failed_partitions = @buffer.map { |topic, partition, _messages| "#{topic}/#{partition}" }
  raise DeliveryFailed, "Failed to send messages to #{failed_partitions.join(", ")}"
end