class Kafka::Protocol::RecordBatch

Constants

CODEC_ID_MASK

Mask used to extract the compression codec id from the attributes field. The other *_MASK constants below extract their respective flags the same way.

IN_TRANSACTION_MASK
IS_CONTROL_BATCH_MASK
MAGIC_BYTE
RECORD_BATCH_OVERHEAD

The size of the metadata that precedes the actual record data.

TIMESTAMP_TYPE_MASK
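
For orientation, the v2 record batch attributes field packs the codec id into the low three bits, the timestamp type at bit 3, the transactional flag at bit 4, and the control-batch flag at bit 5. Below is a sketch of mask values consistent with that layout; the authoritative literals live in lib/kafka/protocol/record_batch.rb.

CODEC_ID_MASK         = 0b00000111 # bits 0-2: compression codec id
TIMESTAMP_TYPE_MASK   = 0b00001000 # bit 3: 0 = CreateTime, 1 = LogAppendTime
IN_TRANSACTION_MASK   = 0b00010000 # bit 4: batch is part of a transaction
IS_CONTROL_BATCH_MASK = 0b00100000 # bit 5: batch holds a control record

attributes = 0b10001                   # e.g. gzip (codec id 1) + transactional
attributes & CODEC_ID_MASK             # => 1
(attributes & IN_TRANSACTION_MASK) > 0 # => true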

Attributes

codec_id[RW]
first_offset[R]
first_sequence[R]
first_timestamp[R]
in_transaction[R]
is_control_batch[R]
last_offset_delta[R]
max_timestamp[R]
partition_leader_epoch[R]
producer_epoch[R]
producer_id[R]
records[R]

Public Class Methods

decode(decoder)
# File lib/kafka/protocol/record_batch.rb, line 151
def self.decode(decoder)
  first_offset = decoder.int64

  record_batch_raw = decoder.bytes
  record_batch_decoder = Decoder.from_string(record_batch_raw)

  partition_leader_epoch = record_batch_decoder.int32
  # Currently, the magic byte is only used to distinguish the legacy
  # MessageSet format from a RecordBatch, so we don't need it here yet.
  _magic_byte = record_batch_decoder.int8
  _crc = record_batch_decoder.int32

  attributes = record_batch_decoder.int16
  codec_id = attributes & CODEC_ID_MASK
  in_transaction = (attributes & IN_TRANSACTION_MASK) > 0
  is_control_batch = (attributes & IS_CONTROL_BATCH_MASK) > 0
  log_append_time = (attributes & TIMESTAMP_TYPE_MASK) != 0

  last_offset_delta = record_batch_decoder.int32
  # Timestamps arrive as milliseconds since the epoch; the integer
  # division truncates them to whole seconds.
  first_timestamp = Time.at(record_batch_decoder.int64 / 1000)
  max_timestamp = Time.at(record_batch_decoder.int64 / 1000)

  producer_id = record_batch_decoder.int64
  producer_epoch = record_batch_decoder.int16
  first_sequence = record_batch_decoder.int32

  records_array_length = record_batch_decoder.int32
  # Everything after the fixed-size batch header is the (possibly
  # compressed) records payload.
  records_array_raw = record_batch_decoder.read(
    record_batch_raw.size - RECORD_BATCH_OVERHEAD
  )
  if codec_id != 0
    codec = Compression.find_codec_by_id(codec_id)
    records_array_raw = codec.decompress(records_array_raw)
  end

  records_array_decoder = Decoder.from_string(records_array_raw)
  records_array = []
  until records_array_decoder.eof?
    record = Record.decode(records_array_decoder)
    record.offset = first_offset + record.offset_delta
    record.create_time =
      if log_append_time && max_timestamp
        max_timestamp
      else
        first_timestamp + record.timestamp_delta
      end
    records_array << record
  end

  raise InsufficientDataMessage if records_array.length != records_array_length

  new(
    records: records_array,
    first_offset: first_offset,
    first_timestamp: first_timestamp,
    partition_leader_epoch: partition_leader_epoch,
    in_transaction: in_transaction,
    is_control_batch: is_control_batch,
    last_offset_delta: last_offset_delta,
    producer_id: producer_id,
    producer_epoch: producer_epoch,
    first_sequence: first_sequence,
    max_timestamp: max_timestamp
  )
rescue EOFError
  raise InsufficientDataMessage, 'Partial trailing record detected!'
end
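
A minimal decoding sketch: read_record_batch_bytes below is a hypothetical helper standing in for however you obtained the raw batch bytes (e.g. from a Fetch response), while Decoder.from_string is the same constructor decode uses internally.

require "kafka"

raw = read_record_batch_bytes # hypothetical: a binary String holding one batch
decoder = Kafka::Protocol::Decoder.from_string(raw)
batch = Kafka::Protocol::RecordBatch.decode(decoder)

batch.records.each do |record|
  puts "#{record.offset}: #{record.value}"
end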
new(records: [], first_offset: 0, first_timestamp: Time.now, partition_leader_epoch: 0, codec_id: 0, in_transaction: false, is_control_batch: false, last_offset_delta: 0, producer_id: -1, producer_epoch: 0, first_sequence: 0, max_timestamp: Time.now)
# File lib/kafka/protocol/record_batch.rb, line 20
def initialize(
    records: [],
    first_offset: 0,
    first_timestamp: Time.now,
    partition_leader_epoch: 0,
    codec_id: 0,
    in_transaction: false,
    is_control_batch: false,
    last_offset_delta: 0,
    producer_id: -1,
    producer_epoch: 0,
    first_sequence: 0,
    max_timestamp: Time.now
)
  @records = Array(records)
  @first_offset = first_offset
  @first_timestamp = first_timestamp
  @codec_id = codec_id

  # Records verification
  @last_offset_delta = last_offset_delta
  @max_timestamp = max_timestamp

  # Transaction information
  @producer_id = producer_id
  @producer_epoch = producer_epoch

  @first_sequence = first_sequence
  @partition_leader_epoch = partition_leader_epoch
  @in_transaction = in_transaction
  @is_control_batch = is_control_batch

  mark_control_record
end
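
A construction sketch; the key:/value: keyword arguments on Kafka::Protocol::Record are assumed from the companion Record class rather than documented on this page.

records = [
  Kafka::Protocol::Record.new(key: "k1", value: "v1"),
  Kafka::Protocol::Record.new(key: "k2", value: "v2"),
]

batch = Kafka::Protocol::RecordBatch.new(records: records, first_offset: 100)
batch.size # => 2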

Public Instance Methods

==(other)
# File lib/kafka/protocol/record_batch.rb, line 139
def ==(other)
  records == other.records &&
    first_offset == other.first_offset &&
    partition_leader_epoch == other.partition_leader_epoch &&
    in_transaction == other.in_transaction &&
    is_control_batch == other.is_control_batch &&
    last_offset_delta == other.last_offset_delta &&
    producer_id == other.producer_id &&
    producer_epoch == other.producer_epoch &&
    first_sequence == other.first_sequence
end
attributes()
# File lib/kafka/protocol/record_batch.rb, line 63
def attributes
  0x0000 | @codec_id |
    (@in_transaction ? IN_TRANSACTION_MASK : 0x0) |
    (@is_control_batch ? IS_CONTROL_BATCH_MASK : 0x0)
end
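
For example, assuming the mask values sketched under Constants, a transactional batch using codec id 1 (gzip) would report:

batch = Kafka::Protocol::RecordBatch.new(codec_id: 1, in_transaction: true)
batch.attributes # => 17 (0b10001), given the assumed mask values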
compressed?()
# File lib/kafka/protocol/record_batch.rb, line 121
def compressed?
  @codec_id != 0
end
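
Codec id 0 denotes an uncompressed batch:

Kafka::Protocol::RecordBatch.new.compressed?              # => false
Kafka::Protocol::RecordBatch.new(codec_id: 1).compressed? # => true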
encode(encoder)
# File lib/kafka/protocol/record_batch.rb, line 69
def encode(encoder)
  encoder.write_int64(@first_offset)

  record_batch_buffer = StringIO.new
  record_batch_encoder = Encoder.new(record_batch_buffer)

  record_batch_encoder.write_int32(@partition_leader_epoch)
  record_batch_encoder.write_int8(MAGIC_BYTE)

  body = encode_record_batch_body
  crc = Digest::CRC32c.checksum(body)

  record_batch_encoder.write_int32(crc)
  record_batch_encoder.write(body)

  encoder.write_bytes(record_batch_buffer.string)
end
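
A serialization sketch; fulfill_relative_data is called first so the per-record deltas are populated before writing (Record's value: keyword is assumed from the companion class).

require "stringio"

batch = Kafka::Protocol::RecordBatch.new(
  records: [Kafka::Protocol::Record.new(value: "hello")]
)
batch.fulfill_relative_data

buffer = StringIO.new
batch.encode(Kafka::Protocol::Encoder.new(buffer))
buffer.string # the first offset followed by the length-prefixed batch bytes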
encode_record_array()
# File lib/kafka/protocol/record_batch.rb, line 112
def encode_record_array
  buffer = StringIO.new
  encoder = Encoder.new(buffer)
  @records.each do |record|
    record.encode(encoder)
  end
  buffer.string
end
encode_record_batch_body()
# File lib/kafka/protocol/record_batch.rb, line 87
def encode_record_batch_body
  buffer = StringIO.new
  encoder = Encoder.new(buffer)

  encoder.write_int16(attributes)
  encoder.write_int32(@last_offset_delta)
  encoder.write_int64((@first_timestamp.to_f * 1000).to_i)
  encoder.write_int64((@max_timestamp.to_f * 1000).to_i)

  encoder.write_int64(@producer_id)
  encoder.write_int16(@producer_epoch)
  encoder.write_int32(@first_sequence)

  encoder.write_int32(@records.length)

  records_array = encode_record_array
  if compressed?
    codec = Compression.find_codec_by_id(@codec_id)
    records_array = codec.compress(records_array)
  end
  encoder.write(records_array)

  buffer.string
end
fulfill_relative_data()
# File lib/kafka/protocol/record_batch.rb, line 125
def fulfill_relative_data
  first_record = records.min_by { |record| record.create_time }
  @first_timestamp = first_record.nil? ? Time.now : first_record.create_time

  last_record = records.max_by { |record| record.create_time }
  @max_timestamp = last_record.nil? ? Time.now : last_record.create_time

  records.each_with_index do |record, index|
    record.offset_delta = index
    record.timestamp_delta = (record.create_time - first_timestamp).to_i
  end
  @last_offset_delta = records.length - 1
end
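
A sketch of the deltas this produces, assuming Record accepts a create_time: keyword (which the min_by/max_by calls above rely on) and exposes accessors for its deltas:

t = Time.at(1_500_000_000)
r1 = Kafka::Protocol::Record.new(value: "a", create_time: t)
r2 = Kafka::Protocol::Record.new(value: "b", create_time: t + 1)

batch = Kafka::Protocol::RecordBatch.new(records: [r1, r2])
batch.fulfill_relative_data

r2.offset_delta         # => 1
r2.timestamp_delta      # => 1 (whole seconds relative to first_timestamp)
batch.last_offset_delta # => 1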
last_offset()
# File lib/kafka/protocol/record_batch.rb, line 59
def last_offset
  @first_offset + @last_offset_delta
end
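
The result is simply the base offset plus the delta:

batch = Kafka::Protocol::RecordBatch.new(first_offset: 100, last_offset_delta: 4)
batch.last_offset # => 104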
mark_control_record()
# File lib/kafka/protocol/record_batch.rb, line 214
def mark_control_record
  if in_transaction && is_control_batch
    record = @records.first
    record.is_control_record = true unless record.nil?
  end
end
size()
# File lib/kafka/protocol/record_batch.rb, line 55
def size
  @records.size
end