class Karafka::Instrumentation::StdoutListener

Default listener that hooks up to our instrumentation and uses its events for logging It can be removed/replaced or anything without any harm to the Karafka app flow

Constants

USED_LOG_LEVELS

Log levels that we use in this particular listener

Public Instance Methods

on_app_initializing(_event) click to toggle source

Logs info that we're initializing Karafka app @param _event [Dry::Events::Event] event details including payload

# File lib/karafka/instrumentation/stdout_listener.rb, line 108
def on_app_initializing(_event)
  info "Initializing Karafka server #{::Process.pid}"
end
on_app_running(_event) click to toggle source

Logs info that we're running Karafka app @param _event [Dry::Events::Event] event details including payload

# File lib/karafka/instrumentation/stdout_listener.rb, line 114
def on_app_running(_event)
  info "Running Karafka server #{::Process.pid}"
end
on_app_stopping(_event) click to toggle source

Logs info that we're going to stop the Karafka server @param _event [Dry::Events::Event] event details including payload

# File lib/karafka/instrumentation/stdout_listener.rb, line 120
def on_app_stopping(_event)
  # We use a separate thread as logging can't be called from trap context
  Thread.new { info "Stopping Karafka server #{::Process.pid}" }
end
on_app_stopping_error(_event) click to toggle source

Logs an error that Karafka was unable to stop the server gracefully and it had to do a

forced exit

@param _event [Dry::Events::Event] event details including payload

# File lib/karafka/instrumentation/stdout_listener.rb, line 128
def on_app_stopping_error(_event)
  # We use a separate thread as logging can't be called from trap context
  Thread.new { error "Forceful Karafka server #{::Process.pid} stop" }
end
on_backends_inline_process(event) click to toggle source

Logs info about processing of a certain dataset with an inline backend @param event [Dry::Events::Event] event details including payload

# File lib/karafka/instrumentation/stdout_listener.rb, line 84
def on_backends_inline_process(event)
  count = event[:caller].send(:params_batch).to_a.size
  topic = event[:caller].topic.name
  time = event[:time]
  info "Inline processing of topic #{topic} with #{count} messages took #{time} ms"
end
on_connection_batch_delegator_call(event) click to toggle source

Logs details about incoming batches and with which consumer we will consume them @param event [Dry::Events::Event] event details including payload

# File lib/karafka/instrumentation/stdout_listener.rb, line 18
      def on_connection_batch_delegator_call(event)
        consumer = event[:consumer]
        topic = consumer.topic.name
        kafka_messages = event[:kafka_batch].messages
        info(
          <<~MSG.chomp.tr("\n", ' ')
            #{kafka_messages.count} messages
            on #{topic} topic
            delegated to #{consumer.class}
          MSG
        )
      end
on_connection_client_fetch_loop_error(event) click to toggle source

Logs errors that are related to the connection itself @param event [Dry::Events::Event] event details including payload @note Karafka will attempt to reconnect, so an error not a fatal

# File lib/karafka/instrumentation/stdout_listener.rb, line 70
def on_connection_client_fetch_loop_error(event)
  error "Client fetch loop error: #{event[:error]}"
end
on_connection_listener_fetch_loop_error(event) click to toggle source

Logs errors that occurred in a listener fetch loop @param event [Dry::Events::Event] event details including payload @note It's an error as we can recover from it not a fatal

# File lib/karafka/instrumentation/stdout_listener.rb, line 63
def on_connection_listener_fetch_loop_error(event)
  error "Listener fetch loop error: #{event[:error]}"
end
on_connection_message_delegator_call(event) click to toggle source

Logs details about incoming message and with which consumer we will consume it @param event [Dry::Events::Event] event details including payload

# File lib/karafka/instrumentation/stdout_listener.rb, line 33
def on_connection_message_delegator_call(event)
  consumer = event[:consumer]
  topic = consumer.topic.name
  info "1 message on #{topic} topic delegated to #{consumer.class}"
end
on_consumers_responders_respond_with(event) click to toggle source

Logs info about responder usage withing a controller flow @param event [Dry::Events::Event] event details including payload

# File lib/karafka/instrumentation/stdout_listener.rb, line 99
def on_consumers_responders_respond_with(event)
  calling = event[:caller]
  responder = calling.topic.responder
  data = event[:data]
  info "Responded from #{calling.class} using #{responder} with following data #{data}"
end
on_fetcher_call_error(event) click to toggle source

Logs info about crashed fetcher @param event [Dry::Events::Event] event details including payload @note If this happens, Karafka will shutdown as it means a critical error

in one of the threads
# File lib/karafka/instrumentation/stdout_listener.rb, line 78
def on_fetcher_call_error(event)
  fatal "Fetcher crash due to an error: #{event[:error]}"
end
on_params_params_deserialize(event) click to toggle source

Logs details about each received message value deserialization @param event [Dry::Events::Event] event details including payload

# File lib/karafka/instrumentation/stdout_listener.rb, line 41
      def on_params_params_deserialize(event)
        # Keep in mind, that a caller here is a param object not a controller,
        # so it returns a topic as a string, not a routing topic
        debug(
          <<~MSG.chomp.tr("\n", ' ')
            Params deserialization for #{event[:caller].metadata.topic} topic
            successful in #{event[:time]} ms
          MSG
        )
      end
on_params_params_deserialize_error(event) click to toggle source

Logs unsuccessful deserialization attempts of incoming data @param event [Dry::Events::Event] event details including payload

# File lib/karafka/instrumentation/stdout_listener.rb, line 54
def on_params_params_deserialize_error(event)
  topic = event[:caller].metadata.topic
  error = event[:error]
  error "Params deserialization error for #{topic} topic: #{error}"
end
on_process_notice_signal(event) click to toggle source

Logs info about system signals that Karafka received @param event [Dry::Events::Event] event details including payload

# File lib/karafka/instrumentation/stdout_listener.rb, line 93
def on_process_notice_signal(event)
  info "Received #{event[:signal]} system signal"
end