class Kafka::Prometheus::ConsumerSubscriber
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/kafka/prometheus.rb, line 70
# Registers every consumer metric on Prometheus.registry and keeps a handle
# to each one in an instance variable for the event handlers to update.
#
# Message, batch, lag and pause metrics are labelled by client, group, topic
# and partition; the join/sync/leave group metrics are labelled by client and
# group only.
def initialize
  super

  registry = Prometheus.registry

  # Per-message processing.
  @process_messages = registry.counter(
    :consumer_process_messages,
    docstring: 'Total messages',
    labels: [:client, :group_id, :topic, :partition]
  )
  @process_message_errors = registry.counter(
    :consumer_process_message_errors,
    docstring: 'Total errors',
    labels: [:client, :group_id, :topic, :partition]
  )
  @process_message_latency = registry.histogram(
    :consumer_process_message_latency,
    docstring: 'Latency',
    buckets: LATENCY_BUCKETS,
    labels: [:client, :group_id, :topic, :partition]
  )

  # Consumer lag.
  @offset_lag = registry.gauge(
    :consumer_offset_lag,
    docstring: 'Offset lag',
    labels: [:client, :group_id, :topic, :partition]
  )
  @time_lag = registry.gauge(
    :consumer_time_lag,
    docstring: 'Time lag of message',
    labels: [:client, :group_id, :topic, :partition]
  )

  # Per-batch processing.
  @process_batch_errors = registry.counter(
    :consumer_process_batch_errors,
    docstring: 'Total errors in batch',
    labels: [:client, :group_id, :topic, :partition]
  )
  @process_batch_latency = registry.histogram(
    :consumer_process_batch_latency,
    docstring: 'Latency in batch',
    buckets: LATENCY_BUCKETS,
    labels: [:client, :group_id, :topic, :partition]
  )
  @batch_size = registry.histogram(
    :consumer_batch_size,
    docstring: 'Size of batch',
    buckets: SIZE_BUCKETS,
    labels: [:client, :group_id, :topic, :partition]
  )

  # Group membership lifecycle.
  @join_group = registry.histogram(
    :consumer_join_group,
    docstring: 'Time to join group',
    buckets: DELAY_BUCKETS,
    labels: [:client, :group_id]
  )
  @join_group_errors = registry.counter(
    :consumer_join_group_errors,
    docstring: 'Total error in joining group',
    labels: [:client, :group_id]
  )
  @sync_group = registry.histogram(
    :consumer_sync_group,
    docstring: 'Time to sync group',
    buckets: DELAY_BUCKETS,
    labels: [:client, :group_id]
  )
  @sync_group_errors = registry.counter(
    :consumer_sync_group_errors,
    docstring: 'Total error in syncing group',
    labels: [:client, :group_id]
  )
  @leave_group = registry.histogram(
    :consumer_leave_group,
    docstring: 'Time to leave group',
    buckets: DELAY_BUCKETS,
    labels: [:client, :group_id]
  )
  @leave_group_errors = registry.counter(
    :consumer_leave_group_errors,
    docstring: 'Total error in leaving group',
    labels: [:client, :group_id]
  )

  # Partition pauses.
  @pause_duration = registry.gauge(
    :consumer_pause_duration,
    docstring: 'Pause duration',
    labels: [:client, :group_id, :topic, :partition]
  )
end
Public Instance Methods
fetch_batch(event)
click to toggle source
# File lib/kafka/prometheus.rb, line 136
# Records the size of a fetched batch and the partition's current offset lag.
#
# @param event [#payload] instrumentation event whose payload hash holds
#   :client_id, :group_id, :topic, :partition, :offset_lag and :message_count
# @raise [KeyError] if any of those payload keys is missing
def fetch_batch(event)
  payload = event.payload
  labels = {
    client: payload.fetch(:client_id),
    group_id: payload.fetch(:group_id),
    topic: payload.fetch(:topic),
    partition: payload.fetch(:partition)
  }
  lag = payload.fetch(:offset_lag)
  count = payload.fetch(:message_count)

  @batch_size.observe(count, labels: labels)
  @offset_lag.set(lag, labels: labels)
end
join_group(event)
click to toggle source
# File lib/kafka/prometheus.rb, line 150
# Records how long a group join took and counts it as an error when the
# event payload carries an :exception entry.
#
# @param event [#payload, #duration] instrumentation event whose payload hash
#   holds :client_id and :group_id
# @raise [KeyError] if :client_id or :group_id is missing from the payload
def join_group(event)
  payload = event.payload
  labels = {
    client: payload.fetch(:client_id),
    group_id: payload.fetch(:group_id)
  }

  @join_group.observe(event.duration, labels: labels)
  return unless payload.key?(:exception)

  @join_group_errors.increment(labels: labels)
