class Kafka::TransactionManager

Constants

DEFAULT_TRANSACTION_TIMEOUT
TRANSACTION_RESULT_ABORT
TRANSACTION_RESULT_COMMIT

Attributes

producer_epoch[R]
producer_id[R]
transactional_id[R]

Public Class Methods

new( cluster:, logger:, idempotent: false, transactional: false, transactional_id: nil, transactional_timeout: DEFAULT_TRANSACTION_TIMEOUT ) click to toggle source
# File lib/kafka/transaction_manager.rb, line 13
def initialize(
  cluster:,
  logger:,
  idempotent: false,
  transactional: false,
  transactional_id: nil,
  transactional_timeout: DEFAULT_TRANSACTION_TIMEOUT
)
  @cluster = cluster
  @logger = TaggedLogger.new(logger)

  @transactional = transactional
  @transactional_id = transactional_id
  @transactional_timeout = transactional_timeout
  @transaction_state = Kafka::TransactionStateMachine.new(logger: logger)
  @transaction_partitions = {}

  # If transactional mode is enabled, idempotent must be enabled
  @idempotent = transactional || idempotent

  @producer_id = -1
  @producer_epoch = 0

  @sequences = {}
end

Public Instance Methods

abort_transaction() click to toggle source
# File lib/kafka/transaction_manager.rb, line 186
def abort_transaction
  force_transactional!

  if @transaction_state.aborting_transaction?
    @logger.warn("Transaction is being aborted")
    return
  end

  unless @transaction_state.in_transaction?
    raise 'Transaction is not valid to abort'
  end

  @transaction_state.transition_to!(TransactionStateMachine::ABORTING_TRANSACTION)

  @logger.info "Aborting transaction #{@transactional_id}, Producer ID: #{@producer_id} (Epoch #{@producer_epoch})"

  response = transaction_coordinator.end_txn(
    transactional_id: @transactional_id,
    producer_id: @producer_id,
    producer_epoch: @producer_epoch,
    transaction_result: TRANSACTION_RESULT_ABORT
  )
  Protocol.handle_error(response.error_code)

  @logger.info "Transaction #{@transactional_id} is aborted, Producer ID: #{@producer_id} (Epoch #{@producer_epoch})"

  complete_transaction

  nil
rescue
  @transaction_state.transition_to!(TransactionStateMachine::ERROR)
  raise
end
add_partitions_to_transaction(topic_partitions) click to toggle source
# File lib/kafka/transaction_manager.rb, line 94
def add_partitions_to_transaction(topic_partitions)
  force_transactional!

  if @transaction_state.uninitialized?
    raise 'Transaction is uninitialized'
  end

  # Extract newly created partitions
  new_topic_partitions = {}
  topic_partitions.each do |topic, partitions|
    partitions.each do |partition|
      @transaction_partitions[topic] ||= {}
      if !@transaction_partitions[topic][partition]
        new_topic_partitions[topic] ||= []
        new_topic_partitions[topic] << partition

        @logger.info "Adding parition #{topic}/#{partition}  to transaction #{@transactional_id}, Producer ID: #{@producer_id} (Epoch #{@producer_epoch})"
      end
    end
  end

  unless new_topic_partitions.empty?
    response = transaction_coordinator.add_partitions_to_txn(
      transactional_id: @transactional_id,
      producer_id: @producer_id,
      producer_epoch: @producer_epoch,
      topics: new_topic_partitions
    )

    # Update added topic partitions
    response.errors.each do |tp|
      tp.partitions.each do |p|
        Protocol.handle_error(p.error_code)
        @transaction_partitions[tp.topic] ||= {}
        @transaction_partitions[tp.topic][p.partition] = true
      end
    end
  end

  nil
rescue
  @transaction_state.transition_to!(TransactionStateMachine::ERROR)
  raise
end
begin_transaction() click to toggle source
# File lib/kafka/transaction_manager.rb, line 139
def begin_transaction
  force_transactional!
  raise 'Transaction has already started' if @transaction_state.in_transaction?
  raise 'Transaction is not ready' unless @transaction_state.ready?
  @transaction_state.transition_to!(TransactionStateMachine::IN_TRANSACTION)

  @logger.info "Begin transaction #{@transactional_id}, Producer ID: #{@producer_id} (Epoch #{@producer_epoch})"

  nil
rescue
  @transaction_state.transition_to!(TransactionStateMachine::ERROR)
  raise
