class Kazoo::Cluster

Kazoo::Cluster represents a full Kafka cluster, based on how it is registered in Zookeeper. It allows you the inspect the brokers of the cluster, the topics and partition metadata, and the consumergroups that are registered against the cluster.

Attributes

zookeeper[R]

Public Class Methods

new(zookeeper) click to toggle source
# File lib/kazoo/cluster.rb, line 9
def initialize(zookeeper)
  @zookeeper = zookeeper
  @zk_mutex, @brokers_mutex, @topics_mutex = Mutex.new, Mutex.new, Mutex.new
end

Public Instance Methods

brokers() click to toggle source

Returns a hash of all the brokers in the

# File lib/kazoo/cluster.rb, line 22
def brokers
  @brokers_mutex.synchronize do
    @brokers ||= begin
      brokers = zk.get_children(path: "/brokers/ids")

      if brokers.fetch(:rc) != Zookeeper::Constants::ZOK
        raise NoClusterRegistered, "No Kafka cluster registered on this Zookeeper location."
      end

      result, mutex = {}, Mutex.new
      threads = brokers.fetch(:children).map do |id|
        Thread.new do
          Thread.abort_on_exception = true
          broker_info = zk.get(path: "/brokers/ids/#{id}")
          raise Kazoo::Error, "Failed to retrieve broker info. Error code: #{broker_info.fetch(:rc)}" unless broker_info.fetch(:rc) == Zookeeper::Constants::ZOK

          broker = Kazoo::Broker.from_json(self, id, JSON.parse(broker_info.fetch(:data)))
          mutex.synchronize { result[id.to_i] = broker }
        end
      end
      threads.each(&:join)
      result
    end
  end
end
close() click to toggle source

Closes the zookeeper connection and clears all the local caches.

# File lib/kazoo/cluster.rb, line 121
def close
  zk.close
  @zk = nil
  reset_metadata
end
consumergroup(name) click to toggle source

Returns a Kazoo::Consumergroup instance for a given consumer name.

Note that this doesn't register a new consumer group in Zookeeper; you wil have to call Kazoo::Consumergroup.create to do that.

# File lib/kazoo/cluster.rb, line 60
def consumergroup(name)
  Kazoo::Consumergroup.new(self, name)
end
consumergroups() click to toggle source

Returns a list of consumer groups that are registered against the Kafka cluster.

# File lib/kazoo/cluster.rb, line 49
def consumergroups
  @consumergroups ||= begin
    consumers = zk.get_children(path: "/consumers")
    consumers.fetch(:children).map { |name| Kazoo::Consumergroup.new(self, name) }
  end
end
create_topic(name, partitions: nil, replication_factor: nil, config: nil) click to toggle source

Creates a topic on the Kafka cluster, with the provided number of partitions and replication factor.

# File lib/kazoo/cluster.rb, line 82
def create_topic(name, partitions: nil, replication_factor: nil, config: nil)
  raise ArgumentError, "partitions must be a positive integer" if Integer(partitions) <= 0
  raise ArgumentError, "replication_factor must be a positive integer" if Integer(replication_factor) <= 0

  Kazoo::Topic.create(self, name, partitions: Integer(partitions), replication_factor: Integer(replication_factor), config: config)
end
partitions() click to toggle source

Returns a list of all partitions hosted by the cluster

# File lib/kazoo/cluster.rb, line 90
def partitions
  topics.values.flat_map(&:partitions)
end
preferred_leader_election(partitions: nil) click to toggle source

Triggers a preferred leader elections for the provided list of partitions. If no list of partitions is provided, the preferred leader will be elected for all partitions in the cluster.

# File lib/kazoo/cluster.rb, line 107
def preferred_leader_election(partitions: nil)
  partitions = self.partitions if partitions.nil?
  result = zk.create(path: "/admin/preferred_replica_election", data: JSON.generate(version: 1, partitions: partitions))
  case result.fetch(:rc)
  when Zookeeper::Constants::ZOK
    return true
  when Zookeeper::Constants::ZNODEEXISTS
    raise Kazoo::Error, "Another preferred leader election is still in progress"
  else
    raise Kazoo::Error, "Failed to start preferred leadership election. Result code: #{result.fetch(:rc)}"
  end
