class Deimos::Utils::LagReporter::Topic
A topic, which holds a hash of partition => last known current offset.
Attributes
consumer_group[RW]
@return [ConsumerGroup]
partition_current_offsets[RW]
@return [Hash<Integer, Integer>]
topic_name[RW]
@return [String]
Public Class Methods
new(topic_name, group)
click to toggle source
@param topic_name [String]
@param group [ConsumerGroup]
# File lib/deimos/utils/lag_reporter.rb, line 52
# Builds a topic record that starts out with no known partition offsets.
#
# @param topic_name [String]
# @param group [ConsumerGroup]
def initialize(topic_name, group)
  # Go through the accessors (not instance variables) so the public writers
  # remain the single place where these attributes are set.
  self.topic_name, self.consumer_group = topic_name, group
  self.partition_current_offsets = {}
end
Public Instance Methods
assign_current_offset(partition, offset)
click to toggle source
@param partition [Integer]
@param offset [Integer]
# File lib/deimos/utils/lag_reporter.rb, line 59
# Records the latest known offset for a partition of this topic.
#
# @param partition [Integer] coerced with #to_i before being used as the key
# @param offset [Integer]
def assign_current_offset(partition, offset)
  offsets = self.partition_current_offsets
  offsets[partition.to_i] = offset
end
compute_lag(partition, offset)
click to toggle source
@param partition [Integer]
@param offset [Integer]
# File lib/deimos/utils/lag_reporter.rb, line 64
# Computes how far the given offset trails the last offset Kafka reports for
# this topic's partition.
#
# A Kafka client is created for every call, so it is closed again before
# returning; otherwise each call would leave its client open.
# NOTE(review): assumes the object returned by Phobos.create_kafka_client
# responds to #close (ruby-kafka's Kafka::Client does) - confirm.
#
# @param partition [Integer]
# @param offset [Integer]
# @return [Integer] the lag, or 0 when it could not be computed
def compute_lag(partition, offset)
  client = nil
  begin
    client = Phobos.create_kafka_client
    client.last_offset_for(self.topic_name, partition) - offset
  rescue StandardError # don't do anything, just wait
    Deimos.config.logger.
      debug("Error computing lag for #{self.topic_name}, will retry")
    0
  ensure
    # Best-effort cleanup: a failing close must neither mask the computed lag
    # nor turn this deliberately forgiving method into a crash.
    begin
      client&.close
    rescue StandardError
      nil
    end
  end
end
report_lag(partition)
click to toggle source
@param partition [Integer]
# File lib/deimos/utils/lag_reporter.rb, line 77
# Logs the lag of one partition and publishes it as the 'consumer_lag' gauge.
# Does nothing when no current offset has been recorded for the partition.
#
# @param partition [Integer]
def report_lag(partition)
  known_offset = self.partition_current_offsets[partition.to_i]
  return unless known_offset

  partition_lag = compute_lag(partition, known_offset)
  group_id = self.consumer_group.id
  Deimos.config.logger.
    debug("Sending lag: #{group_id}/#{partition}: #{partition_lag}")
  # Safe navigation: the gauge is skipped when Deimos.config.metrics is nil.
  Deimos.config.metrics&.gauge(
    'consumer_lag',
    partition_lag,
    tags: [
      "consumer_group:#{group_id}",
      "partition:#{partition}",
      "topic:#{self.topic_name}"
    ]
  )
end