class Kazoo::Partition
Attributes
id[R]
replicas[R]
topic[R]
Public Class Methods
new(topic, id, replicas: nil)
click to toggle source
# File lib/kazoo/partition.rb, line 5
#
# Creates a partition handle.
#
# topic    - the object stored as this partition's topic.
# id       - the partition id within that topic.
# replicas - optional list of replicas; stays nil when not supplied.
#
# A fresh Mutex is also created; the +isr+ and +leader+ readers hold it
# while they refresh the partition state.
def initialize(topic, id, replicas: nil)
  @topic = topic
  @id = id
  @replicas = replicas
  @mutex = Mutex.new
end
Public Instance Methods
cluster()
click to toggle source
# File lib/kazoo/partition.rb, line 10
#
# Returns the cluster of this partition by delegating to its topic.
def cluster
  topic.cluster
end
eql?(other)
click to toggle source
# File lib/kazoo/partition.rb, line 61
#
# Two partitions are equal when +other+ is a Kazoo::Partition with the same
# topic and the same id. Anything that is not a Kazoo::Partition is unequal.
def eql?(other)
  return false unless other.kind_of?(Kazoo::Partition)

  topic == other.topic && id == other.id
end
Also aliased as: ==
hash()
click to toggle source
# File lib/kazoo/partition.rb, line 67
#
# Hash code derived from the topic and the id, the same two fields that
# +eql?+ compares.
def hash
  identity = [topic, id]
  identity.hash
end
inspect()
click to toggle source
# File lib/kazoo/partition.rb, line 53
#
# Short human-readable representation: the class name followed by
# "<topic name>/<id>".
def inspect
  "#<Kazoo::Partition #{topic.name}/#{id}>"
end
isr()
click to toggle source
# File lib/kazoo/partition.rb, line 29
#
# Returns the in-sync replica list (@isr).
#
# Every call runs +refresh_state+ first, while holding @mutex, so the value
# returned is the one set by that refresh. Errors raised by +refresh_state+
# propagate to the caller.
def isr
  @mutex.synchronize do
    refresh_state
    @isr
  end
end
key()
click to toggle source
# File lib/kazoo/partition.rb, line 57
#
# String key for this partition in the form "<topic name>/<id>".
def key
  "#{topic.name}/#{id}"
end
leader()
click to toggle source
# File lib/kazoo/partition.rb, line 22
#
# Returns the current leader (@leader).
#
# Every call runs +refresh_state+ first, while holding @mutex, so the value
# returned is the one set by that refresh. Errors raised by +refresh_state+
# propagate to the caller.
def leader
  @mutex.synchronize do
    refresh_state
    @leader
  end
end
preferred_leader()
click to toggle source
# File lib/kazoo/partition.rb, line 18
#
# Returns the first entry of the replica list given at construction time.
def preferred_leader
  @replicas.first
end
replication_factor()
click to toggle source
# File lib/kazoo/partition.rb, line 14
#
# Number of replicas assigned to this partition (length of +replicas+).
def replication_factor
  replicas.length
end
to_json(generator)
click to toggle source
# File lib/kazoo/partition.rb, line 84
#
# Serializes the partition as a two-field structure: the topic name under
# +topic+ and the partition id under +partition+.
#
# generator - object whose +generate+ method produces the output.
def to_json(generator)
  generator.generate(topic: topic.name, partition: id)
end
under_replicated?()
click to toggle source
# File lib/kazoo/partition.rb, line 36
#
# True when the ISR list is shorter than the replication factor.
# Reading +isr+ triggers a state refresh, so this call can raise whatever
# that refresh raises.
def under_replicated?
  in_sync_count = isr.length
  in_sync_count < replication_factor
end
valid?()
click to toggle source
# File lib/kazoo/partition.rb, line 47
#
# Non-raising variant of +validate+: returns what +validate+ returns, or
# false when it raises Kazoo::ValidationError. Other errors propagate.
def valid?
  validate
