class Phobos::Actions::ProcessBatch

Attributes

metadata[R]

Public Class Methods

new(listener:, batch:, listener_metadata:) click to toggle source
# File lib/phobos/actions/process_batch.rb, line 10
# Builds a batch-processing action and prepares its instrumentation metadata.
#
# The resulting +@metadata+ is +listener_metadata+ merged with three
# batch-specific entries: +:batch_size+, +:partition+ and +:offset_lag+.
# +merge+ is used, so for a Hash argument the caller's object is not modified.
#
# @param listener the listener that will handle the batch's messages
# @param batch an object responding to +messages+, +partition+ and +offset_lag+
# @param listener_metadata a Hash-like object responding to +merge+
def initialize(listener:, batch:, listener_metadata:)
  @listener_metadata = listener_metadata
  @batch = batch
  @listener = listener

  batch_details = {
    batch_size: batch.messages.count,
    partition: batch.partition,
    offset_lag: batch.offset_lag
  }
  @metadata = listener_metadata.merge(**batch_details)
end

Public Instance Methods

execute() click to toggle source
# File lib/phobos/actions/process_batch.rb, line 21
# Processes every message of the batch inside a single
# 'listener.process_batch' instrumentation block.
#
# For each message, in order, a Phobos::Actions::ProcessMessage is built
# with the listener, the message and the original listener metadata, and
# executed; afterwards a heartbeat is triggered on the listener's consumer
# before moving on to the next message.
#
# @return whatever +instrument+ returns (defined outside this method)
def execute
  event_name = 'listener.process_batch'

  instrument(event_name, @metadata) do |_metadata|
    @batch.messages.each do |message|
      message_action = Phobos::Actions::ProcessMessage.new(
        listener: @listener,
        message: message,
        listener_metadata: @listener_metadata
      )
      message_action.execute

      # Heartbeat once per processed message, after it has been handled.
      @listener.consumer.trigger_heartbeat
    end
  end
end