class Kafka::Interceptors

Holds a list of interceptors that implement `call` and wraps calls to a chain of custom interceptors.

Public Class Methods

new(interceptors:, logger:) click to toggle source
# File lib/kafka/interceptors.rb, line 7
def initialize(interceptors:, logger:)
  @interceptors = interceptors || []
  @logger = TaggedLogger.new(logger)
end

Public Instance Methods

call(intercepted) click to toggle source

This method is called when the client produces a message or once the batches are fetched. The message returned from the first call is passed to the second interceptor call, and so on in an interceptor chain. This method does not throw exceptions.

@param intercepted [Kafka::PendingMessage || Kafka::FetchedBatch] the produced message or

fetched batch.

@return [Kafka::PendingMessage || Kafka::FetchedBatch] the intercepted message or batch

returned by the last interceptor.
# File lib/kafka/interceptors.rb, line 21
def call(intercepted)
  @interceptors.each do |interceptor|
    begin
      intercepted = interceptor.call(intercepted)
    rescue Exception => e
      @logger.warn "Error executing interceptor for topic: #{intercepted.topic} partition: #{intercepted.partition}: #{e.message}\n#{e.backtrace.join("\n")}"
    end
  end

  intercepted
end