class Deimos::Utils::LagReporter

Class that manages reporting lag.

Public Class Methods

heartbeat(payload) click to toggle source

@param payload [Hash]

# File lib/deimos/utils/lag_reporter.rb, line 132
# Reports lag for every topic/partition pair named in a heartbeat payload.
#
# The consumer group is looked up in +@groups+ under the stringified
# +:group_id+ and created on first sight. The whole update runs inside
# +synchronize+.
#
# @param payload [Hash] read for +:group_id+ and +:topic_partitions+; the
#   latter is iterated as (topic, partitions) pairs
def heartbeat(payload)
  group_id = payload[:group_id]
  key = group_id.to_s
  synchronize do
    # Fetch the tracked group, registering a new one if it is unknown.
    tracked = (@groups[key] ||= ConsumerGroup.new(group_id))
    payload[:topic_partitions].each do |topic, partitions|
      partitions.each { |partition| tracked.report_lag(topic, partition) }
    end
  end
end
message_processed(payload) click to toggle source

Fields available on the event payload:

  offset_lag = event.payload.fetch(:offset_lag)
  group_id = event.payload.fetch(:group_id)
  topic = event.payload.fetch(:topic)
  partition = event.payload.fetch(:partition)

@param payload [Hash]

# File lib/deimos/utils/lag_reporter.rb, line 106
# Stores the offset carried by a processed-message payload as the current
# offset of the matching topic/partition in its consumer group.
#
# The offset comes from +:offset+, falling back to +:last_offset+ when the
# former is nil or false. The consumer group is created on first sight and
# keyed by the stringified +:group_id+.
#
# @param payload [Hash] read for +:offset+ / +:last_offset+, +:topic+,
#   +:group_id+ and +:partition+
def message_processed(payload)
  current_offset = payload[:offset] || payload[:last_offset]
  topic, partition = payload.values_at(:topic, :partition)
  group_id = payload[:group_id]
  key = group_id.to_s

  synchronize do
    tracked = (@groups[key] ||= ConsumerGroup.new(group_id))
    tracked.assign_current_offset(topic, partition, current_offset)
  end
end
offset_seek(payload) click to toggle source

@param payload [Hash]

# File lib/deimos/utils/lag_reporter.rb, line 119
# Stores the offset carried by a seek payload as the current offset of the
# matching topic/partition in its consumer group.
#
# The consumer group is created on first sight and keyed by the stringified
# +:group_id+; the update runs inside +synchronize+.
#
# @param payload [Hash] read for +:offset+, +:topic+, +:group_id+ and
#   +:partition+
def offset_seek(payload)
  offset, topic, partition = payload.values_at(:offset, :topic, :partition)
  group_id = payload[:group_id]
  key = group_id.to_s

  synchronize do
    (@groups[key] ||= ConsumerGroup.new(group_id))
      .assign_current_offset(topic, partition, offset)
  end
end
reset() click to toggle source

Reset all group information.

# File lib/deimos/utils/lag_reporter.rb, line 97
# Reset all group information by replacing the +@groups+ hash with a new,
# empty one.
#
# NOTE(review): this assignment is not wrapped in +synchronize+, unlike the
# other writes to +@groups+ in this class — confirm that is intentional.
def reset
  @groups = {}
end