module Deimos::KafkaListener

This module listens to events published by RubyKafka.

Public Class Methods

send_produce_error(event)

Listens for any exceptions that happen during publishing and re-publishes them as a Deimos event.

@param event [ActiveSupport::Notification]

# File lib/deimos/instrumentation.rb, line 43
# Re-publishes a Kafka publishing failure as a Deimos `produce_error` event.
#
# The exception is read from the event payload. Events without an exception,
# or whose exception does not expose `failed_messages`, are ignored. The
# failed messages are grouped by topic; for every topic that has a matching
# `Deimos::Producer` descendant, the message values are decoded with that
# producer's schema backend, the `publish_error` metric is incremented (when
# a metrics object is configured) and `produce_error` is instrumented.
#
# @param event [ActiveSupport::Notification] must respond to `payload`
# @return [Hash, nil] the failed messages grouped by topic, or nil when the
#   event was ignored
def self.send_produce_error(event)
  error = event.payload[:exception_object]
  return unless error && error.respond_to?(:failed_messages)

  error.failed_messages.group_by(&:topic).each do |topic, failed|
    # Topics without a registered producer class are skipped.
    producer_class = Deimos::Producer.descendants.find { |klass| klass.topic == topic }
    next if failed.empty?
    next unless producer_class

    backend_options = {
      schema: producer_class.config[:schema],
      namespace: producer_class.config[:namespace]
    }
    backend = Deimos.schema_backend(**backend_options)
    decoded = failed.map { |message| backend.decode(message.value) }

    # Metrics are optional; nothing is recorded when none are configured.
    Deimos.config.metrics&.increment('publish_error',
                                     tags: ["topic:#{topic}"],
                                     by: decoded.size)

    Deimos.instrument('produce_error',
                      producer: producer_class,
                      topic: topic,
                      exception_object: error,
                      payloads: decoded)
  end
end