class Kafka::Protocol::FetchResponse
A response to a fetch request.
## API Specification
FetchResponse => ThrottleTimeMS [TopicName [Partition ErrorCode HighwaterMarkOffset LastStableOffset [AbortedTransaction] Records]] ThrottleTimeMS => int32 TopicName => string Partition => int32 ErrorCode => int16 HighwaterMarkOffset => int64 LastStableOffset => int64 MessageSetSize => int32 AbortedTransaction => [ ProducerId => int64 FirstOffset => int64 ]
Constants
- MAGIC_BYTE_LENGTH
- MAGIC_BYTE_OFFSET
Attributes
topics[R]
Public Class Methods
decode(decoder)
click to toggle source
# File lib/kafka/protocol/fetch_response.rb, line 69 def self.decode(decoder) throttle_time_ms = decoder.int32 topics = decoder.array do topic_name = decoder.string partitions = decoder.array do partition = decoder.int32 error_code = decoder.int16 highwater_mark_offset = decoder.int64 last_stable_offset = decoder.int64 aborted_transactions = decoder.array do producer_id = decoder.int64 first_offset = decoder.int64 AbortedTransaction.new( producer_id: producer_id, first_offset: first_offset ) end messages_raw = decoder.bytes messages = [] if !messages_raw.nil? && !messages_raw.empty? messages_decoder = Decoder.from_string(messages_raw) magic_byte = messages_decoder.peek(MAGIC_BYTE_OFFSET, MAGIC_BYTE_LENGTH)[0].to_i if magic_byte == RecordBatch::MAGIC_BYTE until messages_decoder.eof? begin record_batch = RecordBatch.decode(messages_decoder) messages << record_batch rescue InsufficientDataMessage if messages.length > 0 break else raise end end end else message_set = MessageSet.decode(messages_decoder) messages << message_set end end FetchedPartition.new( partition: partition, error_code: error_code, highwater_mark_offset: highwater_mark_offset, last_stable_offset: last_stable_offset, aborted_transactions: aborted_transactions, messages: messages ) end FetchedTopic.new( name: topic_name, partitions: partitions, ) end new(topics: topics, throttle_time_ms: throttle_time_ms) end
new(topics: [], throttle_time_ms: 0)
click to toggle source
# File lib/kafka/protocol/fetch_response.rb, line 64 def initialize(topics: [], throttle_time_ms: 0) @topics = topics @throttle_time_ms = throttle_time_ms end