class Kazoo::Partition

Attributes

id[R]
replicas[R]
topic[R]

Public Class Methods

new(topic, id, replicas: nil) click to toggle source
# File lib/kazoo/partition.rb, line 5
def initialize(topic, id, replicas: nil)
  @topic, @id, @replicas = topic, id, replicas
  @mutex = Mutex.new
end

Public Instance Methods

==(other)
Alias for: eql?
cluster() click to toggle source
# File lib/kazoo/partition.rb, line 10
def cluster
  topic.cluster
end
eql?(other) click to toggle source
# File lib/kazoo/partition.rb, line 61
def eql?(other)
  other.kind_of?(Kazoo::Partition) && topic == other.topic && id == other.id
end
Also aliased as: ==
hash() click to toggle source
# File lib/kazoo/partition.rb, line 67
def hash
  [topic, id].hash
end
inspect() click to toggle source
# File lib/kazoo/partition.rb, line 53
def inspect
  "#<Kazoo::Partition #{topic.name}/#{id}>"
end
isr() click to toggle source
# File lib/kazoo/partition.rb, line 29
def isr
  @mutex.synchronize do
    refresh_state
    @isr
  end
end
key() click to toggle source
# File lib/kazoo/partition.rb, line 57
def key
  "#{topic.name}/#{id}"
end
leader() click to toggle source
# File lib/kazoo/partition.rb, line 22
def leader
  @mutex.synchronize do
    refresh_state
    @leader
  end
end
preferred_leader() click to toggle source
# File lib/kazoo/partition.rb, line 18
def preferred_leader
  @replicas.first
end
replication_factor() click to toggle source
# File lib/kazoo/partition.rb, line 14
def replication_factor
  replicas.length
end
to_json(generator) click to toggle source
# File lib/kazoo/partition.rb, line 84
def to_json(generator)
  generator.generate(topic: topic.name, partition: id)
end
under_replicated?() click to toggle source
# File lib/kazoo/partition.rb, line 36
def under_replicated?
  isr.length < replication_factor
end
valid?() click to toggle source
# File lib/kazoo/partition.rb, line 47
def valid?
  validate
rescue Kazoo::ValidationError
  false
end
validate() click to toggle source
# File lib/kazoo/partition.rb, line 40
def validate
  raise Kazoo::ValidationError, "No replicas defined for #{topic.name}/#{id}" if replicas.length == 0
  raise Kazoo::ValidationError, "The replicas of #{topic.name}/#{id} should be assigned to different brokers" if replicas.length > replicas.uniq.length

  true
end
wait_for_leader() click to toggle source
# File lib/kazoo/partition.rb, line 71
def wait_for_leader
  current_leader = nil
  while current_leader.nil?
    current_leader = begin
      leader
    rescue Kazoo::Error
      nil
    end

    sleep(0.1) if current_leader.nil?
  end
end

Protected Instance Methods

refresh_state() click to toggle source
# File lib/kazoo/partition.rb, line 90
def refresh_state
  state_result = cluster.zk.get(path: "/brokers/topics/#{topic.name}/partitions/#{id}/state")
  raise Kazoo::Error, "Failed to get partition state. Error code: #{state_result.fetch(:rc)}" unless state_result.fetch(:rc) == Zookeeper::Constants::ZOK

  set_state_from_json(state_result.fetch(:data))
end
set_state_from_json(json_payload) click to toggle source
# File lib/kazoo/partition.rb, line 97
def set_state_from_json(json_payload)
  json = JSON.parse(json_payload)
  raise Kazoo::VersionNotSupported unless json.fetch('version') == 1

  begin
    @leader = cluster.brokers.fetch(json.fetch('leader'))
  rescue KeyError
    raise Kazoo::Error, "Topic #{topic.name}:#{id} has unknown leader! Brokers: '#{cluster.brokers.keys.join(',')}'; 'Payload: '#{json_payload.inspect}'"
  end

  @isr = json.fetch('isr').map do |r|
    begin
      cluster.brokers.fetch(r)
    rescue KeyError
      raise Kazoo::Error, "Broker #{r} not in cluster but in ISR list #{json.fetch('isr').inspect}?!"
    end
  end
end