class Deimos::Utils::LagReporter::Topic

Topic which has a hash of partition => last known current offsets

Attributes

consumer_group[RW]

@return [ConsumerGroup]

partition_current_offsets[RW]

@return [Hash<Integer, Integer>]

topic_name[RW]

@return [String]

Public Class Methods

new(topic_name, group) click to toggle source

@param topic_name [String] @param group [ConsumerGroup]

# File lib/deimos/utils/lag_reporter.rb, line 52
def initialize(topic_name, group)
  self.topic_name = topic_name
  self.consumer_group = group
  self.partition_current_offsets = {}
end

Public Instance Methods

assign_current_offset(partition, offset) click to toggle source

@param partition [Integer]

# File lib/deimos/utils/lag_reporter.rb, line 59
def assign_current_offset(partition, offset)
  self.partition_current_offsets[partition.to_i] = offset
end
compute_lag(partition, offset) click to toggle source

@param partition [Integer]

# File lib/deimos/utils/lag_reporter.rb, line 64
def compute_lag(partition, offset)
  begin
    client = Phobos.create_kafka_client
    last_offset = client.last_offset_for(self.topic_name, partition)
    lag = last_offset - offset
  rescue StandardError # don't do anything, just wait
    Deimos.config.logger.
      debug("Error computing lag for #{self.topic_name}, will retry")
  end
  lag || 0
end
report_lag(partition) click to toggle source

@param partition [Integer]

# File lib/deimos/utils/lag_reporter.rb, line 77
def report_lag(partition)
  current_offset = self.partition_current_offsets[partition.to_i]
  return unless current_offset

  lag = compute_lag(partition, current_offset)
  group = self.consumer_group.id
  Deimos.config.logger.
    debug("Sending lag: #{group}/#{partition}: #{lag}")
  Deimos.config.metrics&.gauge('consumer_lag', lag, tags: %W(
                                 consumer_group:#{group}
                                 partition:#{partition}
                                 topic:#{self.topic_name}
                               ))
end