class Kafka::Prometheus::AsyncProducerSubscriber

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/kafka/prometheus.rb, line 264
def initialize
  super
  @queue_size = Prometheus.registry.histogram(:async_producer_queue_size, docstring: 'Queue size', buckets: SIZE_BUCKETS, labels: [:client, :topic])
  @queue_fill_ratio = Prometheus.registry.histogram(:async_producer_queue_fill_ratio, docstring: 'Queue fill ratio', labels: [:client, :topic])
  @produce_errors = Prometheus.registry.counter(:async_producer_produce_errors, docstring: 'Producer errors', labels: [:client, :topic])
  @dropped_messages = Prometheus.registry.counter(:async_producer_dropped_messages, docstring: 'Dropped messages', labels: [:client])
end

Public Instance Methods

buffer_overflow(event) click to toggle source
# File lib/kafka/prometheus.rb, line 286
def buffer_overflow(event)
  key = { client: event.payload.fetch(:client_id), topic: event.payload.fetch(:topic) }
  @produce_errors.increment(labels: key)
end
drop_messages(event) click to toggle source
# File lib/kafka/prometheus.rb, line 291
def drop_messages(event)
  key = { client: event.payload.fetch(:client_id) }
  message_count = event.payload.fetch(:message_count)
  @dropped_messages.increment(by: message_count, labels: key)
end
enqueue_message(event) click to toggle source
# File lib/kafka/prometheus.rb, line 272
def enqueue_message(event)
  key = { client: event.payload.fetch(:client_id), topic: event.payload.fetch(:topic) }

  queue_size = event.payload.fetch(:queue_size)
  max_queue_size = event.payload.fetch(:max_queue_size)
  queue_fill_ratio = queue_size.to_f / max_queue_size.to_f

  # This gets us the avg/max queue size per producer.
  @queue_size.observe(queue_size, labels: key)

  # This gets us the avg/max queue fill ratio per producer.
  @queue_fill_ratio.observe(queue_fill_ratio, labels: key)
end