rescue Kazoo::ValidationError
  false
end
validate()
click to toggle source
# File lib/kazoo/partition.rb, line 40
#
# Checks the replica assignment of this partition.
#
# Returns true when at least one replica is defined and no replica appears
# more than once.
#
# Raises Kazoo::ValidationError when there are no replicas, or when the same
# replica is listed more than once.
def validate
  # The constructor defaults +replicas+ to nil; treat that like an empty
  # assignment so callers get a ValidationError instead of a NoMethodError.
  assigned = replicas || []

  raise Kazoo::ValidationError, "No replicas defined for #{topic.name}/#{id}" if assigned.empty?

  if assigned.length > assigned.uniq.length
    raise Kazoo::ValidationError, "The replicas of #{topic.name}/#{id} should be assigned to different brokers"
  end

  true
end
wait_for_leader()
click to toggle source
# File lib/kazoo/partition.rb, line 71
#
# Blocks until +leader+ returns a non-nil value.
#
# A Kazoo::Error raised by +leader+ is treated as "no leader yet"; the
# method then sleeps for 0.1 seconds and tries again. There is no upper
# bound on the number of attempts. Returns nil.
def wait_for_leader
  elected = nil

  while elected.nil?
    begin
      elected = leader
    rescue Kazoo::Error
      # Lookup failed; +elected+ is still nil, so fall through to the sleep.
    end

    sleep(0.1) if elected.nil?
  end
end
Protected Instance Methods
refresh_state()
click to toggle source
# File lib/kazoo/partition.rb, line 90
#
# Fetches this partition's state node through the cluster's +zk+ client and
# hands its data to +set_state_from_json+.
#
# Raises Kazoo::Error when the result code of the read is not
# Zookeeper::Constants::ZOK.
def refresh_state
  state_path = "/brokers/topics/#{topic.name}/partitions/#{id}/state"
  response = cluster.zk.get(path: state_path)

  result_code = response.fetch(:rc)
  unless result_code == Zookeeper::Constants::ZOK
    raise Kazoo::Error, "Failed to get partition state. Error code: #{result_code}"
  end

  set_state_from_json(response.fetch(:data))
end
set_state_from_json(json_payload)
click to toggle source
# File lib/kazoo/partition.rb, line 97
#
# Parses a partition state payload and updates @leader and @isr from it.
#
# json_payload - JSON string with a 'version', a 'leader' broker id and an
#                'isr' list of broker ids.
#
# Both values are resolved against +cluster.brokers+ before either instance
# variable is assigned, so a failure never leaves a new leader paired with
# a stale ISR list.
#
# Returns the new ISR list.
#
# Raises Kazoo::VersionNotSupported unless the payload's version is 1.
# Raises Kazoo::Error when the leader (or the 'leader' key) is missing, or
# when an ISR entry is not a known broker.
# A missing 'version' or 'isr' key surfaces as KeyError.
def set_state_from_json(json_payload)
  json = JSON.parse(json_payload)
  raise Kazoo::VersionNotSupported unless json.fetch('version') == 1

  # Read the broker map once so leader and ISR are resolved against the
  # same view of the cluster.
  brokers = cluster.brokers

  new_leader = begin
    brokers.fetch(json.fetch('leader'))
  rescue KeyError
    raise Kazoo::Error, "Topic #{topic.name}:#{id} has unknown leader! Brokers: '#{brokers.keys.join(',')}'; Payload: '#{json_payload.inspect}'"
  end

  isr_ids = json.fetch('isr')
  new_isr = isr_ids.map do |broker_id|
    begin
      brokers.fetch(broker_id)
    rescue KeyError
      raise Kazoo::Error, "Broker #{broker_id} not in cluster but in ISR list #{isr_ids.inspect}?!"
    end
  end

  # Commit only after everything resolved successfully.
  @leader = new_leader
  @isr = new_isr
end