end
leave_group(event)
click to toggle source
# File lib/kafka/prometheus.rb, line 164
# Records how long leaving the group took and counts it as an error when the
# event payload carries an :exception entry.
#
# @param event [#payload, #duration] instrumentation event whose payload hash
#   holds :client_id and :group_id
# @raise [KeyError] if :client_id or :group_id is missing from the payload
def leave_group(event)
  payload = event.payload
  labels = {
    client: payload.fetch(:client_id),
    group_id: payload.fetch(:group_id)
  }

  @leave_group.observe(event.duration, labels: labels)
  return unless payload.key?(:exception)

  @leave_group_errors.increment(labels: labels)
end
pause_status(event)
click to toggle source
# File lib/kafka/prometheus.rb, line 171
# Publishes the pause duration reported in the event payload for a partition.
#
# @param event [#payload] instrumentation event whose payload hash holds
#   :client_id, :group_id, :topic, :partition and :duration
# @raise [KeyError] if any of those payload keys is missing
def pause_status(event)
  payload = event.payload
  labels = {
    client: payload.fetch(:client_id),
    group_id: payload.fetch(:group_id),
    topic: payload.fetch(:topic),
    partition: payload.fetch(:partition)
  }
  paused_for = payload.fetch(:duration)

  @pause_duration.set(paused_for, labels: labels)
end
process_batch(event)
click to toggle source
# File lib/kafka/prometheus.rb, line 119
# Accounts for one processed batch: a failed batch bumps the batch error
# counter only, while a successful one records its latency and adds its
# message count to the processed-messages counter.
#
# @param event [#payload, #duration] instrumentation event whose payload hash
#   holds :client_id, :group_id, :topic, :partition and :message_count
# @raise [KeyError] if any of those payload keys is missing
def process_batch(event)
  payload = event.payload
  labels = {
    client: payload.fetch(:client_id),
    group_id: payload.fetch(:group_id),
    topic: payload.fetch(:topic),
    partition: payload.fetch(:partition)
  }
  # Fetched before branching, so a missing :message_count raises even for
  # failed batches.
  processed = payload.fetch(:message_count)

  return @process_batch_errors.increment(labels: labels) if payload.key?(:exception)

  @process_batch_latency.observe(event.duration, labels: labels)
  @process_messages.increment(by: processed, labels: labels)
end
process_message(event)
click to toggle source
# File lib/kafka/prometheus.rb, line 91
# Accounts for one processed message: a failure bumps the message error
# counter, a success records latency and increments the processed-messages
# counter. The offset lag gauge is always updated; the time lag gauge is
# updated only when the payload's :create_time is set.
#
# @param event [#payload, #duration] instrumentation event whose payload hash
#   holds :client_id, :group_id, :topic, :partition, :offset_lag and
#   :create_time
# @raise [KeyError] if any of those payload keys is missing
def process_message(event)
  payload = event.payload
  labels = {
    client: payload.fetch(:client_id),
    group_id: payload.fetch(:group_id),
    topic: payload.fetch(:topic),
    partition: payload.fetch(:partition)
  }
  lag = payload.fetch(:offset_lag)
  created_at = payload.fetch(:create_time)
  # (Time.now - created_at) is in seconds; scaled by 1000 and truncated.
  # Sampled before any metric is updated.
  time_lag = ((Time.now - created_at) * 1000).to_i if created_at

  if payload.key?(:exception)
    @process_message_errors.increment(labels: labels)
  else
    @process_message_latency.observe(event.duration, labels: labels)
    @process_messages.increment(labels: labels)
  end

  @offset_lag.set(lag, labels: labels)

  # Not all messages have timestamps.
  @time_lag.set(time_lag, labels: labels) if time_lag
end
sync_group(event)
click to toggle source
# File lib/kafka/prometheus.rb, line 157
# Records how long a group sync took and counts it as an error when the
# event payload carries an :exception entry.
#
# @param event [#payload, #duration] instrumentation event whose payload hash
#   holds :client_id and :group_id
# @raise [KeyError] if :client_id or :group_id is missing from the payload
def sync_group(event)
  payload = event.payload
  labels = {
    client: payload.fetch(:client_id),
    group_id: payload.fetch(:group_id)
  }

  @sync_group.observe(event.duration, labels: labels)
  return unless payload.key?(:exception)

  @sync_group_errors.increment(labels: labels)
end