class Kafkat::Command::SetReplicationFactor

Command to set the replication factor (RF) of a topic. The command accepts an optional topic name (all topics are processed when it is omitted), the new desired replication factor, and, when the replication factor is being increased, an optional list of broker ids.

When reducing the RF, a new partition assignment is generated by removing the last replica of every replica set. The leader replica is never removed from the replica set, which avoids triggering a leader election.

When increasing the RF, a new partition assignment is generated by adding (new RF - current RF) replicas to every replica set. The new replicas are spread over the provided broker list, always choosing the brokers that have received the fewest new replicas so far, which approximates a round-robin distribution. If no broker id is specified on the command line, all active brokers are used.

Public Instance Methods

increase_rf(topic, current_rf, new_rf, brokers)

For every partition, filter out the brokers that already have a replica for this partition, then pick (new_rf - current_rf) brokers and assign them new replicas.

A running count of the new replicas given to each broker is maintained, and the least-loaded brokers are picked first, so that new replicas are spread uniformly across the brokers.

# File lib/kafkat/command/set-replication-factor.rb, line 135
# Build a reassignment that raises the replication factor of +topic+ from
# +current_rf+ to +new_rf+.
#
# For every partition, brokers that already hold a replica of it are filtered
# out, and (new_rf - current_rf) of the remaining brokers each receive a new
# replica. A running per-broker count of new replicas is kept; the least-loaded
# brokers are chosen first, with ties broken by their order in +brokers+, so
# the result is deterministic.
#
# @param topic [#name, #partitions] topic whose partitions respond to #id and #replicas
# @param current_rf [Integer] the topic's current replication factor
# @param new_rf [Integer] the desired replication factor (must exceed current_rf)
# @param brokers [Array<Integer>] candidate broker ids for the new replicas
# @return [Array<Assignment>] one assignment per partition, existing replicas first
# @raise [RuntimeError] if new_rf <= current_rf, or if there are not enough
#   brokers to place the new replicas
def increase_rf(topic, current_rf, new_rf, brokers)
  unless new_rf > current_rf
    raise 'New replication factor must be greater than the current replication factor'
  end

  delta_rf = new_rf - current_rf
  if delta_rf > brokers.size
    raise "#{delta_rf} new replicas requested for topic #{topic.name} but only #{brokers.size} brokers available"
  end

  # Shared across partitions so load is balanced over the whole topic.
  broker_counts = brokers.map { |b| { id: b, count: 0 } }

  topic.partitions.map do |p|
    existing_replicas = p.replicas
    pick_from = broker_counts.reject { |b| existing_replicas.include?(b[:id]) }
    if delta_rf > pick_from.size
      raise "Cannot create #{delta_rf} new replicas for partition #{topic.name}.#{p.id}, not enough brokers"
    end

    # Array#sort is not stable; sorting on [count, position] breaks ties by
    # broker-list order so the assignment is reproducible.
    new_replicas = pick_from.sort_by.with_index { |b, i| [b[:count], i] }.first(delta_rf)
    new_replicas.each { |b| b[:count] += 1 }

    Assignment.new(topic.name, p.id, existing_replicas + new_replicas.map { |b| b[:id] })
  end
end
reduce_rf(topic, current_rf, new_rf)

For every partition, remove (current_rf - new_rf) replicas one at a time, each time removing the last replica in the replica list. The leader is never removed: if the last replica is the leader, the replica before it is removed instead.

