class Phobos::Actions::ProcessMessage

Attributes

metadata[R]

Public Class Methods

new(listener:, message:, listener_metadata:) click to toggle source
# File lib/phobos/actions/process_message.rb, line 12
def initialize(listener:, message:, listener_metadata:)
  @listener = listener
  @message = message
  @metadata = listener_metadata.merge(
    key: message.key,
    partition: message.partition,
    offset: message.offset,
    retry_count: 0,
    headers: message.headers
  )
end

Public Instance Methods

execute() click to toggle source
# File lib/phobos/actions/process_message.rb, line 24
def execute
  payload = force_encoding(@message.value)

  begin
    process_message(payload)
  rescue StandardError => e
    handle_error(e, 'listener.retry_handler_error',
                 "error processing message, waiting #{backoff_interval}s")
    retry
  end
end

Private Instance Methods

process_message(payload) click to toggle source
# File lib/phobos/actions/process_message.rb, line 38
def process_message(payload)
  instrument('listener.process_message', @metadata) do
    handler = @listener.handler_class.new

    handler.around_consume(payload, @metadata) do |around_payload, around_metadata|
      handler.consume(around_payload, around_metadata)
    end
  end
end