class Kafkat::ClusterRestart::ClusterRestartHelper
Public Class Methods
calculate_cost(broker_id, partitions, session)
click to toggle source
# File lib/kafkat/command/cluster_restart.rb, line 211
#
# Computes the restart cost of a broker: the number of replicas, summed over
# every partition in +partitions+ whose leader is +broker_id+, for which
# +session.restarted?+ is falsy.
#
# broker_id  - id compared against each partition's +leader+ and looked up
#              in +session.broker_states+.
# partitions - enumerable of objects responding to +leader+ and +replicas+.
# session    - object responding to +broker_states+ (queried with +key?+)
#              and +restarted?(replica)+.
#
# Returns an Integer; 0 when the broker leads none of the given partitions.
# Raises UnknownBrokerError when +session.broker_states+ has no entry for
# +broker_id+.
def self.calculate_cost(broker_id, partitions, session)
  raise UnknownBrokerError, "Unknown broker #{broker_id}" unless session.broker_states.key?(broker_id)

  partitions
    .select { |partition| partition.leader == broker_id }
    .reduce(0) do |cost, partition|
      # replicas.length minus the restarted ones == replicas not yet restarted
      cost + partition.replicas.count { |replica| !session.restarted?(replica) }
    end
end
get_broker_to_leader_partition_mapping(topics)
click to toggle source
# File lib/kafkat/command/cluster_restart.rb, line 201
#
# Groups every partition of every topic by the partition's +leader+.
#
# topics - object whose +values+ are topics responding to +partitions+;
#          each partition responds to +leader+.
#
# Returns a Hash mapping leader => Array of partitions. Looking up a leader
# that has no partitions stores and returns a fresh empty Array for that key.
def self.get_broker_to_leader_partition_mapping(topics)
  all_partitions = topics.values.flat_map(&:partitions)
  # Block default gives each missing key its own array.
  grouped = Hash.new { |hash, leader| hash[leader] = [] }

  all_partitions.each_with_object(grouped) do |partition, by_leader|
    by_leader[partition.leader] << partition
  end
end
select_broker_with_min_cost(session, topics)
click to toggle source
# File lib/kafkat/command/cluster_restart.rb, line 187
#
# Picks the not-yet-restarted broker with the lowest restart cost.
#
# session - object responding to +broker_states+, a Hash of
#           broker_id => state; only brokers whose state equals
#           Session::STATE_NOT_RESTARTED are considered.
# topics  - passed to get_broker_to_leader_partition_mapping to find the
#           partitions each broker leads.
#
# Returns a two-element Array +[broker_id, cost]+, or nil when no broker is
# in the not-restarted state. Ties on cost go to the smaller broker_id.
def self.select_broker_with_min_cost(session, topics)
  broker_to_partition = get_broker_to_leader_partition_mapping(topics)

  pending_ids = session.broker_states
                       .select { |_broker_id, state| state == Session::STATE_NOT_RESTARTED }
                       .keys

  # calculate_cost returns an Integer or raises, so no nil filtering is needed.
  candidates = pending_ids.map do |broker_id|
    [broker_id, calculate_cost(broker_id, broker_to_partition[broker_id], session)]
  end

  # Sort by cost first, and then broker_id
  candidates.min_by { |broker_id, cost| [cost, broker_id] }
end