class Kafka::Protocol::FetchResponse

A response to a fetch request.

## API Specification

FetchResponse => ThrottleTimeMS [TopicName [Partition ErrorCode HighwaterMarkOffset LastStableOffset [AbortedTransaction] Records]]
  ThrottleTimeMS => int32
  TopicName => string
  Partition => int32
  ErrorCode => int16
  HighwaterMarkOffset => int64
  LastStableOffset => int64
  MessageSetSize => int32
  AbortedTransaction => [
        ProducerId => int64
        FirstOffset => int64
  ]

Constants

MAGIC_BYTE_LENGTH
MAGIC_BYTE_OFFSET

Attributes

topics[R]

Public Class Methods

decode(decoder)
# File lib/kafka/protocol/fetch_response.rb, line 69
# Decodes a fetch response by pulling fields from +decoder+ in wire order.
#
# The throttle time is read first, followed by an array of topics. Each topic
# is a name plus an array of partitions. For every partition the partition
# id, error code, highwater mark offset, last stable offset, the aborted
# transactions and a raw bytes payload are read, in that order.
#
# A payload that is neither nil nor empty is wrapped in its own Decoder. The
# magic byte peeked from that payload selects the format: when it equals
# RecordBatch::MAGIC_BYTE, record batches are decoded until the payload is
# exhausted; otherwise the payload is decoded as a single message set.
#
# @param decoder [Object] must respond to int32, int16, int64, string, bytes
#   and array (the latter yielding once per element)
# @return [Kafka::Protocol::FetchResponse] built from the decoded topics and
#   throttle time
# @raise [InsufficientDataMessage] when that error occurs before a single
#   record batch has been decoded for a partition
def self.decode(decoder)
  throttle = decoder.int32

  fetched_topics = decoder.array do
    name = decoder.string

    fetched_partitions = decoder.array do
      partition_id = decoder.int32
      code = decoder.int16
      highwater = decoder.int64
      last_stable = decoder.int64

      aborted = decoder.array do
        # Arguments are evaluated left to right, so the producer id is read
        # from the decoder before the first offset.
        AbortedTransaction.new(
          producer_id: decoder.int64,
          first_offset: decoder.int64
        )
      end

      payload = decoder.bytes
      decoded = []

      unless payload.nil? || payload.empty?
        payload_decoder = Decoder.from_string(payload)
        magic = payload_decoder.peek(MAGIC_BYTE_OFFSET, MAGIC_BYTE_LENGTH)[0].to_i

        if magic == RecordBatch::MAGIC_BYTE
          until payload_decoder.eof?
            begin
              decoded << RecordBatch.decode(payload_decoder)
            rescue InsufficientDataMessage
              # Running out of data is tolerated once at least one batch has
              # been decoded; with nothing decoded the error is propagated.
              raise if decoded.empty?

              break
            end
          end
        else
          decoded << MessageSet.decode(payload_decoder)
        end
      end

      FetchedPartition.new(
        partition: partition_id,
        error_code: code,
        highwater_mark_offset: highwater,
        last_stable_offset: last_stable,
        aborted_transactions: aborted,
        messages: decoded
      )
    end

    FetchedTopic.new(name: name, partitions: fetched_partitions)
  end

  new(topics: fetched_topics, throttle_time_ms: throttle)
end
new(topics: [], throttle_time_ms: 0)
# File lib/kafka/protocol/fetch_response.rb, line 64
# Builds a fetch response from already-decoded values.
#
# @param topics [Array] the fetched topics; defaults to an empty array
# @param throttle_time_ms [Integer] the throttle time; defaults to 0
def initialize(topics: [], throttle_time_ms: 0)
  @topics, @throttle_time_ms = topics, throttle_time_ms
end