class Kafka::Datadog::ConsumerSubscriber
Public Instance Methods
fetch_batch(event)
click to toggle source
# File lib/kafka/datadog.rb, line 196
#
# Reports the size and the offset lag of a fetched batch.
#
# Emits a "consumer.batch_size" histogram and a "consumer.lag" gauge, both
# tagged with the client, group, topic and partition read from the payload.
# The `histogram` and `gauge` helpers are not defined in this view; they are
# expected to be provided elsewhere.
#
# @param event [#payload] instrumentation event; its payload must answer
#   `fetch` for :offset_lag, :message_count, :client_id, :group_id, :topic
#   and :partition. `fetch` is used without a default, so a missing key
#   raises (KeyError when the payload is a Hash).
def fetch_batch(event)
  payload = event.payload

  lag = payload.fetch(:offset_lag)
  batch_size = payload.fetch(:message_count)

  tags = {
    client: payload.fetch(:client_id),
    group_id: payload.fetch(:group_id),
    topic: payload.fetch(:topic),
    partition: payload.fetch(:partition),
  }

  histogram("consumer.batch_size", batch_size, tags: tags)
  gauge("consumer.lag", lag, tags: tags)
end
join_group(event)
click to toggle source
# File lib/kafka/datadog.rb, line 211
#
# Reports how long joining a consumer group took, and whether it failed.
#
# Always emits a "consumer.join_group" timing from `event.duration`. When the
# payload contains an :exception key, it additionally increments
# "consumer.join_group.errors". Both metrics are tagged with the client and
# group read from the payload. The `timing` and `increment` helpers are not
# defined in this view; they are expected to be provided elsewhere.
#
# @param event [#payload, #duration] instrumentation event; the payload must
#   answer `fetch` for :client_id and :group_id (no default is given, so a
#   missing key raises) and `key?` for :exception.
def join_group(event)
  payload = event.payload

  tags = {
    client: payload.fetch(:client_id),
    group_id: payload.fetch(:group_id),
  }

  timing("consumer.join_group", event.duration, tags: tags)

  # Only the presence of the key matters; the exception value is not read.
  increment("consumer.join_group.errors", tags: tags) if payload.key?(:exception)
end
leave_group(event)
click to toggle source
# File lib/kafka/datadog.rb, line 237
#
# Reports how long leaving a consumer group took, and whether it failed.
#
# Always emits a "consumer.leave_group" timing from `event.duration`. When
# the payload contains an :exception key, it additionally increments
# "consumer.leave_group.errors". Both metrics are tagged with the client and
# group read from the payload. The `timing` and `increment` helpers are not
# defined in this view; they are expected to be provided elsewhere.
#
# @param event [#payload, #duration] instrumentation event; the payload must
#   answer `fetch` for :client_id and :group_id (no default is given, so a
#   missing key raises) and `key?` for :exception.
def leave_group(event)
  payload = event.payload

  tags = {
    client: payload.fetch(:client_id),
    group_id: payload.fetch(:group_id),
  }

  timing("consumer.leave_group", event.duration, tags: tags)

  # Only the presence of the key matters; the exception value is not read.
  increment("consumer.leave_group.errors", tags: tags) if payload.key?(:exception)
end
loop(event)
click to toggle source
# File lib/kafka/datadog.rb, line 250
#
# Reports the duration of one consumer loop event.
#
# Emits a "consumer.loop.duration" histogram from `event.duration`, tagged
# with the client and group read from the payload. The `histogram` helper is
# not defined in this view; it is expected to be provided elsewhere.
#
# NOTE(review): this method shares its name with Kernel#loop, so a bare
# `loop { ... }` inside the defining class would dispatch here instead.
#
# @param event [#payload, #duration] instrumentation event; the payload must
#   answer `fetch` for :client_id and :group_id (no default is given, so a
#   missing key raises).
def loop(event)
  payload = event.payload

  tags = {
    client: payload.fetch(:client_id),
    group_id: payload.fetch(:group_id),
  }

  histogram("consumer.loop.duration", event.duration, tags: tags)
end
pause_status(event)
click to toggle source
# File lib/kafka/datadog.rb, line 259
#
# Reports the pause duration carried by a pause-status event.
#
# Emits a "consumer.pause.duration" gauge, tagged with the client, group,
# topic and partition read from the payload. The `gauge` helper is not
# defined in this view; it is expected to be provided elsewhere.
#
# @param event [#payload] instrumentation event; its payload must answer
#   `fetch` for :client_id, :group_id, :topic, :partition and :duration
#   (no default is given, so a missing key raises).
def pause_status(event)
  payload = event.payload

  tags = {
    client: payload.fetch(:client_id),
    group_id: payload.fetch(:group_id),
    topic: payload.fetch(:topic),
    partition: payload.fetch(:partition),
  }

  # The reported value comes from the payload's :duration entry, not from
  # event.duration.
  duration = payload.fetch(:duration)

  gauge("consumer.pause.duration", duration, tags: tags)
