class Kafka::Protocol::OffsetFetchResponse
Attributes
topics[R]
Public Class Methods
decode(decoder)
click to toggle source
# File lib/kafka/protocol/offset_fetch_response.rb, line 33 def self.decode(decoder) topics = decoder.array { topic = decoder.string partitions = decoder.array { partition = decoder.int32 info = PartitionOffsetInfo.new( offset: decoder.int64, metadata: decoder.string, error_code: decoder.int16, ) [partition, info] } [topic, Hash[partitions]] } new(topics: Hash[topics]) end
new(topics:)
click to toggle source
# File lib/kafka/protocol/offset_fetch_response.rb, line 18 def initialize(topics:) @topics = topics end
Public Instance Methods
offset_for(topic, partition)
click to toggle source
# File lib/kafka/protocol/offset_fetch_response.rb, line 22 def offset_for(topic, partition) offset_info = topics.fetch(topic).fetch(partition, nil) if offset_info Protocol.handle_error(offset_info.error_code) offset_info.offset else -1 end end