class Kafka::TopicProducer
Public Class Methods
new(topic, cluster:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:)
# File lib/fluent/plugin/kafka_producer_ext.rb, line 59
def initialize(topic, cluster:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:)
  @cluster = cluster
  @logger = logger
  @instrumenter = instrumenter
  @required_acks = required_acks == :all ? -1 : required_acks
  @ack_timeout = ack_timeout
  @max_retries = max_retries
  @retry_backoff = retry_backoff
  @max_buffer_size = max_buffer_size
  @max_buffer_bytesize = max_buffer_bytesize
  @compressor = compressor
  @topic = topic
  @cluster.add_target_topics(Set.new([topic]))

  # A buffer organized by topic/partition.
  @buffer = MessageBuffer.new

  # Messages added by `#produce` but not yet assigned a partition.
  @pending_message_queue = PendingMessageQueue.new
end
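A minimal construction sketch. The cluster, instrumenter, and compressor are internal ruby-kafka collaborators normally supplied by the client that builds the producer; the values below are illustrative, not defaults.

  require "logger"

  # Illustrative wiring only: cluster, instrumenter and compressor are
  # assumed to come from the surrounding ruby-kafka client.
  producer = Kafka::TopicProducer.new(
    "access_log",              # the single topic this producer writes to
    cluster: cluster,
    logger: Logger.new($stdout),
    instrumenter: instrumenter,
    compressor: compressor,
    ack_timeout: 5,
    required_acks: :all,       # stored internally as -1
    max_retries: 2,
    retry_backoff: 1,
    max_buffer_size: 1000,
    max_buffer_bytesize: 10_000_000
  )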
Public Instance Methods
buffer_bytesize()
# File lib/fluent/plugin/kafka_producer_ext.rb, line 112
def buffer_bytesize
  @pending_message_queue.bytesize + @buffer.bytesize
end
buffer_size()
Returns the number of messages currently buffered, including messages that have not yet been assigned a partition.
@return [Integer] buffer size.
# File lib/fluent/plugin/kafka_producer_ext.rb, line 108
def buffer_size
  @pending_message_queue.size + @buffer.size
end
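Both counters include messages still waiting for a partition as well as messages already assigned to one. A sketch of a caller-side flush policy (the limits mirror the max_buffer_size and max_buffer_bytesize passed to ::new; the class itself does not flush automatically):

  # Hypothetical flush policy around the producer built above.
  if producer.buffer_size >= 1000 || producer.buffer_bytesize >= 10_000_000
    producer.deliver_messages
  end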
clear_buffer()
Deletes all buffered messages.
@return [nil]
# File lib/fluent/plugin/kafka_producer_ext.rb, line 119
def clear_buffer
  @buffer.clear
  @pending_message_queue.clear
end
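Both the partition-assigned buffer and the pending queue are emptied, so the counters above drop to zero:

  producer.produce("event", nil, nil, nil)
  producer.clear_buffer
  producer.buffer_size       # => 0
  producer.buffer_bytesize   # => 0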
deliver_messages()
# File lib/fluent/plugin/kafka_producer_ext.rb, line 98
def deliver_messages
  # There's no need to do anything if the buffer is empty.
  return if buffer_size == 0

  deliver_messages_with_retries
end
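Delivery is a no-op when nothing is buffered; otherwise it delegates to the retrying path below, which raises Kafka::DeliveryFailed once the retries are exhausted. A caller-side sketch:

  begin
    producer.deliver_messages
  rescue Kafka::DeliveryFailed
    # Undelivered messages stay in the buffer; retry later or drop them.
    producer.clear_buffer
  end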
produce(value, key, partition, partition_key)
# File lib/fluent/plugin/kafka_producer_ext.rb, line 81
def produce(value, key, partition, partition_key)
  create_time = Time.now

  message = PendingMessage.new(
    value,
    key,
    @topic,
    partition,
    partition_key,
    create_time
  )

  @pending_message_queue.write(message)

  nil
end
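produce only appends to the pending queue; nothing is sent until deliver_messages is called. Passing an explicit partition pins the message, while a nil partition leaves the choice to the partitioner (see assign_partitions! below). A usage sketch, reusing the producer from the constructor example:

  producer.produce("log line", "key-1", nil, "user-42")  # partition chosen by the partitioner
  producer.produce("log line", nil, 0, nil)              # pinned to partition 0
  producer.deliver_messages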
shutdown()
Closes all connections to the brokers.
@return [nil]
# File lib/fluent/plugin/kafka_producer_ext.rb, line 127
def shutdown
  @cluster.disconnect
end
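Typically called after a final flush:

  producer.deliver_messages   # flush anything still buffered
  producer.shutdown           # then close the broker connections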
Private Instance Methods
assign_partitions!()
# File lib/fluent/plugin/kafka_producer_ext.rb, line 188
def assign_partitions!
  failed_messages = []
  partition_count = @cluster.partitions_for(@topic).count

  @pending_message_queue.each do |message|
    partition = message.partition

    begin
      if partition.nil?
        partition = Partitioner.partition_for_key(partition_count, message)
      end

      @buffer.write(
        value: message.value,
        key: message.key,
        topic: message.topic,
        partition: partition,
        create_time: message.create_time,
      )
    rescue Kafka::Error => e
      failed_messages << message
    end
  end

  if failed_messages.any?
    failed_messages.group_by(&:topic).each do |topic, messages|
      @logger.error "Failed to assign partitions to #{messages.count} messages in #{topic}"
    end

    @cluster.mark_as_stale!
  end

  @pending_message_queue.replace(failed_messages)
end
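A conceptual sketch of the key-based assignment, assuming the usual hash-modulo scheme; the actual mapping is whatever ruby-kafka's Partitioner implements:

  require "zlib"

  # Illustration only, not the Partitioner's actual algorithm: a stable
  # hash of the key modulo the topic's partition count.
  def illustrative_partition_for(partition_key, partition_count)
    Zlib.crc32(partition_key) % partition_count
  end

  illustrative_partition_for("user-42", 6)  # same partition on every call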
deliver_messages_with_retries()
# File lib/fluent/plugin/kafka_producer_ext.rb, line 133
def deliver_messages_with_retries
  attempt = 0

  #@cluster.add_target_topics(@target_topics)

  operation = ProduceOperation.new(
    cluster: @cluster,
    buffer: @buffer,
    required_acks: @required_acks,
    ack_timeout: @ack_timeout,
    compressor: @compressor,
    logger: @logger,
    instrumenter: @instrumenter,
  )

  loop do
    attempt += 1

    @cluster.refresh_metadata_if_necessary!

    assign_partitions!
    operation.execute

    if @required_acks.zero?
      # No response is returned by the brokers, so we can't know which messages
      # have been successfully written. Our only option is to assume that they all
      # have.
      @buffer.clear
    end

    if buffer_size.zero?
      break
    elsif attempt <= @max_retries
      @logger.warn "Failed to send all messages; attempting retry #{attempt} of #{@max_retries} after #{@retry_backoff}s"
      sleep @retry_backoff
    else
      @logger.error "Failed to send all messages; keeping remaining messages in buffer"
      break
    end
  end

  unless @pending_message_queue.empty?
    # Mark the cluster as stale in order to force a cluster metadata refresh.
    @cluster.mark_as_stale!
    raise DeliveryFailed, "Failed to assign partitions to #{@pending_message_queue.size} messages"
  end

  unless @buffer.empty?
    partitions = @buffer.map {|topic, partition, _| "#{topic}/#{partition}" }.join(", ")
    raise DeliveryFailed, "Failed to send messages to #{partitions}"
  end
end
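A worst-case timing sketch for the retry loop above (broker round-trip time ignored):

  max_retries   = 2
  retry_backoff = 1

  max_attempts = max_retries + 1              # the first attempt plus the retries => 3
  max_sleep    = max_retries * retry_backoff  # total back-off before giving up => 2 seconds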