class Kazoo::ReplicaAssigner
Helper class to assign replicas to brokers for new partitions. It tries to evenly divide the number of leaders and replicas over the brokers, in order to get a comparable load on all the brokers in the cluster.
Attributes
broker_leaders[R]
broker_replicas[R]
cluster[R]
Public Class Methods
new(cluster)
click to toggle source
# File lib/kazoo/replica_assigner.rb, line 9 def initialize(cluster) @cluster = cluster retrieve_initial_counts end
Public Instance Methods
assign(replication_factor)
click to toggle source
# File lib/kazoo/replica_assigner.rb, line 41
# Chooses the brokers that will host the replicas of one new partition and
# records the choice in the running per-broker counts, so that consecutive
# calls keep spreading leaders and replicas over the brokers.
#
# @param replication_factor [Integer] number of replicas to assign
# @return [Array] the selected brokers; the first element is the leader
# @raise [Kazoo::ValidationError] if replication_factor is not positive, or
#   is larger than the number of brokers
def assign(replication_factor)
  raise Kazoo::ValidationError, "replication_factor should be higher than 0 " if replication_factor <= 0
  raise Kazoo::ValidationError, "replication_factor should not be higher than the number of brokers " if replication_factor > brokers.length

  # The broker currently leading the fewest partitions becomes the leader
  # replica. Ties are broken by broker id so the outcome is deterministic.
  leader, = @broker_leaders.min_by { |broker, count| [count, broker.id] }

  # Update the current broker counts.
  # The assigned leader replica counts as a leader, but as a replica as well.
  @broker_leaders[leader] += 1
  @broker_replicas[leader] += 1

  # To assign the other replicas, we remove the broker that was selected as
  # leader from the list of brokers, and sort the rest by the number of
  # replicas they are currently hosting. Then, we take the number of
  # remaining replicas needed to complete the replication factor.
  other_replicas = @broker_replicas
    .reject { |broker, _| broker == leader }
    .sort_by { |broker, count| [count, broker.id] }
    .first(replication_factor - 1)
    .map(&:first)

  # Update the current broker replica counts.
  other_replicas.each { |broker| @broker_replicas[broker] += 1 }

  [leader, *other_replicas]
end
brokers()
click to toggle source
# File lib/kazoo/replica_assigner.rb, line 14 def brokers @cluster.brokers end
cluster_leader_count()
click to toggle source
# File lib/kazoo/replica_assigner.rb, line 33
# Adds up the leader counts of every broker tracked in broker_leaders.
#
# @return [Integer] total number of leaders counted; 0 when no broker is tracked
def cluster_leader_count
  broker_leaders.each_value.reduce(0) { |total, count| total + count }
end
cluster_replica_count()
click to toggle source
# File lib/kazoo/replica_assigner.rb, line 37
# Adds up the replica counts of every broker tracked in broker_replicas.
#
# @return [Integer] total number of replicas counted; 0 when no broker is tracked
def cluster_replica_count
  broker_replicas.each_value.reduce(0) { |total, count| total + count }
end
retrieve_initial_counts()
click to toggle source
# File lib/kazoo/replica_assigner.rb, line 18
# Builds the initial per-broker tallies from the current state of the cluster.
#
# Every broker in @cluster.brokers starts at zero leaders and zero replicas.
# Each partition then adds one leader to its preferred leader and one replica
# to every broker in its replica list.
#
# Only brokers present in @cluster.brokers are counted. A partition may refer
# to a broker that is not in that collection (or to nil) -- presumably a
# broker that is currently unavailable; TODO confirm. Such references are
# skipped: they have no counter to increment, and incrementing a missing
# entry would raise NoMethodError on nil.
def retrieve_initial_counts
  @broker_leaders = {}
  @broker_replicas = {}

  @cluster.brokers.each do |_id, broker|
    @broker_leaders[broker] = 0
    @broker_replicas[broker] = 0
  end

  @cluster.partitions.each do |partition|
    leader = partition.preferred_leader
    @broker_leaders[leader] += 1 if @broker_leaders.key?(leader)

    partition.replicas.each do |broker|
      @broker_replicas[broker] += 1 if @broker_replicas.key?(broker)
    end
  end
end