class Phobos::Actions::ProcessMessage
Attributes
metadata[R]
Public Class Methods
new(listener:, message:, listener_metadata:)
click to toggle source
# File lib/phobos/actions/process_message.rb, line 12 def initialize(listener:, message:, listener_metadata:) @listener = listener @message = message @metadata = listener_metadata.merge( key: message.key, partition: message.partition, offset: message.offset, retry_count: 0, headers: message.headers ) end
Public Instance Methods
execute()
click to toggle source
# File lib/phobos/actions/process_message.rb, line 24 def execute payload = force_encoding(@message.value) begin process_message(payload) rescue StandardError => e handle_error(e, 'listener.retry_handler_error', "error processing message, waiting #{backoff_interval}s") retry end end
Private Instance Methods
process_message(payload)
click to toggle source
# File lib/phobos/actions/process_message.rb, line 38 def process_message(payload) instrument('listener.process_message', @metadata) do handler = @listener.handler_class.new handler.around_consume(payload, @metadata) do |around_payload, around_metadata| handler.consume(around_payload, around_metadata) end end end