end
process_batch(event)
click to toggle source
# File lib/kafka/datadog.rb, line 169
#
# Reports the outcome of processing a batch of messages.
#
# When the payload contains an :exception key, increments
# "consumer.process_batch.errors"; otherwise emits a
# "consumer.process_batch.latency" timing from `event.duration` and adds the
# batch's message count to "consumer.messages". In both cases it then gauges
# "consumer.offset" and, when a create time is present, "consumer.time_lag".
# All metrics are tagged with the client, group, topic and partition read
# from the payload. The `increment`, `timing`, `count` and `gauge` helpers
# are not defined in this view; they are expected to be provided elsewhere.
#
# @param event [#payload, #duration] instrumentation event; the payload must
#   answer `fetch` for :last_offset, :message_count, :last_create_time,
#   :client_id, :group_id, :topic and :partition (no default is given, so a
#   missing key raises) and `key?` for :exception.
def process_batch(event)
  payload = event.payload

  offset = payload.fetch(:last_offset)
  messages = payload.fetch(:message_count)
  create_time = payload.fetch(:last_create_time)

  # A nil/false create time short-circuits to a falsy time_lag.
  # Assumes create_time is a Time, so the difference is in seconds and the
  # result is whole milliseconds -- TODO confirm against the event producer.
  time_lag = create_time && ((Time.now - create_time) * 1000).to_i

  tags = {
    client: payload.fetch(:client_id),
    group_id: payload.fetch(:group_id),
    topic: payload.fetch(:topic),
    partition: payload.fetch(:partition),
  }

  if payload.key?(:exception)
    increment("consumer.process_batch.errors", tags: tags)
  else
    timing("consumer.process_batch.latency", event.duration, tags: tags)
    count("consumer.messages", messages, tags: tags)
  end

  # The offset is reported on both the success and the error path.
  gauge("consumer.offset", offset, tags: tags)

  gauge("consumer.time_lag", time_lag, tags: tags) if time_lag
end
process_message(event)
click to toggle source
# File lib/kafka/datadog.rb, line 140
#
# Reports the outcome of processing a single message.
#
# When the payload contains an :exception key, increments
# "consumer.process_message.errors"; otherwise emits a
# "consumer.process_message.latency" timing from `event.duration` and
# increments "consumer.messages". In both cases it then gauges
# "consumer.offset" and "consumer.lag" and, when a create time is present,
# "consumer.time_lag". All metrics are tagged with the client, group, topic
# and partition read from the payload. The `increment`, `timing` and `gauge`
# helpers are not defined in this view; they are expected to be provided
# elsewhere.
#
# @param event [#payload, #duration] instrumentation event; the payload must
#   answer `fetch` for :offset, :offset_lag, :create_time, :client_id,
#   :group_id, :topic and :partition (no default is given, so a missing key
#   raises) and `key?` for :exception.
def process_message(event)
  payload = event.payload

  offset = payload.fetch(:offset)
  offset_lag = payload.fetch(:offset_lag)
  create_time = payload.fetch(:create_time)

  # A nil/false create time short-circuits to a falsy time_lag.
  # Assumes create_time is a Time, so the difference is in seconds and the
  # result is whole milliseconds -- TODO confirm against the event producer.
  time_lag = create_time && ((Time.now - create_time) * 1000).to_i

  tags = {
    client: payload.fetch(:client_id),
    group_id: payload.fetch(:group_id),
    topic: payload.fetch(:topic),
    partition: payload.fetch(:partition),
  }

  if payload.key?(:exception)
    increment("consumer.process_message.errors", tags: tags)
  else
    timing("consumer.process_message.latency", event.duration, tags: tags)
    increment("consumer.messages", tags: tags)
  end

  # Offset and lag are reported on both the success and the error path.
  gauge("consumer.offset", offset, tags: tags)
  gauge("consumer.lag", offset_lag, tags: tags)

  # Not all messages have timestamps.
  gauge("consumer.time_lag", time_lag, tags: tags) if time_lag
end
sync_group(event)
click to toggle source
# File lib/kafka/datadog.rb, line 224
#
# Reports how long syncing a consumer group took, and whether it failed.
#
# Always emits a "consumer.sync_group" timing from `event.duration`. When the
# payload contains an :exception key, it additionally increments
# "consumer.sync_group.errors". Both metrics are tagged with the client and
# group read from the payload. The `timing` and `increment` helpers are not
# defined in this view; they are expected to be provided elsewhere.
#
# @param event [#payload, #duration] instrumentation event; the payload must
#   answer `fetch` for :client_id and :group_id (no default is given, so a
#   missing key raises) and `key?` for :exception.
def sync_group(event)
  payload = event.payload

  tags = {
    client: payload.fetch(:client_id),
    group_id: payload.fetch(:group_id),
  }

  timing("consumer.sync_group", event.duration, tags: tags)

  # Only the presence of the key matters; the exception value is not read.
  increment("consumer.sync_group.errors", tags: tags) if payload.key?(:exception)
end