class Kafka::TransactionManager
Constants
- DEFAULT_TRANSACTION_TIMEOUT
- TRANSACTION_RESULT_ABORT
- TRANSACTION_RESULT_COMMIT
Attributes
producer_epoch[R]
producer_id[R]
transactional_id[R]
Public Class Methods
new( cluster:, logger:, idempotent: false, transactional: false, transactional_id: nil, transactional_timeout: DEFAULT_TRANSACTION_TIMEOUT )
click to toggle source
# File lib/kafka/transaction_manager.rb, line 13 def initialize( cluster:, logger:, idempotent: false, transactional: false, transactional_id: nil, transactional_timeout: DEFAULT_TRANSACTION_TIMEOUT ) @cluster = cluster @logger = TaggedLogger.new(logger) @transactional = transactional @transactional_id = transactional_id @transactional_timeout = transactional_timeout @transaction_state = Kafka::TransactionStateMachine.new(logger: logger) @transaction_partitions = {} # If transactional mode is enabled, idempotent must be enabled @idempotent = transactional || idempotent @producer_id = -1 @producer_epoch = 0 @sequences = {} end
Public Instance Methods
abort_transaction()
click to toggle source
# File lib/kafka/transaction_manager.rb, line 186 def abort_transaction force_transactional! if @transaction_state.aborting_transaction? @logger.warn("Transaction is being aborted") return end unless @transaction_state.in_transaction? @logger.warn('Aborting transaction that was never opened on brokers') return end @transaction_state.transition_to!(TransactionStateMachine::ABORTING_TRANSACTION) @logger.info "Aborting transaction #{@transactional_id}, Producer ID: #{@producer_id} (Epoch #{@producer_epoch})" response = transaction_coordinator.end_txn( transactional_id: @transactional_id, producer_id: @producer_id, producer_epoch: @producer_epoch, transaction_result: TRANSACTION_RESULT_ABORT ) Protocol.handle_error(response.error_code) @logger.info "Transaction #{@transactional_id} is aborted, Producer ID: #{@producer_id} (Epoch #{@producer_epoch})" complete_transaction nil rescue @transaction_state.transition_to!(TransactionStateMachine::ERROR) raise end
add_partitions_to_transaction(topic_partitions)
click to toggle source
# File lib/kafka/transaction_manager.rb, line 94 def add_partitions_to_transaction(topic_partitions) force_transactional! if @transaction_state.uninitialized? raise Kafka::InvalidTxnStateError, 'Transaction is uninitialized' end # Extract newly created partitions new_topic_partitions = {} topic_partitions.each do |topic, partitions| partitions.each do |partition| @transaction_partitions[topic] ||= {} if !@transaction_partitions[topic][partition] new_topic_partitions[topic] ||= [] new_topic_partitions[topic] << partition @logger.info "Adding parition #{topic}/#{partition} to transaction #{@transactional_id}, Producer ID: #{@producer_id} (Epoch #{@producer_epoch})" end end end unless new_topic_partitions.empty? response = transaction_coordinator.add_partitions_to_txn( transactional_id: @transactional_id, producer_id: @producer_id, producer_epoch: @producer_epoch, topics: new_topic_partitions ) # Update added topic partitions response.errors.each do |tp| tp.partitions.each do |p| Protocol.handle_error(p.error_code) @transaction_partitions[tp.topic] ||= {} @transaction_partitions[tp.topic][p.partition] = true end end end nil rescue @transaction_state.transition_to!(TransactionStateMachine::ERROR) raise end
begin_transaction()
click to toggle source
# File lib/kafka/transaction_manager.rb, line 139 def begin_transaction force_transactional! raise Kafka::InvalidTxnStateError, 'Transaction has already started' if @transaction_state.in_transaction? raise Kafka::InvalidTxnStateError, 'Transaction is not ready' unless @transaction_state.ready? @transaction_state.transition_to!(TransactionStateMachine::IN_TRANSACTION) @logger.info "Begin transaction #{@transactional_id}, Producer ID: #{@producer_id} (Epoch #{@producer_epoch})" nil rescue @transaction_state.transition_to!(TransactionStateMachine::ERROR) raise end
close()
click to toggle source
# File lib/kafka/transaction_manager.rb, line 267 def close if in_transaction? @logger.warn("Aborting pending transaction ...") abort_transaction elsif @transaction_state.aborting_transaction? || @transaction_state.committing_transaction? @logger.warn("Transaction is finishing. Sleeping until finish!") sleep 5 end end
commit_transaction()
click to toggle source
# File lib/kafka/transaction_manager.rb, line 153 def commit_transaction force_transactional! if @transaction_state.committing_transaction? @logger.warn("Transaction is being committed") return end unless @transaction_state.in_transaction? raise Kafka::InvalidTxnStateError, 'Transaction is not valid to commit' end @transaction_state.transition_to!(TransactionStateMachine::COMMITTING_TRANSACTION) @logger.info "Commiting transaction #{@transactional_id}, Producer ID: #{@producer_id} (Epoch #{@producer_epoch})" response = transaction_coordinator.end_txn( transactional_id: @transactional_id, producer_id: @producer_id, producer_epoch: @producer_epoch, transaction_result: TRANSACTION_RESULT_COMMIT ) Protocol.handle_error(response.error_code) @logger.info "Transaction #{@transactional_id} is committed, Producer ID: #{@producer_id} (Epoch #{@producer_epoch})" complete_transaction nil rescue @transaction_state.transition_to!(TransactionStateMachine::ERROR) raise end
error?()
click to toggle source
# File lib/kafka/transaction_manager.rb, line 259 def error? @transaction_state.error? end
idempotent?()
click to toggle source
# File lib/kafka/transaction_manager.rb, line 39 def idempotent? @idempotent == true end
in_transaction?()
click to toggle source
# File lib/kafka/transaction_manager.rb, line 255 def in_transaction? @transaction_state.in_transaction? end
init_producer_id(force = false)
click to toggle source
# File lib/kafka/transaction_manager.rb, line 47 def init_producer_id(force = false) return if @producer_id >= 0 && !force response = transaction_coordinator.init_producer_id( transactional_id: @transactional_id, transactional_timeout: @transactional_timeout ) Protocol.handle_error(response.error_code) # Reset producer id @producer_id = response.producer_id @producer_epoch = response.producer_epoch # Reset sequence @sequences = {} @logger.debug "Current Producer ID is #{@producer_id} and Producer Epoch is #{@producer_epoch}" end
init_transactions()
click to toggle source
# File lib/kafka/transaction_manager.rb, line 76 def init_transactions force_transactional! unless @transaction_state.uninitialized? @logger.warn("Transaction already initialized!") return end init_producer_id(true) @transaction_partitions = {} @transaction_state.transition_to!(TransactionStateMachine::READY) @logger.info "Transaction #{@transactional_id} is initialized, Producer ID: #{@producer_id} (Epoch #{@producer_epoch})" nil rescue @transaction_state.transition_to!(TransactionStateMachine::ERROR) raise end
next_sequence_for(topic, partition)
click to toggle source
# File lib/kafka/transaction_manager.rb, line 66 def next_sequence_for(topic, partition) @sequences[topic] ||= {} @sequences[topic][partition] ||= 0 end
ready?()
click to toggle source
# File lib/kafka/transaction_manager.rb, line 263 def ready? @transaction_state.ready? end
send_offsets_to_txn(offsets:, group_id:)
click to toggle source
# File lib/kafka/transaction_manager.rb, line 221 def send_offsets_to_txn(offsets:, group_id:) force_transactional! unless @transaction_state.in_transaction? raise Kafka::InvalidTxnStateError, 'Transaction is not valid to send offsets' end add_response = transaction_coordinator.add_offsets_to_txn( transactional_id: @transactional_id, producer_id: @producer_id, producer_epoch: @producer_epoch, group_id: group_id ) Protocol.handle_error(add_response.error_code) send_response = group_coordinator(group_id: group_id).txn_offset_commit( transactional_id: @transactional_id, group_id: group_id, producer_id: @producer_id, producer_epoch: @producer_epoch, offsets: offsets ) send_response.errors.each do |tp| tp.partitions.each do |partition| Protocol.handle_error(partition.error_code) end end nil rescue @transaction_state.transition_to!(TransactionStateMachine::ERROR) raise end
transactional?()
click to toggle source
# File lib/kafka/transaction_manager.rb, line 43 def transactional? @transactional == true && !@transactional_id.nil? end
update_sequence_for(topic, partition, sequence)
click to toggle source
# File lib/kafka/transaction_manager.rb, line 71 def update_sequence_for(topic, partition, sequence) @sequences[topic] ||= {} @sequences[topic][partition] = sequence end
Private Instance Methods
complete_transaction()
click to toggle source
# File lib/kafka/transaction_manager.rb, line 301 def complete_transaction @transaction_state.transition_to!(TransactionStateMachine::READY) @transaction_partitions = {} end
force_transactional!()
click to toggle source
# File lib/kafka/transaction_manager.rb, line 279 def force_transactional! unless transactional? raise Kafka::InvalidTxnStateError, 'Please turn on transactional mode to use transaction' end if @transactional_id.nil? || @transactional_id.empty? raise Kafka::InvalidTxnStateError, 'Please provide a transaction_id to use transactional mode' end end
group_coordinator(group_id:)
click to toggle source
# File lib/kafka/transaction_manager.rb, line 295 def group_coordinator(group_id:) @cluster.get_group_coordinator( group_id: group_id ) end
transaction_coordinator()
click to toggle source
# File lib/kafka/transaction_manager.rb, line 289 def transaction_coordinator @cluster.get_transaction_coordinator( transactional_id: @transactional_id ) end