class Kafka::Protocol::RecordBatch
Constants
- CODEC_ID_MASK
  Mask to extract the compression codec id from the attributes field
- IN_TRANSACTION_MASK
  Mask to extract the transactional flag from the attributes field
- IS_CONTROL_BATCH_MASK
  Mask to extract the control-batch flag from the attributes field
- MAGIC_BYTE
- RECORD_BATCH_OVERHEAD
  The size of the metadata that precedes the actual record data
- TIMESTAMP_TYPE_MASK
  Mask to extract the timestamp type from the attributes field
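A minimal sketch of how these masks decompose the int16 attributes field, mirroring the logic in decode (the sample value is hypothetical, not taken from a real batch):

  attrs = 0b100001 # hypothetical wire value read via decoder.int16
  codec_id        = attrs & Kafka::Protocol::RecordBatch::CODEC_ID_MASK
  in_transaction  = (attrs & Kafka::Protocol::RecordBatch::IN_TRANSACTION_MASK) > 0
  is_control      = (attrs & Kafka::Protocol::RecordBatch::IS_CONTROL_BATCH_MASK) > 0
  log_append_time = (attrs & Kafka::Protocol::RecordBatch::TIMESTAMP_TYPE_MASK) != 0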
Attributes
codec_id[RW]
first_offset[R]
first_sequence[R]
first_timestamp[R]
in_transaction[R]
is_control_batch[R]
last_offset_delta[R]
max_timestamp[R]
partition_leader_epoch[R]
producer_epoch[R]
producer_id[R]
records[R]
Public Class Methods
decode(decoder)
# File lib/kafka/protocol/record_batch.rb, line 151
def self.decode(decoder)
  first_offset = decoder.int64

  record_batch_raw = decoder.bytes
  record_batch_decoder = Decoder.from_string(record_batch_raw)

  partition_leader_epoch = record_batch_decoder.int32
  # Currently, the magic byte is used to distinguish legacy MessageSet and
  # RecordBatch. Therefore, we don't care about the magic byte here yet.
  _magic_byte = record_batch_decoder.int8
  _crc = record_batch_decoder.int32

  attributes = record_batch_decoder.int16
  codec_id = attributes & CODEC_ID_MASK
  in_transaction = (attributes & IN_TRANSACTION_MASK) > 0
  is_control_batch = (attributes & IS_CONTROL_BATCH_MASK) > 0
  log_append_time = (attributes & TIMESTAMP_TYPE_MASK) != 0

  last_offset_delta = record_batch_decoder.int32
  first_timestamp = Time.at(record_batch_decoder.int64 / 1000)
  max_timestamp = Time.at(record_batch_decoder.int64 / 1000)

  producer_id = record_batch_decoder.int64
  producer_epoch = record_batch_decoder.int16

  first_sequence = record_batch_decoder.int32

  records_array_length = record_batch_decoder.int32
  records_array_raw = record_batch_decoder.read(
    record_batch_raw.size - RECORD_BATCH_OVERHEAD
  )
  if codec_id != 0
    codec = Compression.find_codec_by_id(codec_id)
    records_array_raw = codec.decompress(records_array_raw)
  end

  records_array_decoder = Decoder.from_string(records_array_raw)
  records_array = []
  until records_array_decoder.eof?
    record = Record.decode(records_array_decoder)
    record.offset = first_offset + record.offset_delta
    record.create_time = log_append_time && max_timestamp ? max_timestamp : first_timestamp + record.timestamp_delta
    records_array << record
  end

  raise InsufficientDataMessage if records_array.length != records_array_length

  new(
    records: records_array,
    first_offset: first_offset,
    first_timestamp: first_timestamp,
    partition_leader_epoch: partition_leader_epoch,
    in_transaction: in_transaction,
    is_control_batch: is_control_batch,
    last_offset_delta: last_offset_delta,
    producer_id: producer_id,
    producer_epoch: producer_epoch,
    first_sequence: first_sequence,
    max_timestamp: max_timestamp
  )
rescue EOFError
  raise InsufficientDataMessage, 'Partial trailing record detected!'
end
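A hedged usage sketch: decoding a batch from raw fetch-response bytes. raw_bytes is a stand-in for a payload you already hold; Decoder.from_string is the same entry point the method itself uses internally.

  require "kafka"

  decoder = Kafka::Protocol::Decoder.from_string(raw_bytes) # raw_bytes: assumed payload
  batch   = Kafka::Protocol::RecordBatch.decode(decoder)
  batch.records.each do |record|
    puts "#{record.offset} @ #{record.create_time}"
  end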
new( records: [], first_offset: 0, first_timestamp: Time.now, partition_leader_epoch: 0, codec_id: 0, in_transaction: false, is_control_batch: false, last_offset_delta: 0, producer_id: -1, producer_epoch: 0, first_sequence: 0, max_timestamp: Time.now )
# File lib/kafka/protocol/record_batch.rb, line 20
def initialize(
  records: [],
  first_offset: 0,
  first_timestamp: Time.now,
  partition_leader_epoch: 0,
  codec_id: 0,
  in_transaction: false,
  is_control_batch: false,
  last_offset_delta: 0,
  producer_id: -1,
  producer_epoch: 0,
  first_sequence: 0,
  max_timestamp: Time.now
)
  @records = Array(records)
  @first_offset = first_offset
  @first_timestamp = first_timestamp
  @codec_id = codec_id

  # Records verification
  @last_offset_delta = last_offset_delta
  @max_timestamp = max_timestamp

  # Transaction information
  @producer_id = producer_id
  @producer_epoch = producer_epoch
  @first_sequence = first_sequence
  @partition_leader_epoch = partition_leader_epoch
  @in_transaction = in_transaction
  @is_control_batch = is_control_batch

  mark_control_record
end
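A sketch of building a batch by hand. The Record.new keywords and the gzip codec id of 1 are assumptions about Kafka::Protocol::Record and Compression, not guaranteed by this page:

  record = Kafka::Protocol::Record.new(value: "hello", create_time: Time.now) # assumed keywords
  batch  = Kafka::Protocol::RecordBatch.new(
    records: [record],
    codec_id: 1 # assumed to resolve to gzip via Compression.find_codec_by_id
  )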
Public Instance Methods
==(other)
# File lib/kafka/protocol/record_batch.rb, line 139
def ==(other)
  records == other.records &&
    first_offset == other.first_offset &&
    partition_leader_epoch == other.partition_leader_epoch &&
    in_transaction == other.in_transaction &&
    is_control_batch == other.is_control_batch &&
    last_offset_delta == other.last_offset_delta &&
    producer_id == other.producer_id &&
    producer_epoch == other.producer_epoch &&
    first_sequence == other.first_sequence
end
attributes()
# File lib/kafka/protocol/record_batch.rb, line 63
def attributes
  0x0000 | @codec_id |
    (@in_transaction ? IN_TRANSACTION_MASK : 0x0) |
    (@is_control_batch ? IS_CONTROL_BATCH_MASK : 0x0)
end
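Assuming the bit layout from the Kafka protocol spec (codec id in bits 0-2, transactional flag in bit 4, control flag in bit 5), a transactional gzip batch would serialize as:

  batch = Kafka::Protocol::RecordBatch.new(codec_id: 1, in_transaction: true)
  batch.attributes # => 0b10001, i.e. the codec bits OR'd with IN_TRANSACTION_MASK

Note that attributes never sets TIMESTAMP_TYPE_MASK: batches encoded by this class always carry create-time timestamps.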
compressed?()
# File lib/kafka/protocol/record_batch.rb, line 121
def compressed?
  @codec_id != 0
end
encode(encoder)
# File lib/kafka/protocol/record_batch.rb, line 69
def encode(encoder)
  encoder.write_int64(@first_offset)

  record_batch_buffer = StringIO.new
  record_batch_encoder = Encoder.new(record_batch_buffer)

  record_batch_encoder.write_int32(@partition_leader_epoch)
  record_batch_encoder.write_int8(MAGIC_BYTE)

  body = encode_record_batch_body
  crc = Digest::CRC32c.checksum(body)

  record_batch_encoder.write_int32(crc)
  record_batch_encoder.write(body)

  encoder.write_bytes(record_batch_buffer.string)
end
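A sketch of driving encode end to end; fulfill_relative_data runs first so the deltas written by encode_record_batch_body are populated. batch is the instance from the constructor sketch above:

  buffer  = StringIO.new
  encoder = Kafka::Protocol::Encoder.new(buffer)

  batch.fulfill_relative_data # compute offset/timestamp deltas before writing
  batch.encode(encoder)

  buffer.string # first_offset (int64) followed by the length-prefixed batch body

Digest::CRC32c here comes from the digest-crc gem; the checksum covers only the body built by encode_record_batch_body (attributes onward), which is why that body is assembled separately before the CRC is written.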
encode_record_array()
# File lib/kafka/protocol/record_batch.rb, line 112
def encode_record_array
  buffer = StringIO.new
  encoder = Encoder.new(buffer)
  @records.each do |record|
    record.encode(encoder)
  end
  buffer.string
end
encode_record_batch_body()
# File lib/kafka/protocol/record_batch.rb, line 87
def encode_record_batch_body
  buffer = StringIO.new
  encoder = Encoder.new(buffer)

  encoder.write_int16(attributes)
  encoder.write_int32(@last_offset_delta)
  encoder.write_int64((@first_timestamp.to_f * 1000).to_i)
  encoder.write_int64((@max_timestamp.to_f * 1000).to_i)

  encoder.write_int64(@producer_id)
  encoder.write_int16(@producer_epoch)
  encoder.write_int32(@first_sequence)
  encoder.write_int32(@records.length)

  records_array = encode_record_array
  if compressed?
    codec = Compression.find_codec_by_id(@codec_id)
    records_array = codec.compress(records_array)
  end
  encoder.write(records_array)

  buffer.string
end
fulfill_relative_data()
# File lib/kafka/protocol/record_batch.rb, line 125
def fulfill_relative_data
  first_record = records.min_by { |record| record.create_time }
  @first_timestamp = first_record.nil? ? Time.now : first_record.create_time

  last_record = records.max_by { |record| record.create_time }
  @max_timestamp = last_record.nil? ? Time.now : last_record.create_time

  records.each_with_index do |record, index|
    record.offset_delta = index
    record.timestamp_delta = (record.create_time - first_timestamp).to_i
  end
  @last_offset_delta = records.length - 1
end
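A worked sketch of the delta bookkeeping (the Record.new keywords are assumptions, as above):

  t0 = Time.now
  records = (0..2).map do |i|
    Kafka::Protocol::Record.new(value: "v#{i}", create_time: t0 + i)
  end

  batch = Kafka::Protocol::RecordBatch.new(records: records)
  batch.fulfill_relative_data
  # first_timestamp == t0, max_timestamp == t0 + 2,
  # each record gets offset_delta 0..2 and timestamp_delta 0, 1, 2 (whole seconds),
  # and last_offset_delta == 2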
last_offset()
# File lib/kafka/protocol/record_batch.rb, line 59
def last_offset
  @first_offset + @last_offset_delta
end
mark_control_record()
# File lib/kafka/protocol/record_batch.rb, line 214
def mark_control_record
  if in_transaction && is_control_batch
    record = @records.first
    record.is_control_record = true unless record.nil?
  end
end
size()
# File lib/kafka/protocol/record_batch.rb, line 55
def size
  @records.size
end