class Kafka::FetchedBatchGenerator
Constants
- ABORTED_TRANSACTION_SIGNAL
- COMMITTED_TRANSACTION_SIGNAL
Public Class Methods
new(topic, fetched_partition, offset, logger:)
click to toggle source
# File lib/kafka/fetched_batch_generator.rb, line 10 def initialize(topic, fetched_partition, offset, logger:) @topic = topic @fetched_partition = fetched_partition @logger = TaggedLogger.new(logger) @offset = offset end
Public Instance Methods
generate()
click to toggle source
# File lib/kafka/fetched_batch_generator.rb, line 17 def generate if @fetched_partition.messages.empty? empty_fetched_batch elsif @fetched_partition.messages.first.is_a?(Kafka::Protocol::MessageSet) extract_messages else extract_records end end
Private Instance Methods
abort_marker?(record_batch)
click to toggle source
# File lib/kafka/fetched_batch_generator.rb, line 107 def abort_marker?(record_batch) return false unless record_batch.is_control_batch if record_batch.records.empty? raise "Invalid control record batch at topic '#{@topic}', partition #{@fetched_partition}" end record_batch.records.first.key == ABORTED_TRANSACTION_SIGNAL end
empty_fetched_batch()
click to toggle source
# File lib/kafka/fetched_batch_generator.rb, line 29 def empty_fetched_batch FetchedBatch.new( topic: @topic, partition: @fetched_partition.partition, last_offset: nil, highwater_mark_offset: @fetched_partition.highwater_mark_offset, messages: [] ) end
extract_messages()
click to toggle source
# File lib/kafka/fetched_batch_generator.rb, line 39 def extract_messages last_offset = nil messages = @fetched_partition.messages.flat_map do |message_set| message_set.messages.map do |message| last_offset = message.offset if last_offset.nil? || last_offset < message.offset if message.offset >= @offset FetchedMessage.new( message: message, topic: @topic, partition: @fetched_partition.partition ) end end end FetchedBatch.new( topic: @topic, partition: @fetched_partition.partition, last_offset: last_offset, highwater_mark_offset: @fetched_partition.highwater_mark_offset, messages: messages ) end
extract_records()
click to toggle source
# File lib/kafka/fetched_batch_generator.rb, line 62 def extract_records records = [] last_offset = nil aborted_transactions = @fetched_partition.aborted_transactions.sort_by(&:first_offset) aborted_producer_ids = {} @fetched_partition.messages.each do |record_batch| last_offset = record_batch.last_offset if last_offset.nil? || last_offset < record_batch.last_offset # Find the list of aborted producer IDs less than current offset unless aborted_transactions.empty? if aborted_transactions.first.first_offset <= record_batch.last_offset aborted_transaction = aborted_transactions.shift aborted_producer_ids[aborted_transaction.producer_id] = aborted_transaction.first_offset end end if abort_marker?(record_batch) # Abort marker, remove the producer from the aborted list aborted_producer_ids.delete(record_batch.producer_id) elsif aborted_producer_ids.key?(record_batch.producer_id) && record_batch.in_transaction # Reject aborted record batch @logger.info("Reject #{record_batch.records.size} aborted records of topic '#{@topic}', partition #{@fetched_partition.partition}, from offset #{record_batch.first_offset}") next end record_batch.records.each do |record| if !record.is_control_record && record.offset >= @offset records << FetchedMessage.new( message: record, topic: @topic, partition: @fetched_partition.partition ) end end end FetchedBatch.new( topic: @topic, partition: @fetched_partition.partition, last_offset: last_offset, highwater_mark_offset: @fetched_partition.highwater_mark_offset, messages: records ) end