class Kafka::Protocol::Message
## API Specification
Message => Crc MagicByte Attributes Timestamp Key Value
  Crc => int32
  MagicByte => int8
  Attributes => int8
  Timestamp => int64 (milliseconds since the epoch)
  Key => bytes
  Value => bytes
Constants
- MAGIC_BYTE
Attributes
bytesize[R]
codec_id[R]
create_time[R]
key[R]
offset[R]
value[R]
Public Class Methods
decode(decoder)
click to toggle source
# Decodes a message from the given decoder.
#
# Wire format: Crc MagicByte Attributes [Timestamp] Key Value.
# The CRC is read but not verified here.
#
# @param decoder [Decoder] positioned at the start of an offset + message pair.
# @return [Message]
# @raise [Kafka::Error] if the magic byte is neither 0 nor 1.
def self.decode(decoder)
  offset = decoder.int64
  message_decoder = Decoder.from_string(decoder.bytes)

  _crc = message_decoder.int32
  magic_byte = message_decoder.int8
  attributes = message_decoder.int8

  # The magic byte indicates the message format version. There are situations
  # where an old message format can be returned from a newer version of Kafka,
  # because old messages are not necessarily rewritten on upgrades.
  case magic_byte
  when 0
    # No timestamp in the pre-0.10 message format.
    timestamp = nil
  when 1
    timestamp = message_decoder.int64

    # If the timestamp is set to zero, it's because the message has been upgraded
    # from the Kafka 0.9 disk format to the Kafka 0.10 format. The former didn't
    # have a timestamp attribute, so we'll just set the timestamp to nil.
    timestamp = nil if timestamp.zero?
  else
    raise Kafka::Error, "Invalid magic byte: #{magic_byte}"
  end

  key = message_decoder.bytes
  value = message_decoder.bytes

  # The codec id is encoded in the three least significant bits of the
  # attributes.
  codec_id = attributes & 0b111

  # The timestamp will be nil if the message was written in the Kafka 0.9 log
  # format; on the wire it is milliseconds since the epoch.
  create_time = timestamp && Time.at(timestamp / 1000.0)

  new(key: key, value: value, codec_id: codec_id, offset: offset, create_time: create_time)
end
new(value:, key: nil, create_time: Time.now, codec_id: 0, offset: -1)
click to toggle source
# @param value [String, nil] the message payload.
# @param key [String, nil] the optional partitioning key.
# @param create_time [Time] the creation timestamp of the message.
# @param codec_id [Integer] the compression codec id; 0 means uncompressed.
# @param offset [Integer] the message offset; -1 when not yet assigned.
def initialize(value:, key: nil, create_time: Time.now, codec_id: 0, offset: -1)
  @key = key
  @value = value
  @codec_id = codec_id
  @offset = offset
  @create_time = create_time

  # Cached byte size of key + value; nil values count as zero bytes.
  @bytesize = @key.to_s.bytesize + @value.to_s.bytesize
end
Public Instance Methods
==(other)
click to toggle source
# Compares two messages by key, value, codec id, and offset.
# Note that +create_time+ is not part of the comparison.
#
# @param other [Message] the message to compare against.
# @return [Boolean] true if key, value, codec id, and offset all match.
def ==(other)
  @key == other.key &&
    @value == other.value &&
    @codec_id == other.codec_id &&
    @offset == other.offset
end
compressed?()
click to toggle source
# @return [Boolean] whether the message payload is compressed
#   (i.e. the codec id is nonzero).
def compressed?
  @codec_id != 0
end
decompress()
click to toggle source
@return [Array<Kafka::Protocol::Message>]
# Decompresses the message value, which wraps an entire compressed
# message set, and decodes the inner messages.
#
# @return [Array<Kafka::Protocol::Message>] the inner messages, with
#   their offsets corrected where necessary.
def decompress
  codec = Compression.find_codec_by_id(@codec_id)

  data = codec.decompress(value)
  message_set_decoder = Decoder.from_string(data)
  message_set = MessageSet.decode(message_set_decoder)

  correct_offsets(message_set.messages)
end
encode(encoder)
click to toggle source
# Writes the message to the given encoder: the offset as an int64,
# followed by the CRC-prefixed message body as length-prefixed bytes.
#
# @param encoder [Encoder]
def encode(encoder)
  data = encode_with_crc

  encoder.write_int64(offset)
  encoder.write_bytes(data)
end
headers()
click to toggle source
# @return [Hash] an empty hash — this message format carries no headers.
def headers
  {}
end
Private Instance Methods
correct_offsets(messages)
click to toggle source
Offsets may be relative with regards to wrapped message offset, but there are special cases.
Cases when client will receive corrected offsets:
- When the fetch request is version 0, Kafka will correct the relative offsets on the broker side before replying to the fetch request
- When messages are stored in the 0.9 format on disk (broker configured to do so)
In all other cases, compressed inner messages have relative offsets, with the following attributes:
- The container message holds the 'real' offset
- Specifically, the container message's offset is the 'real' offset of the last message in the compressed batch
# Translates the relative offsets of the contained messages into absolute
# offsets. If the last inner message already carries the wrapper's offset,
# the offsets are absolute and the batch is returned untouched; otherwise
# every inner offset is shifted so the last one lands on the wrapper offset.
#
# @param messages [Array<Message>] the decompressed inner messages.
# @return [Array<Message>] messages with absolute offsets.
def correct_offsets(messages)
  max_relative_offset = messages.last.offset

  # The offsets are already correct, do nothing.
  return messages if max_relative_offset == offset

  # The contained messages have relative offsets, and need to be corrected.
  # The wrapper's offset is the absolute offset of the last inner message.
  base_offset = offset - max_relative_offset

  messages.map do |message|
    Message.new(
      offset: message.offset + base_offset,
      value: message.value,
      key: message.key,
      create_time: message.create_time,
      codec_id: message.codec_id
    )
  end
end
encode_with_crc()
click to toggle source
# Serializes the message body and prepends a CRC32 checksum of it.
#
# @return [String] the CRC-prefixed encoded message.
def encode_with_crc
  buffer = StringIO.new
  encoder = Encoder.new(buffer)

  data = encode_without_crc
  crc = Zlib.crc32(data)

  encoder.write_int32(crc)
  encoder.write(data)

  buffer.string
end
encode_without_crc()
click to toggle source
# Serializes the message fields (magic byte, attributes, timestamp, key,
# value) without the CRC prefix.
#
# @return [String] the encoded message bytes.
def encode_without_crc
  buffer = StringIO.new
  encoder = Encoder.new(buffer)

  encoder.write_int8(MAGIC_BYTE)
  # The codec id occupies the attribute byte's low bits; no other
  # attribute flags are set here.
  encoder.write_int8(@codec_id)
  # Timestamps are encoded as milliseconds since the epoch.
  encoder.write_int64((@create_time.to_f * 1000).to_i)
  encoder.write_bytes(@key)
  encoder.write_bytes(@value)

  buffer.string
end