end
reset_metadata() click to toggle source

Resets the locally cached list of brokers and topics, which will mean they will be fetched freshly from Zookeeper the next time they are requested.

# File lib/kazoo/cluster.rb, line 96
def reset_metadata
  @topics, @brokers, @consumergroups = nil, nil, nil
end
topic(name) click to toggle source

Returns a Kazoo::Topic for a given topic name.

# File lib/kazoo/cluster.rb, line 76
def topic(name)
  Kazoo::Topic.new(self, name)
end
topics(preload: Kazoo::Topic::DEFAULT_PRELOAD_METHODS) click to toggle source

Returns a hash of all the topics in the Kafka cluster, indexed by the topic name.

# File lib/kazoo/cluster.rb, line 65
def topics(preload: Kazoo::Topic::DEFAULT_PRELOAD_METHODS)
  @topics_mutex.synchronize do
    @topics ||= begin
      topics = zk.get_children(path: "/brokers/topics")
      raise Kazoo::Error, "Failed to list topics. Error code: #{topics.fetch(:rc)}" unless topics.fetch(:rc) == Zookeeper::Constants::ZOK
      preload_topics_from_names(topics.fetch(:children), preload: preload)
    end
  end
end
under_replicated?() click to toggle source

Returns true if any of the partitions hosted by the cluster

# File lib/kazoo/cluster.rb, line 101
def under_replicated?
  partitions.any?(&:under_replicated?)
end
zk() click to toggle source

Returns a zookeeper connection

# File lib/kazoo/cluster.rb, line 15
def zk
  @zk_mutex.synchronize do
    @zk ||= Zookeeper.new(zookeeper)
  end
end

Protected Instance Methods

recursive_create(path: nil) click to toggle source

Recursively creates a node in Zookeeper, by recusrively trying to create its parent if it doesn not yet exist.

# File lib/kazoo/cluster.rb, line 131
def recursive_create(path: nil)
  raise ArgumentError, "path is a required argument" if path.nil?

  result = zk.stat(path: path)
  case result.fetch(:rc)
  when Zookeeper::Constants::ZOK
    return
  when Zookeeper::Constants::ZNONODE
    recursive_create(path: File.dirname(path))
    result = zk.create(path: path)

    case result.fetch(:rc)
    when Zookeeper::Constants::ZOK, Zookeeper::Constants::ZNODEEXISTS
      return
    else
      raise Kazoo::Error, "Failed to create node #{path}. Result code: #{result.fetch(:rc)}"
    end
  else
    raise Kazoo::Error, "Failed to create node #{path}. Result code: #{result.fetch(:rc)}"
  end
end
recursive_delete(path: nil) click to toggle source

Deletes a node and all of its children from Zookeeper.

# File lib/kazoo/cluster.rb, line 154
def recursive_delete(path: nil)
  raise ArgumentError, "path is a required argument" if path.nil?

  result = zk.get_children(path: path)
  raise Kazoo::Error, "Failed to list children of #{path} to delete them. Result code: #{result.fetch(:rc)}" if result.fetch(:rc) != Zookeeper::Constants::ZOK

  threads = result.fetch(:children).map do |name|
    Thread.new do
      Thread.abort_on_exception = true
      recursive_delete(path: File.join(path, name))
    end
  end
  threads.each(&:join)

  result = zk.delete(path: path)
  raise Kazoo::Error, "Failed to delete node #{path}. Result code: #{result.fetch(:rc)}" if result.fetch(:rc) != Zookeeper::Constants::ZOK
end

Private Instance Methods

preload_topics_from_names(names, preload: Kazoo::Topic::DEFAULT_PRELOAD_METHODS) click to toggle source
# File lib/kazoo/cluster.rb, line 174
def preload_topics_from_names(names, preload: Kazoo::Topic::DEFAULT_PRELOAD_METHODS)
  result, mutex = {}, Mutex.new
  threads = names.map do |name|
    Thread.new do
      Thread.abort_on_exception = true
      topic = topic(name)
      (preload & Kazoo::Topic::ALL_PRELOAD_METHODS).each { |method| topic.send(method) }
      mutex.synchronize { result[name] = topic }
    end
  end
  threads.each(&:join)
  result
end