class Kafkat::Command::SetReplicationFactor
Command to set the replication factor (RF) of a topic. The command accepts the topic name, the new desired replication factor, and, when the replication factor is being increased, an optional list of broker ids.
When reducing the RF, a new partition assignment is generated by removing the last partition replica of every replica set. The leader replica is never removed from the replica set, so that no leader election is triggered.
When increasing the RF, a new partition assignment will be generated by allocating a new replica to every replica set. The new replica will be assigned to the provided broker list in a round robin fashion. If no broker id is specified on the command line, all brokers will be used.
Public Instance Methods
For every partition, filter out the brokers that already have a replica for this partition, then pick (new_rf - current_rf) brokers and assign them new replicas.
The count of new replicas assigned to the brokers is maintained in order to uniformly assign new replicas.
# File lib/kafkat/command/set-replication-factor.rb, line 135
#
# Builds the partition assignments that raise a topic's replication factor
# from +current_rf+ to +new_rf+.
#
# For every partition, the brokers that already hold a replica are excluded,
# then (new_rf - current_rf) of the remaining brokers are appended to the
# replica list. A per-broker counter of newly assigned replicas is kept so
# the new replicas are spread evenly over the candidate brokers.
#
# topic      - object responding to +name+ and +partitions+; each partition
#              responds to +id+, +topic_name+ and +replicas+.
# current_rf - current replication factor of the topic.
# new_rf     - requested replication factor; must be greater than current_rf.
# brokers    - ids of the brokers allowed to receive the new replicas.
#
# Returns an Array of Assignment, one per partition.
# Raises RuntimeError when new_rf is not greater than current_rf, or when
# there are not enough eligible brokers.
def increase_rf(topic, current_rf, new_rf, brokers)
  unless new_rf > current_rf
    raise 'New replication factor must be greater than the current replication factor'
  end

  # A duplicated id would allow two replicas of one partition on the same broker.
  candidates = brokers.uniq
  delta_rf = new_rf - current_rf
  if delta_rf > candidates.size
    raise "#{delta_rf} new replicas requested for topic #{topic.name} but only #{candidates.size} brokers available"
  end

  broker_counts = candidates.map { |id| { id: id, count: 0 } }

  topic.partitions.map do |partition|
    existing_replicas = partition.replicas
    pick_from = broker_counts.reject { |b| existing_replicas.include?(b[:id]) }
    if delta_rf > pick_from.size
      raise "Cannot create #{delta_rf} new replicas for partition #{partition.topic_name}.#{partition.id}, not enough brokers"
    end

    # Least-loaded brokers first. The index is part of the sort key because
    # Ruby's sort is not stable; ties are resolved in broker-list order.
    new_replicas = pick_from.each_with_index
                            .sort_by { |b, i| [b[:count], i] }
                            .first(delta_rf)
                            .map(&:first)
    new_replicas.each { |b| b[:count] += 1 }

    Assignment.new(topic.name, partition.id, existing_replicas + new_replicas.map { |b| b[:id] })
  end
end
For every partition, remove the last replica from the replica list. If the last replica is the leader, then the previous replica is removed instead.
# File lib/kafkat/command/set-replication-factor.rb, line 99
#
# Builds the partition assignments that lower a topic's replication factor
# from +current_rf+ to +new_rf+.
#
# For every partition, replicas are dropped from the end of the replica
# list until (current_rf - new_rf) have been removed. The replica equal to
# the partition's leader is always skipped, so the leader stays in the list.
#
# topic      - object responding to +name+ and +partitions+; each partition
#              responds to +id+, +replicas+ and +leader+.
# current_rf - current replication factor of the topic.
# new_rf     - requested replication factor; must be smaller than current_rf.
#
# Returns an Array of Assignment, one per partition. The partitions' own
# replica lists are left untouched.
# Raises RuntimeError when current_rf is 1, when new_rf is not smaller than
# current_rf, or when a partition does not end up with exactly new_rf replicas.
def reduce_rf(topic, current_rf, new_rf)
  delta_rf = current_rf - new_rf
  if current_rf == 1
    raise 'Current replication factor is 1. Cannot reduce further.'
  end
  unless delta_rf > 0
    raise "New replication factor (#{new_rf}) must be smaller than current replication factor (#{current_rf})"
  end

  topic.partitions.map do |partition|
    # Work on a copy: delete_at would otherwise modify the partition's own array.
    new_replicas = partition.replicas.dup

    delta_rf.times do
      # Last replica that is not the leader.
      victim = new_replicas.rindex { |replica| replica != partition.leader }
      break unless victim
      new_replicas.delete_at(victim)
    end

    raise 'Unexpected state' if new_replicas.size != new_rf

    Assignment.new(topic.name, partition.id, new_replicas)
  end
