class Kafka::Prometheus::ProducerSubscriber

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/kafka/prometheus.rb, line 185
# Registers every producer metric on the Prometheus registry and keeps a
# handle to each one in an instance variable so the event handlers below can
# update it. Metrics are registered in a fixed order, one call per metric.
def initialize
  super

  registry = Prometheus.registry

  # Metrics recorded when a message is produced.
  @produce_messages = registry.counter(
    :producer_produced_messages,
    docstring: 'Produced messages total',
    labels: [:client, :topic]
  )
  @produce_message_size = registry.histogram(
    :producer_message_size,
    docstring: 'Message size',
    buckets: SIZE_BUCKETS,
    labels: [:client, :topic]
  )
  @buffer_size = registry.histogram(
    :producer_buffer_size,
    docstring: 'Buffer size',
    buckets: SIZE_BUCKETS,
    labels: [:client]
  )
  @buffer_fill_ratio = registry.histogram(
    :producer_buffer_fill_ratio,
    docstring: 'Buffer fill ratio',
    labels: [:client]
  )
  @buffer_fill_percentage = registry.histogram(
    :producer_buffer_fill_percentage,
    docstring: 'Buffer fill percentage',
    labels: [:client]
  )
  @produce_errors = registry.counter(
    :producer_produce_errors,
    docstring: 'Produce errors',
    labels: [:client, :topic]
  )

  # Metrics recorded when a batch of messages is delivered.
  @deliver_errors = registry.counter(
    :producer_deliver_errors,
    docstring: 'Deliver error',
    labels: [:client]
  )
  @deliver_latency = registry.histogram(
    :producer_deliver_latency,
    docstring: 'Delivery latency',
    buckets: LATENCY_BUCKETS,
    labels: [:client]
  )
  @deliver_messages = registry.counter(
    :producer_deliver_messages,
    docstring: 'Total count of delivered messages',
    labels: [:client]
  )
  @deliver_attempts = registry.histogram(
    :producer_deliver_attempts,
    docstring: 'Delivery attempts',
    labels: [:client]
  )

  # Metrics recorded for acknowledgements and per-topic errors.
  @ack_messages = registry.counter(
    :producer_ack_messages,
    docstring: 'Ack',
    labels: [:client, :topic]
  )
  @ack_delay = registry.histogram(
    :producer_ack_delay,
    docstring: 'Ack delay',
    buckets: LATENCY_BUCKETS,
    labels: [:client, :topic]
  )
  @ack_errors = registry.counter(
    :producer_ack_errors,
    docstring: 'Ack errors',
    labels: [:client, :topic]
  )
end

Public Instance Methods

ack_message(event) click to toggle source
# File lib/kafka/prometheus.rb, line 246
# Records one acknowledged message for the event's client/topic pair.
#
# Bumps the ACK counter and then observes the delay reported in the payload.
# The payload must contain :client_id, :topic and :delay (read with `fetch`,
# so a missing key raises KeyError).
#
# @param event [#payload] event whose payload is a Hash-like object
def ack_message(event)
  payload = event.payload
  labels = {
    client: payload.fetch(:client_id),
    topic: payload.fetch(:topic)
  }

  # Count of ACK'd messages per client/topic.
  @ack_messages.increment(labels: labels)

  # Distribution of the produce-to-ACK delay; :delay is read only after the
  # counter has been incremented.
  @ack_delay.observe(payload.fetch(:delay), labels: labels)
end
buffer_overflow(event) click to toggle source
# File lib/kafka/prometheus.rb, line 226
# Counts a buffer overflow as a produce error for the event's client/topic.
#
# The payload must contain :client_id and :topic (read with `fetch`, so a
# missing key raises KeyError).
#
# @param event [#payload] event whose payload is a Hash-like object
def buffer_overflow(event)
  payload = event.payload

  @produce_errors.increment(
    labels: {
      client: payload.fetch(:client_id),
      topic: payload.fetch(:topic)
    }
  )
end
deliver_messages(event) click to toggle source
# File lib/kafka/prometheus.rb, line 231
# Records the outcome of one delivery for the event's client.
#
# Reads :client_id, :delivered_message_count and :attempts from the payload
# (with `fetch`, so a missing key raises KeyError before any metric is
# touched), then updates the delivery metrics in a fixed order.
#
# @param event [#payload, #duration] event carrying the payload and the
#   duration that is observed as delivery latency
def deliver_messages(event)
  payload = event.payload
  labels = { client: payload.fetch(:client_id) }
  delivered = payload.fetch(:delivered_message_count)
  attempt_count = payload.fetch(:attempts)

  # An :exception key in the payload marks the delivery as failed.
  if payload.key?(:exception)
    @deliver_errors.increment(labels: labels)
  end

  @deliver_latency.observe(event.duration, labels: labels)

  # Number of messages delivered in this call.
  @deliver_messages.increment(by: delivered, labels: labels)

  # Number of attempts this delivery took.
  @deliver_attempts.observe(attempt_count, labels: labels)
end
produce_message(event) click to toggle source
# File lib/kafka/prometheus.rb, line 204
# Records one produced message plus the producer buffer state it reports.
#
# Reads :client_id, :topic, :message_size, :buffer_size and :max_buffer_size
# from the payload (with `fetch`, so a missing key raises KeyError before any
# metric is touched).
#
# @param event [#payload] event whose payload is a Hash-like object
def produce_message(event)
  payload = event.payload
  client_id = payload.fetch(:client_id)
  topic_labels = { client: client_id, topic: payload.fetch(:topic) }
  client_labels = { client: client_id }

  size = payload.fetch(:message_size)
  buffered = payload.fetch(:buffer_size)
  capacity = payload.fetch(:max_buffer_size)

  # Fill level as a fraction of capacity and as a percentage.
  fill_ratio = buffered.to_f / capacity.to_f
  fill_percentage = fill_ratio * 100.0

  # Per client/topic: message count (write rate) and message size.
  @produce_messages.increment(labels: topic_labels)
  @produce_message_size.observe(size, labels: topic_labels)

  # Per client: buffer size and how full the buffer is.
  @buffer_size.observe(buffered, labels: client_labels)
  @buffer_fill_ratio.observe(fill_ratio, labels: client_labels)
  @buffer_fill_percentage.observe(fill_percentage, labels: client_labels)
end
topic_error(event) click to toggle source
# File lib/kafka/prometheus.rb, line 256
# Counts an ACK error for the event's client/topic pair.
#
# The payload must contain :client_id and :topic (read with `fetch`, so a
# missing key raises KeyError).
#
# @param event [#payload] event whose payload is a Hash-like object
def topic_error(event)
  payload = event.payload
  labels = {
    client: payload.fetch(:client_id),
    topic: payload.fetch(:topic)
  }

  @ack_errors.increment(labels: labels)
end