class Phobos::Actions::ProcessBatchInline

Attributes

metadata[R]

Public Class Methods

new(listener:, batch:, metadata:) click to toggle source
# File lib/phobos/actions/process_batch_inline.rb, line 13
def initialize(listener:, batch:, metadata:)
  @listener = listener
  @batch = batch
  @listener = listener
  @batch = batch
  @metadata = metadata.merge(
    batch_size: batch.messages.count,
    partition: batch.partition,
    offset_lag: batch.offset_lag,
    retry_count: 0
  )
end

Public Instance Methods

execute() click to toggle source
# File lib/phobos/actions/process_batch_inline.rb, line 26
def execute
  batch = @batch.messages.map { |message| instantiate_batch_message(message) }

  begin
    process_batch(batch)
  rescue StandardError => e
    handle_error(e, 'listener.retry_handler_error_batch',
                 "error processing inline batch, waiting #{backoff_interval}s")
    retry
  end
end

Private Instance Methods

instantiate_batch_message(message) click to toggle source
# File lib/phobos/actions/process_batch_inline.rb, line 40
def instantiate_batch_message(message)
  Phobos::BatchMessage.new(
    key: message.key,
    partition: message.partition,
    offset: message.offset,
    payload: force_encoding(message.value),
    headers: message.headers
  )
end
process_batch(batch) click to toggle source
# File lib/phobos/actions/process_batch_inline.rb, line 50
def process_batch(batch)
  instrument('listener.process_batch_inline', @metadata) do |_metadata|
    handler = @listener.handler_class.new

    handler.around_consume_batch(batch, @metadata) do |around_batch, around_metadata|
      handler.consume_batch(around_batch, around_metadata)
    end
  end
end