# File lib/kafkat/command/set-replication-factor.rb, line 99
# Build a reassignment that lowers the replication factor of +topic+ from
# +current_rf+ to +new_rf+.
#
# For every partition, (current_rf - new_rf) replicas are dropped one at a
# time, each time removing the right-most replica that is not the partition
# leader, so the leader always stays in the replica set.
#
# @param topic [#name, #partitions] topic whose partitions respond to #id, #replicas and #leader
# @param current_rf [Integer] the topic's current replication factor
# @param new_rf [Integer] the desired replication factor (must be below current_rf)
# @return [Array<Assignment>] one assignment per partition
# @raise [RuntimeError] if current_rf is 1, if new_rf >= current_rf, or if a
#   partition does not end up with exactly new_rf replicas
def reduce_rf(topic, current_rf, new_rf)
  delta_rf = current_rf - new_rf
  if current_rf == 1
    raise 'Current replication factor is 1. Cannot reduce further.'
  end
  unless delta_rf > 0
    raise "New replication factor (#{new_rf}) must be smaller than current replication factor (#{current_rf})"
  end

  topic.partitions.map do |p|
    # Copy so the caller's partition object is not mutated by delete_at.
    new_replicas = p.replicas.dup

    delta_rf.times do
      victim = new_replicas.rindex { |r| r != p.leader }
      new_replicas.delete_at(victim) if victim
    end

    # Guards against partitions whose replica count differs from current_rf.
    raise 'Unexpected state' if new_replicas.size != new_rf

    Assignment.new(topic.name, p.id, new_replicas)
  end
end
run()
# File lib/kafkat/command/set-replication-factor.rb, line 25
# Entry point: parse the command line, compute the partition reassignment that
# brings every selected topic to the requested replication factor, show it, and
# start it after user confirmation.
#
# Reads an optional topic name from ARGV (every topic when omitted), plus the
# --newrf (required) and --brokers (optional, comma-separated) options. Exits
# with status 1 on invalid input.
def run
  topic_name = ARGV.shift unless ARGV[0] && ARGV[0].start_with?('--')

  all_brokers = zookeeper.get_brokers
  topics = topic_name && zookeeper.get_topics([topic_name])
  topics ||= zookeeper.get_topics

  opts = Trollop.options do
    opt :brokers, "the comma-separated list of broker the new partitions must be assigned to", type: :string
    opt :newrf, "the new replication factor", type: :integer, required: true
  end

  broker_ids = opts[:brokers] && opts[:brokers].split(',').map(&:to_i)
  new_rf = opts[:newrf]

  if new_rf < 1
    puts "ERROR: replication factor is smaller than 1"
    exit 1
  end

  # Reuse the broker list fetched above rather than querying ZooKeeper again.
  all_brokers_id = all_brokers.values.map(&:id)
  broker_ids ||= all_brokers_id

  inactive_id = broker_ids.find { |id| !all_brokers_id.include?(id) }
  if inactive_id
    puts "ERROR: Broker #{inactive_id} is not currently active.\n"
    exit 1
  end

  if new_rf > broker_ids.size
    puts "ERROR: Replication factor is larger than number of brokers.\n"
    exit 1
  end

  assignments = []
  topics.each do |_, t|
    first_partition = t.partitions.first
    # A topic without partitions has nothing to reassign.
    next unless first_partition

    current_rf = first_partition.replicas.size
    if new_rf < current_rf
      warn_reduce_brokers if opts[:brokers]
      assignments += reduce_rf(t, current_rf, new_rf)
    elsif new_rf > current_rf
      assignments += increase_rf(t, current_rf, new_rf, broker_ids)
    end
  end

  if assignments.empty?
    puts "No partition reassignment required"
    return
  end

  print "This operation executes the following assignments:\n\n"
  print_assignment_header
  assignments.each { |a| print_assignment(a) }
  print "\n"

  return unless agree("Proceed (y/n)?")

  begin
    print "\nBeginning.\n"
    admin.reassign!(assignments)
    print "Started.\n"
  rescue Admin::ExecutionFailedError => e
    # No return value exists when reassign! raises; report the failure itself.
    print "#{e.message}\n"
  end
end
warn_reduce_brokers()
# File lib/kafkat/command/set-replication-factor.rb, line 164
# Tell the user, at most once per command instance, that --brokers has no
# effect when the replication factor is being lowered.
# Returns true when the notice was printed, nil when it had already been shown.
def warn_reduce_brokers
  unless @did_warn_reduce_brokers
    puts "When reducing the replication factor the list of specified brokers is ignored."
    puts "Once the replication factor is set, you can use the reassign command."
    @did_warn_reduce_brokers = true
  end
end