class Kafka::Datadog::ProducerSubscriber
Public Instance Methods
ack_message(event)
click to toggle source
# File lib/kafka/datadog.rb, line 331 def ack_message(event) tags = { client: event.payload.fetch(:client_id), topic: event.payload.fetch(:topic), } # Number of messages ACK'd for the topic. increment("producer.ack.messages", tags: tags) # Histogram of delay between a message being produced and it being ACK'd. histogram("producer.ack.delay", event.payload.fetch(:delay), tags: tags) end
buffer_overflow(event)
click to toggle source
# File lib/kafka/datadog.rb, line 300 def buffer_overflow(event) tags = { client: event.payload.fetch(:client_id), topic: event.payload.fetch(:topic), } increment("producer.produce.errors", tags: tags) end
deliver_messages(event)
click to toggle source
# File lib/kafka/datadog.rb, line 309 def deliver_messages(event) client = event.payload.fetch(:client_id) message_count = event.payload.fetch(:delivered_message_count) attempts = event.payload.fetch(:attempts) tags = { client: client, } if event.payload.key?(:exception) increment("producer.deliver.errors", tags: tags) end timing("producer.deliver.latency", event.duration, tags: tags) # Messages delivered to Kafka: count("producer.deliver.messages", message_count, tags: tags) # Number of attempts to deliver messages: histogram("producer.deliver.attempts", attempts, tags: tags) end
produce_message(event)
click to toggle source
# File lib/kafka/datadog.rb, line 269 def produce_message(event) client = event.payload.fetch(:client_id) topic = event.payload.fetch(:topic) message_size = event.payload.fetch(:message_size) buffer_size = event.payload.fetch(:buffer_size) max_buffer_size = event.payload.fetch(:max_buffer_size) buffer_fill_ratio = buffer_size.to_f / max_buffer_size.to_f buffer_fill_percentage = buffer_fill_ratio * 100.0 tags = { client: client, topic: topic, } # This gets us the write rate. increment("producer.produce.messages", tags: tags.merge(topic: topic)) # Information about typical/average/95p message size. histogram("producer.produce.message_size", message_size, tags: tags.merge(topic: topic)) # Aggregate message size. count("producer.produce.message_size.sum", message_size, tags: tags.merge(topic: topic)) # This gets us the avg/max buffer size per producer. histogram("producer.buffer.size", buffer_size, tags: tags) # This gets us the avg/max buffer fill ratio per producer. histogram("producer.buffer.fill_ratio", buffer_fill_ratio, tags: tags) histogram("producer.buffer.fill_percentage", buffer_fill_percentage, tags: tags) end
topic_error(event)
click to toggle source
# File lib/kafka/datadog.rb, line 344 def topic_error(event) tags = { client: event.payload.fetch(:client_id), topic: event.payload.fetch(:topic) } increment("producer.ack.errors", tags: tags) end