end
close() click to toggle source
# File lib/kafka/transaction_manager.rb, line 228
def close
  if in_transaction?
    @logger.warn("Aborting pending transaction ...")
    abort_transaction
  elsif @transaction_state.aborting_transaction? || @transaction_state.committing_transaction?
    @logger.warn("Transaction is finishing. Sleeping until finish!")
    sleep 5
  end
end
commit_transaction() click to toggle source
# File lib/kafka/transaction_manager.rb, line 153
def commit_transaction
  force_transactional!

  if @transaction_state.committing_transaction?
    @logger.warn("Transaction is being committed")
    return
  end

  unless @transaction_state.in_transaction?
    raise 'Transaction is not valid to commit'
  end

  @transaction_state.transition_to!(TransactionStateMachine::COMMITTING_TRANSACTION)

  @logger.info "Commiting transaction #{@transactional_id}, Producer ID: #{@producer_id} (Epoch #{@producer_epoch})"

  response = transaction_coordinator.end_txn(
    transactional_id: @transactional_id,
    producer_id: @producer_id,
    producer_epoch: @producer_epoch,
    transaction_result: TRANSACTION_RESULT_COMMIT
  )
  Protocol.handle_error(response.error_code)

  @logger.info "Transaction #{@transactional_id} is committed, Producer ID: #{@producer_id} (Epoch #{@producer_epoch})"
  complete_transaction

  nil
rescue
  @transaction_state.transition_to!(TransactionStateMachine::ERROR)
  raise
end
error?() click to toggle source
# File lib/kafka/transaction_manager.rb, line 224
def error?
  @transaction_state.error?
end
idempotent?() click to toggle source
# File lib/kafka/transaction_manager.rb, line 39
def idempotent?
  @idempotent == true
end
in_transaction?() click to toggle source
# File lib/kafka/transaction_manager.rb, line 220
def in_transaction?
  @transaction_state.in_transaction?
end
init_producer_id(force = false) click to toggle source
# File lib/kafka/transaction_manager.rb, line 47
def init_producer_id(force = false)
  return if @producer_id >= 0 && !force

  response = transaction_coordinator.init_producer_id(
    transactional_id: @transactional_id,
    transactional_timeout: @transactional_timeout
  )
  Protocol.handle_error(response.error_code)

  # Reset producer id
  @producer_id = response.producer_id
  @producer_epoch = response.producer_epoch

  # Reset sequence
  @sequences = {}

  @logger.debug "Current Producer ID is #{@producer_id} and Producer Epoch is #{@producer_epoch}"
end
init_transactions() click to toggle source
# File lib/kafka/transaction_manager.rb, line 76
def init_transactions
  force_transactional!
  unless @transaction_state.uninitialized?
    @logger.warn("Transaction already initialized!")
    return
  end
  init_producer_id(true)
  @transaction_partitions = {}
  @transaction_state.transition_to!(TransactionStateMachine::READY)

  @logger.info "Transaction #{@transactional_id} is initialized, Producer ID: #{@producer_id} (Epoch #{@producer_epoch})"

  nil
rescue
  @transaction_state.transition_to!(TransactionStateMachine::ERROR)
  raise
end
next_sequence_for(topic, partition) click to toggle source
# File lib/kafka/transaction_manager.rb, line 66
def next_sequence_for(topic, partition)
  @sequences[topic] ||= {}
  @sequences[topic][partition] ||= 0
end
transactional?() click to toggle source
# File lib/kafka/transaction_manager.rb, line 43
def transactional?
  @transactional == true && !@transactional_id.nil?
end
update_sequence_for(topic, partition, sequence) click to toggle source
# File lib/kafka/transaction_manager.rb, line 71
def update_sequence_for(topic, partition, sequence)
  @sequences[topic] ||= {}
  @sequences[topic][partition] = sequence
end

Private Instance Methods

complete_transaction() click to toggle source
# File lib/kafka/transaction_manager.rb, line 256
def complete_transaction
  @transaction_state.transition_to!(TransactionStateMachine::READY)
  @transaction_partitions = {}
end
force_transactional!() click to toggle source
# File lib/kafka/transaction_manager.rb, line 240
def force_transactional!
  unless transactional?
    raise 'Please turn on transactional mode to use transaction'
  end

  if @transactional_id.nil? || @transactional_id.empty?
    raise 'Please provide a transaction_id to use transactional mode'
  end
end
transaction_coordinator() click to toggle source
# File lib/kafka/transaction_manager.rb, line 250
def transaction_coordinator
  @cluster.get_transaction_coordinator(
    transactional_id: @transactional_id
  )
end