class Kafka::TopicProducer
Public Class Methods
new(topic, cluster:, transaction_manager:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:, partitioner:)
click to toggle source
# File lib/fluent/plugin/kafka_producer_ext.rb, line 78 def initialize(topic, cluster:, transaction_manager:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:, partitioner:) @cluster = cluster @transaction_manager = transaction_manager @logger = logger @instrumenter = instrumenter @required_acks = required_acks == :all ? -1 : required_acks @ack_timeout = ack_timeout @max_retries = max_retries @retry_backoff = retry_backoff @max_buffer_size = max_buffer_size @max_buffer_bytesize = max_buffer_bytesize @compressor = compressor @partitioner = partitioner @topic = topic @cluster.add_target_topics(Set.new([topic])) # A buffer organized by topic/partition. @buffer = MessageBuffer.new # Messages added by `#produce` but not yet assigned a partition. @pending_message_queue = PendingMessageQueue.new end
Public Instance Methods
abort_transaction()
click to toggle source
# File lib/fluent/plugin/kafka_producer_ext.rb, line 170 def abort_transaction @transaction_manager.abort_transaction end
assign_partitions!()
click to toggle source
# File lib/fluent/plugin/kafka_producer_ext.rb, line 246 def assign_partitions! failed_messages = [] partition_count = @cluster.partitions_for(@topic).count @pending_message_queue.each do |message| partition = message.partition begin if partition.nil? partition = @partitioner.call(partition_count, message) end @buffer.write( value: message.value, key: message.key, headers: message.headers, topic: message.topic, partition: partition, create_time: message.create_time, ) rescue Kafka::Error => e failed_messages << message end end if failed_messages.any? failed_messages.group_by(&:topic).each do |topic, messages| @logger.error "Failed to assign partitions to #{messages.count} messages in #{topic}" end @cluster.mark_as_stale! end @pending_message_queue.replace(failed_messages) end
begin_transaction()
click to toggle source
# File lib/fluent/plugin/kafka_producer_ext.rb, line 162 def begin_transaction @transaction_manager.begin_transaction end
buffer_bytesize()
click to toggle source
# File lib/fluent/plugin/kafka_producer_ext.rb, line 138 def buffer_bytesize @pending_message_queue.bytesize + @buffer.bytesize end
buffer_messages()
click to toggle source
# File lib/fluent/plugin/kafka_producer_ext.rb, line 282 def buffer_messages messages = [] @pending_message_queue.each do |message| messages << message end @buffer.each do |topic, partition, messages_for_partition| messages_for_partition.each do |message| messages << PendingMessage.new( value: message.value, key: message.key, headers: message.headers, topic: topic, partition: partition, partition_key: nil, create_time: message.create_time ) end end messages end
buffer_size()
click to toggle source
Returns the number of messages currently held in the buffer.
@return [Integer] buffer size.
# File lib/fluent/plugin/kafka_producer_ext.rb, line 134 def buffer_size @pending_message_queue.size + @buffer.size end
clear_buffer()
click to toggle source
Deletes all buffered messages.
@return [nil]
# File lib/fluent/plugin/kafka_producer_ext.rb, line 145 def clear_buffer @buffer.clear @pending_message_queue.clear end
commit_transaction()
click to toggle source
# File lib/fluent/plugin/kafka_producer_ext.rb, line 166 def commit_transaction @transaction_manager.commit_transaction end
deliver_messages()
click to toggle source
# File lib/fluent/plugin/kafka_producer_ext.rb, line 124 def deliver_messages # There's no need to do anything if the buffer is empty. return if buffer_size == 0 deliver_messages_with_retries end
deliver_messages_with_retries()
click to toggle source
# File lib/fluent/plugin/kafka_producer_ext.rb, line 186 def deliver_messages_with_retries attempt = 0 #@cluster.add_target_topics(@target_topics) operation = ProduceOperation.new( cluster: @cluster, transaction_manager: @transaction_manager, buffer: @buffer, required_acks: @required_acks, ack_timeout: @ack_timeout, compressor: @compressor, logger: @logger, instrumenter: @instrumenter, ) loop do attempt += 1 begin @cluster.refresh_metadata_if_necessary! rescue ConnectionError => e raise DeliveryFailed.new(e, buffer_messages) end assign_partitions! operation.execute if @required_acks.zero? # No response is returned by the brokers, so we can't know which messages # have been successfully written. Our only option is to assume that they all # have. @buffer.clear end if buffer_size.zero? break elsif attempt <= @max_retries @logger.warn "Failed to send all messages; attempting retry #{attempt} of #{@max_retries} after #{@retry_backoff}s" sleep @retry_backoff else @logger.error "Failed to send all messages; keeping remaining messages in buffer" break end end unless @pending_message_queue.empty? # Mark the cluster as stale in order to force a cluster metadata refresh. @cluster.mark_as_stale! raise DeliveryFailed.new("Failed to assign partitions to #{@pending_message_queue.size} messages", buffer_messages) end unless @buffer.empty? partitions = @buffer.map {|topic, partition, _| "#{topic}/#{partition}" }.join(", ") raise DeliveryFailed.new("Failed to send messages to #{partitions}", buffer_messages) end end
init_transactions()
click to toggle source
# File lib/fluent/plugin/kafka_producer_ext.rb, line 158 def init_transactions @transaction_manager.init_transactions end
produce(value, key: nil, partition: nil, partition_key: nil, headers: EMPTY_HEADER, create_time: Time.now)
click to toggle source
# File lib/fluent/plugin/kafka_producer_ext.rb, line 102 def produce(value, key: nil, partition: nil, partition_key: nil, headers: EMPTY_HEADER, create_time: Time.now) message = PendingMessage.new( value: value, key: key, headers: headers, topic: @topic, partition: partition, partition_key: partition_key, create_time: create_time ) # If the producer is in transactional mode, all the message production # must be used when the producer is currently in transaction if @transaction_manager.transactional? && !@transaction_manager.in_transaction? raise 'You must trigger begin_transaction before producing messages' end @pending_message_queue.write(message) nil end
shutdown()
click to toggle source
Closes all connections to the brokers.
@return [nil]
# File lib/fluent/plugin/kafka_producer_ext.rb, line 153 def shutdown @transaction_manager.close @cluster.disconnect end
transaction() { || ... }
click to toggle source
# File lib/fluent/plugin/kafka_producer_ext.rb, line 174 def transaction raise 'This method requires a block' unless block_given? begin_transaction yield commit_transaction rescue Kafka::Producer::AbortTransaction abort_transaction rescue abort_transaction raise end