class Kafka::Producer
Public Instance Methods
produce_for_buffered(value, key: nil, topic:, partition: nil, partition_key: nil)
click to toggle source
# File lib/fluent/plugin/kafka_producer_ext.rb, line 15 def produce_for_buffered(value, key: nil, topic:, partition: nil, partition_key: nil) create_time = Time.now message = PendingMessage.new( value: value, key: key, headers: EMPTY_HEADER, topic: topic, partition: partition, partition_key: partition_key, create_time: create_time ) # If the producer is in transactional mode, all the message production # must be used when the producer is currently in transaction if @transaction_manager.transactional? && !@transaction_manager.in_transaction? raise 'You must trigger begin_transaction before producing messages' end @target_topics.add(topic) @pending_message_queue.write(message) nil end