class Kafka::TransactionManager
Constants
- DEFAULT_TRANSACTION_TIMEOUT
- TRANSACTION_RESULT_ABORT
- TRANSACTION_RESULT_COMMIT
Attributes
producer_epoch[R]
producer_id[R]
transactional_id[R]
Public Class Methods
new( cluster:, logger:, idempotent: false, transactional: false, transactional_id: nil, transactional_timeout: DEFAULT_TRANSACTION_TIMEOUT )
click to toggle source
# File lib/kafka/transaction_manager.rb, line 13 def initialize( cluster:, logger:, idempotent: false, transactional: false, transactional_id: nil, transactional_timeout: DEFAULT_TRANSACTION_TIMEOUT ) @cluster = cluster @logger = TaggedLogger.new(logger) @transactional = transactional @transactional_id = transactional_id @transactional_timeout = transactional_timeout @transaction_state = Kafka::TransactionStateMachine.new(logger: logger) @transaction_partitions = {} # If transactional mode is enabled, idempotent must be enabled @idempotent = transactional || idempotent @producer_id = -1 @producer_epoch = 0 @sequences = {} end
Public Instance Methods
abort_transaction()
click to toggle source
# File lib/kafka/transaction_manager.rb, line 186 def abort_transaction force_transactional! if @transaction_state.aborting_transaction? @logger.warn("Transaction is being aborted") return end unless @transaction_state.in_transaction? raise 'Transaction is not valid to abort' end @transaction_state.transition_to!(TransactionStateMachine::ABORTING_TRANSACTION) @logger.info "Aborting transaction #{@transactional_id}, Producer ID: #{@producer_id} (Epoch #{@producer_epoch})" response = transaction_coordinator.end_txn( transactional_id: @transactional_id, producer_id: @producer_id, producer_epoch: @producer_epoch, transaction_result: TRANSACTION_RESULT_ABORT ) Protocol.handle_error(response.error_code) @logger.info "Transaction #{@transactional_id} is aborted, Producer ID: #{@producer_id} (Epoch #{@producer_epoch})" complete_transaction nil rescue @transaction_state.transition_to!(TransactionStateMachine::ERROR) raise end
add_partitions_to_transaction(topic_partitions)
click to toggle source
# File lib/kafka/transaction_manager.rb, line 94 def add_partitions_to_transaction(topic_partitions) force_transactional! if @transaction_state.uninitialized? raise 'Transaction is uninitialized' end # Extract newly created partitions new_topic_partitions = {} topic_partitions.each do |topic, partitions| partitions.each do |partition| @transaction_partitions[topic] ||= {} if !@transaction_partitions[topic][partition] new_topic_partitions[topic] ||= [] new_topic_partitions[topic] << partition @logger.info "Adding parition #{topic}/#{partition} to transaction #{@transactional_id}, Producer ID: #{@producer_id} (Epoch #{@producer_epoch})" end end end unless new_topic_partitions.empty? response = transaction_coordinator.add_partitions_to_txn( transactional_id: @transactional_id, producer_id: @producer_id, producer_epoch: @producer_epoch, topics: new_topic_partitions ) # Update added topic partitions response.errors.each do |tp| tp.partitions.each do |p| Protocol.handle_error(p.error_code) @transaction_partitions[tp.topic] ||= {} @transaction_partitions[tp.topic][p.partition] = true end end end nil rescue @transaction_state.transition_to!(TransactionStateMachine::ERROR) raise end
begin_transaction()
click to toggle source
# File lib/kafka/transaction_manager.rb, line 139 def begin_transaction force_transactional! raise 'Transaction has already started' if @transaction_state.in_transaction? raise 'Transaction is not ready' unless @transaction_state.ready? @transaction_state.transition_to!(TransactionStateMachine::IN_TRANSACTION) @logger.info "Begin transaction #{@transactional_id}, Producer ID: #{@producer_id} (Epoch #{@producer_epoch})" nil rescue @transaction_state.transition_to!(TransactionStateMachine::ERROR) raise end
close()
click to toggle source
# File lib/kafka/transaction_manager.rb, line 228 def close if in_transaction? @logger.warn("Aborting pending transaction ...") abort_transaction elsif @transaction_state.aborting_transaction? || @transaction_state.committing_transaction? @logger.warn("Transaction is finishing. Sleeping until finish!") sleep 5 end end
commit_transaction()
click to toggle source
# File lib/kafka/transaction_manager.rb, line 153 def commit_transaction force_transactional! if @transaction_state.committing_transaction? @logger.warn("Transaction is being committed") return end unless @transaction_state.in_transaction? raise 'Transaction is not valid to commit' end @transaction_state.transition_to!(TransactionStateMachine::COMMITTING_TRANSACTION) @logger.info "Commiting transaction #{@transactional_id}, Producer ID: #{@producer_id} (Epoch #{@producer_epoch})" response = transaction_coordinator.end_txn( transactional_id: @transactional_id, producer_id: @producer_id, producer_epoch: @producer_epoch, transaction_result: TRANSACTION_RESULT_COMMIT ) Protocol.handle_error(response.error_code) @logger.info "Transaction #{@transactional_id} is committed, Producer ID: #{@producer_id} (Epoch #{@producer_epoch})" complete_transaction nil rescue @transaction_state.transition_to!(TransactionStateMachine::ERROR) raise end
error?()
click to toggle source
# File lib/kafka/transaction_manager.rb, line 224 def error? @transaction_state.error? end
idempotent?()
click to toggle source
# File lib/kafka/transaction_manager.rb, line 39 def idempotent? @idempotent == true end
in_transaction?()
click to toggle source
# File lib/kafka/transaction_manager.rb, line 220 def in_transaction? @transaction_state.in_transaction? end
init_producer_id(force = false)
click to toggle source
# File lib/kafka/transaction_manager.rb, line 47 def init_producer_id(force = false) return if @producer_id >= 0 && !force response = transaction_coordinator.init_producer_id( transactional_id: @transactional_id, transactional_timeout: @transactional_timeout ) Protocol.handle_error(response.error_code) # Reset producer id @producer_id = response.producer_id @producer_epoch = response.producer_epoch # Reset sequence @sequences = {} @logger.debug "Current Producer ID is #{@producer_id} and Producer Epoch is #{@producer_epoch}" end
init_transactions()
click to toggle source
# File lib/kafka/transaction_manager.rb, line 76 def init_transactions force_transactional! unless @transaction_state.uninitialized? @logger.warn("Transaction already initialized!") return end init_producer_id(true) @transaction_partitions = {} @transaction_state.transition_to!(TransactionStateMachine::READY) @logger.info "Transaction #{@transactional_id} is initialized, Producer ID: #{@producer_id} (Epoch #{@producer_epoch})" nil rescue @transaction_state.transition_to!(TransactionStateMachine::ERROR) raise end
next_sequence_for(topic, partition)
click to toggle source
# File lib/kafka/transaction_manager.rb, line 66 def next_sequence_for(topic, partition) @sequences[topic] ||= {} @sequences[topic][partition] ||= 0 end
transactional?()
click to toggle source
# File lib/kafka/transaction_manager.rb, line 43 def transactional? @transactional == true && !@transactional_id.nil? end
update_sequence_for(topic, partition, sequence)
click to toggle source
# File lib/kafka/transaction_manager.rb, line 71 def update_sequence_for(topic, partition, sequence) @sequences[topic] ||= {} @sequences[topic][partition] = sequence end
Private Instance Methods
complete_transaction()
click to toggle source
# File lib/kafka/transaction_manager.rb, line 256 def complete_transaction @transaction_state.transition_to!(TransactionStateMachine::READY) @transaction_partitions = {} end
force_transactional!()
click to toggle source
# File lib/kafka/transaction_manager.rb, line 240 def force_transactional! unless transactional? raise 'Please turn on transactional mode to use transaction' end if @transactional_id.nil? || @transactional_id.empty? raise 'Please provide a transaction_id to use transactional mode' end end
transaction_coordinator()
click to toggle source
# File lib/kafka/transaction_manager.rb, line 250 def transaction_coordinator @cluster.get_transaction_coordinator( transactional_id: @transactional_id ) end