module PhobosDBCheckpoint::Handler

Public Class Methods

included(base)
# File lib/phobos_db_checkpoint/handler.rb, line 8
# Hook that Ruby invokes when this module is mixed into +base+ with `include`.
# Extends +base+ with ClassMethods so that module's methods become available
# on the including class itself. (ClassMethods is defined outside this excerpt.)
#
# @param base [Module] the class or module that included this handler
def self.included(base)
  base.extend(ClassMethods)
end

Public Instance Methods

ack(entity_id, event_time, event_type = nil, event_version = nil)
# File lib/phobos_db_checkpoint/handler.rb, line 12
# Builds the acknowledgement object for a consumed event. When the block given
# to #around_consume returns a PhobosDBCheckpoint::Ack, the event is acknowledged
# via Event#acknowledge!; any other return value is treated as a skip.
#
# @param entity_id identifier forwarded to PhobosDBCheckpoint::Ack
# @param event_time time value forwarded to PhobosDBCheckpoint::Ack
# @param event_type optional type forwarded to PhobosDBCheckpoint::Ack (defaults to nil)
# @param event_version optional version forwarded to PhobosDBCheckpoint::Ack (defaults to nil)
# @return [PhobosDBCheckpoint::Ack]
def ack(entity_id, event_time, event_type = nil, event_version = nil)
  PhobosDBCheckpoint::Ack.new(
    entity_id,
    event_time,
    event_type,
    event_version
  )
end
around_consume(payload, metadata) { || ... }

# rubocop:disable Style/RedundantBegin

# File lib/phobos_db_checkpoint/handler.rb, line 22
# Wraps the consumption of a single message with the DB checkpoint flow:
#
# 1. Builds a PhobosDBCheckpoint::Event from the payload and the :topic and
#    :group_id entries of +metadata+.
# 2. If the event already exists, instruments 'db_checkpoint.event_already_consumed'
#    and returns nil without yielding.
# 3. Otherwise yields. A StandardError raised by the block is re-raised when
#    #retry_consume? is truthy; if not, it is handed to Failure.record and the
#    value returned by Failure.record becomes the action result.
# 4. A PhobosDBCheckpoint::Ack result acknowledges the event; anything else is
#    instrumented as skipped.
#
# The explicit begin/end around the yield is kept on purpose (the file carries
# a rubocop Style/RedundantBegin directive for this method).
#
# @param payload the raw message payload
# @param metadata [Hash] message metadata; merged over the generated :checksum
#   entry and passed to every instrumentation call
# @yield runs the handler's consume logic
# @return nil when the event was already consumed; otherwise the value returned
#   by the outermost instrument call
def around_consume(payload, metadata)
  checkpoint = PhobosDBCheckpoint::Event.new(
    topic: metadata[:topic], group_id: metadata[:group_id], payload: payload
  )

  # Entries already present in +metadata+ win over the generated :checksum.
  instrumentation = { checksum: checkpoint.checksum }.merge(metadata)

  instrument('db_checkpoint.around_consume', instrumentation) do
    already_stored = instrument('db_checkpoint.event_already_exists_check', instrumentation) do
      checkpoint.exists?
    end

    if already_stored
      instrument('db_checkpoint.event_already_consumed', instrumentation)
      # Leaves the whole method (not just the block); the ensure below still runs.
      return
    end

    outcome = instrument('db_checkpoint.event_action', instrumentation) do
      begin
        yield
      rescue StandardError => e
        # Propagate so the caller can retry; otherwise persist the failure.
        raise e if retry_consume?(checkpoint, instrumentation, e)

        Failure.record(event: checkpoint, event_metadata: instrumentation, exception: e)
      end
    end

    case outcome
    when PhobosDBCheckpoint::Ack
      instrument('db_checkpoint.event_acknowledged', instrumentation) { checkpoint.acknowledge!(outcome) }
    else
      instrument('db_checkpoint.event_skipped', instrumentation)
    end
  end
ensure
  # Returns any connections in use by the current thread back to the pool, and also returns
  # connections to the pool cached by threads that are no longer alive.
  ActiveRecord::Base.clear_active_connections!
end
retry_consume?(_event, event_metadata, _exception)
# File lib/phobos_db_checkpoint/handler.rb, line 16
# Decides whether an exception raised while consuming should be re-raised
# (so the message is retried) instead of being recorded as a failure.
#
# Retries are unlimited when no `db_checkpoint.max_retries` value is configured
# in Phobos.config. Otherwise a retry is allowed while the :retry_count entry
# of +event_metadata+ is below that limit.
#
# @param _event unused here
# @param event_metadata [Hash] metadata for the message; :retry_count is read
# @param _exception unused here
# @return [Boolean] true when the exception should be re-raised
def retry_consume?(_event, event_metadata, _exception)
  # Read the limit once so the guard and the comparison see the same value.
  max_retries = Phobos.config&.db_checkpoint&.max_retries
  return true unless max_retries

  # A missing :retry_count is treated as zero attempts. Raising NoMethodError
  # here would mask the original consumer exception, because this runs inside
  # the rescue in #around_consume.
  (event_metadata[:retry_count] || 0) < max_retries
end