class Kafka::Protocol::ListOffsetResponse

A response to a list offset request.

## API Specification

OffsetResponse => [TopicName [PartitionOffsets]]
  ThrottleTimeMS => int32
  PartitionOffsets => Partition ErrorCode Timestamp Offset
  Partition => int32
  ErrorCode => int16
  Timestamp => int64
  Offset => int64

Attributes

topics[R]

Public Class Methods

decode(decoder) click to toggle source
# File lib/kafka/protocol/list_offset_response.rb, line 65
def self.decode(decoder)
  _throttle_time_ms = decoder.int32
  topics = decoder.array do
    name = decoder.string

    partition_offsets = decoder.array do
      PartitionOffsetInfo.new(
        partition: decoder.int32,
        error_code: decoder.int16,
        timestamp: decoder.int64,
        offset: decoder.int64
      )
    end

    TopicOffsetInfo.new(
      name: name,
      partition_offsets: partition_offsets
    )
  end

  new(topics: topics)
end
new(topics:) click to toggle source
# File lib/kafka/protocol/list_offset_response.rb, line 41
def initialize(topics:)
  @topics = topics
end

Public Instance Methods

offset_for(topic, partition) click to toggle source
# File lib/kafka/protocol/list_offset_response.rb, line 45
def offset_for(topic, partition)
  topic_info = @topics.find {|t| t.name == topic }

  if topic_info.nil?
    raise UnknownTopicOrPartition, "Unknown topic #{topic}"
  end

  partition_info = topic_info
    .partition_offsets
    .find {|p| p.partition == partition }

  if partition_info.nil?
    raise UnknownTopicOrPartition, "Unknown partition #{topic}/#{partition}"
  end

  Protocol.handle_error(partition_info.error_code)

  partition_info.offset
end