end
# File lib/kafkat/command/set-replication-factor.rb, line 25
#
# Entry point of the command.
#
# Reads an optional topic name and the --newrf / --brokers options from the
# command line, validates them against the brokers reported by zookeeper,
# computes the reassignments needed to bring every selected topic to the
# requested replication factor, shows them, and submits them through
# +admin.reassign!+ after the user confirms.
#
# Exits with status 1 when the replication factor is below 1, when a listed
# broker is not among the brokers returned by zookeeper, or when the
# replication factor exceeds the number of usable brokers.
def run
  # The topic name is optional: the first argument is consumed only when it
  # is not an option flag.
  topic_name = ARGV.shift unless ARGV[0] && ARGV[0].start_with?('--')

  all_brokers = zookeeper.get_brokers
  topics = topic_name && zookeeper.get_topics([topic_name])
  topics ||= zookeeper.get_topics

  opts = Trollop.options do
    opt :brokers, "the comma-separated list of broker the new partitions must be assigned to", type: :string
    opt :newrf, "the new replication factor", type: :integer, required: true
  end

  new_rf = opts[:newrf]
  if new_rf < 1
    puts "ERROR: replication factor is smaller than 1"
    exit 1
  end

  all_brokers_id = all_brokers.values.map(&:id)

  # Without --brokers every known broker is a candidate. Reuse the broker
  # list fetched above instead of querying zookeeper a second time.
  broker_ids = opts[:brokers] && opts[:brokers].split(',').map(&:to_i).uniq
  broker_ids ||= all_brokers_id

  broker_ids.each do |id|
    unless all_brokers_id.include?(id)
      puts "ERROR: Broker #{id} is not currently active.\n"
      exit 1
    end
  end

  if new_rf > broker_ids.size
    puts "ERROR: Replication factor is larger than number of brokers.\n"
    exit 1
  end

  assignments = []
  topics.each do |_, t|
    # The first partition's replica count is taken as the topic's current RF.
    current_rf = t.partitions[0].replicas.size
    if new_rf < current_rf
      warn_reduce_brokers if opts[:brokers]
      assignments += reduce_rf(t, current_rf, new_rf)
    elsif new_rf > current_rf
      assignments += increase_rf(t, current_rf, new_rf, broker_ids)
    end
  end

  if assignments.empty?
    puts "No partition reassignment required"
    return
  end

  print "This operation executes the following assignments:\n\n"
  print_assignment_header
  assignments.each { |a| print_assignment(a) }
  print "\n"

  return unless agree("Proceed (y/n)?")

  begin
    print "\nBeginning.\n"
    admin.reassign!(assignments)
    print "Started.\n"
  rescue Admin::ExecutionFailedError => e
    # Report the failure; the previous code printed a variable that was
    # still nil at this point, so nothing was shown.
    print "ERROR: #{e.message}\n"
  end
end
# File lib/kafkat/command/set-replication-factor.rb, line 164
#
# Prints a notice that the broker list is ignored when the replication
# factor is being reduced. The notice is printed at most once per instance;
# @did_warn_reduce_brokers records that it has already been shown.
def warn_reduce_brokers
  unless @did_warn_reduce_brokers
    puts "When reducing the replication factor the list of specified brokers is ignored.",
         "Once the replication factor is set, you can use the reassign command."
    @did_warn_reduce_brokers = true
  end
end