class Deimos::Utils::LagReporter
Class that manages reporting lag.
Public Class Methods
heartbeat(payload)
click to toggle source
@param payload [Hash]
# File lib/deimos/utils/lag_reporter.rb, line 132
# Calls ConsumerGroup#report_lag once for every topic/partition pair listed
# in the payload, creating the group's ConsumerGroup entry on first sight.
# @param payload [Hash] read for :group_id and :topic_partitions; the latter
#   is iterated as (topic, partitions) pairs.
def heartbeat(payload)
  group_id = payload[:group_id]
  synchronize do
    # Groups are keyed by the string form of the id; the ConsumerGroup itself
    # is built from the raw id.
    tracker = (@groups[group_id.to_s] ||= ConsumerGroup.new(group_id))
    payload[:topic_partitions].each do |topic, partitions|
      partitions.each { |partition| tracker.report_lag(topic, partition) }
    end
  end
end
message_processed(payload)
click to toggle source
offset_lag = event.payload.fetch(:offset_lag)
group_id = event.payload.fetch(:group_id)
topic = event.payload.fetch(:topic)
partition = event.payload.fetch(:partition)
@param payload [Hash]
# File lib/deimos/utils/lag_reporter.rb, line 106
# Passes the offset carried by the payload to the matching ConsumerGroup via
# #assign_current_offset, creating the group's entry if it does not exist yet.
# @param payload [Hash] read for :offset (falling back to :last_offset when
#   :offset is nil/false), :topic, :group_id and :partition.
def message_processed(payload)
  offset = payload[:offset] || payload[:last_offset]
  topic, group, partition = payload.values_at(:topic, :group_id, :partition)
  synchronize do
    # Groups are keyed by the string form of the id.
    key = group.to_s
    @groups[key] ||= ConsumerGroup.new(group)
    @groups[key].assign_current_offset(topic, partition, offset)
  end
end
offset_seek(payload)
click to toggle source
@param payload [Hash]
# File lib/deimos/utils/lag_reporter.rb, line 119
# Passes the payload's :offset to the matching ConsumerGroup via
# #assign_current_offset, creating the group's entry if it does not exist yet.
# @param payload [Hash] read for :offset, :topic, :group_id and :partition.
def offset_seek(payload)
  offset, topic, group, partition =
    payload.values_at(:offset, :topic, :group_id, :partition)
  synchronize do
    # Groups are keyed by the string form of the id.
    key = group.to_s
    entry = (@groups[key] ||= ConsumerGroup.new(group))
    entry.assign_current_offset(topic, partition, offset)
  end
end
reset()
click to toggle source
Reset all group information.
# File lib/deimos/utils/lag_reporter.rb, line 97
# Drops all stored group information by pointing @groups at a new empty Hash.
# @return [Hash] the new, empty @groups hash
def reset
  @groups = {}
end