class Kafka::FetchedOffsetResolver
Public Class Methods
new(logger:)
click to toggle source
# File lib/kafka/fetched_offset_resolver.rb, line 5
# Builds a resolver whose log output goes through a TaggedLogger wrapper.
#
# @param logger the logger to wrap; it is handed straight to TaggedLogger.new
def initialize(logger:)
  wrapped_logger = TaggedLogger.new(logger)
  @logger = wrapped_logger
end
Public Instance Methods
resolve!(broker, topics)
click to toggle source
# File lib/kafka/fetched_offset_resolver.rb, line 9
# Resolves, in place, every partition whose `:fetch_offset` was selected by
# `filter_pending_topics` (i.e. is negative) by asking the broker for the
# concrete offset.
#
# @param broker an object responding to `list_offsets(topics:)`; the response
#   must respond to `offset_for(topic, partition)`
# @param topics [Hash] topic => { partition => { fetch_offset: Integer, ... } };
#   mutated in place
# @return [Hash] the same `topics` object, whether or not anything needed
#   resolving
def resolve!(broker, topics)
  pending_topics = filter_pending_topics(topics)

  # Nothing to look up: skip the broker round-trip entirely.
  return topics if pending_topics.empty?

  response = broker.list_offsets(topics: pending_topics)

  pending_topics.each do |topic, partitions|
    partitions.each do |options|
      partition = options.fetch(:partition)
      resolved_offset = response.offset_for(topic, partition)

      @logger.debug "Offset for #{topic}/#{partition} is #{resolved_offset.inspect}"

      # Fall back to 0 when the response carries no offset for this partition.
      topics[topic][partition][:fetch_offset] = resolved_offset || 0
    end
  end

  # Return the mutated input on this path too, matching the early exit above
  # (previously the result of `pending_topics.each` leaked out instead).
  topics
end
Private Instance Methods
filter_pending_topics(topics)
click to toggle source
# File lib/kafka/fetched_offset_resolver.rb, line 29
# Collects the partitions whose `:fetch_offset` is negative and therefore
# still has to be resolved.
#
# @param topics [Hash] topic => { partition => { fetch_offset: Integer, ... } }
# @return [Hash] topic => [{ partition:, time: }], where `time` carries the
#   original negative offset; topics with nothing pending are left out
# @raise [KeyError] if a partition's options have no `:fetch_offset`
def filter_pending_topics(topics)
  topics.each_with_object({}) do |(topic, partitions), unresolved|
    partitions.each do |partition, options|
      offset = options.fetch(:fetch_offset)

      unless offset >= 0
        @logger.debug "Resolving offset `#{offset}` for #{topic}/#{partition}..."

        # Create the topic's list lazily so fully-resolved topics never appear.
        entries = (unresolved[topic] ||= [])
        entries << { partition: partition, time: offset }
      end
    end
  end
end