class Kazoo::Topic

Constants

ALL_PRELOAD_METHODS
BLACKLISTED_TOPIC_NAMES
DEFAULT_PRELOAD_METHODS
VALID_TOPIC_NAMES

Attributes

cluster[R]
name[R]

Public Class Methods

create(cluster, name, partitions: nil, replication_factor: nil, config: {}) click to toggle source
# File lib/kazoo/topic.rb, line 250
# Builds a new topic, assigns replicas for each requested partition,
# saves it and returns it.
#
# cluster            - passed to the topic constructor and to Kazoo::ReplicaAssigner.
# name               - the topic name.
# partitions         - number of partitions to append (must respond to #times).
# replication_factor - passed to the replica assigner for every partition.
# config             - passed through to the topic constructor.
#
# Raises Kazoo::Error if the freshly built topic reports that it already exists.
def self.create(cluster, name, partitions: nil, replication_factor: nil, config: {})
  new(cluster, name, config: config, partitions: []).tap do |topic|
    raise Kazoo::Error, "Topic #{name} already exists" if topic.exists?

    assigner = Kazoo::ReplicaAssigner.new(cluster)
    partitions.times do
      topic.append_partition(replicas: assigner.assign(replication_factor))
    end

    topic.save
  end
end
new(cluster, name, config: nil, partitions: nil) click to toggle source
# File lib/kazoo/topic.rb, line 10
# Stores the owning cluster and the topic name, then hands the optional
# partitions and config to their setters.
#
# cluster    - kept in @cluster.
# name       - kept in @name.
# config     - forwarded to #config= (defaults to nil).
# partitions - forwarded to #partitions= (defaults to nil).
def initialize(cluster, name, config: nil, partitions: nil)
  @cluster = cluster
  @name = name

  self.partitions = partitions
  self.config = config
end

Public Instance Methods

==(other)
Alias for: eql?
add_partitions(partitions: nil, replication_factor: nil) click to toggle source
# File lib/kazoo/topic.rb, line 78
# Appends new partitions to an existing topic and writes them to Zookeeper.
#
# partitions         - how many partitions to add; coerced with Integer().
# replication_factor - replicas per new partition; coerced with Integer().
#
# Both values are coerced once and the coerced integers are used throughout,
# so anything accepted by the validation (e.g. the string "2") also works for
# the actual partition loop and replica assignment.
#
# Raises ArgumentError when either coerced value is not positive (Integer()
# itself raises for input it cannot convert).
# Raises Kazoo::TopicNotFound when the topic does not exist.
def add_partitions(partitions: nil, replication_factor: nil)
  partition_count = Integer(partitions)
  raise ArgumentError, "partitions must be a positive integer" if partition_count <= 0

  replica_count = Integer(replication_factor)
  raise ArgumentError, "replication_factor must be a positive integer" if replica_count <= 0

  raise Kazoo::TopicNotFound, "The topic #{name} does not exists!" unless exists?

  replica_assigner = Kazoo::ReplicaAssigner.new(cluster)

  partition_count.times do
    replicas = replica_assigner.assign(replica_count)
    append_partition(replicas: replicas)
  end

  validate
  write_partitions_to_zookeeper
  wait_for_partitions
  cluster.reset_metadata
end
append_partition(**kwargs) click to toggle source
# File lib/kazoo/topic.rb, line 29
# Builds a partition whose index is the current partition count, adds it to
# the partitions list and returns it. Keyword arguments are forwarded to
# #partition unchanged.
def append_partition(**kwargs)
  partition(partitions.length, **kwargs).tap { |created| partitions << created }
end
config() click to toggle source
# File lib/kazoo/topic.rb, line 143
# Returns the topic config, loading it through #load_config_from_zookeeper
# the first time and reusing the stored value afterwards.
def config
  return @config if @config

  @config = load_config_from_zookeeper
end
config=(hash) click to toggle source
# File lib/kazoo/topic.rb, line 147
# Replaces the stored config with a copy of +hash+ whose keys and values are
# converted with #to_s. A nil argument leaves the current config untouched.
def config=(hash)
  return if hash.nil?

  @config = hash.each_with_object({}) do |(key, value), normalized|
    normalized[key.to_s] = value.to_s
  end
end
delete_config(key) click to toggle source
# File lib/kazoo/topic.rb, line 157
# Removes the entry for +key+ (converted with #to_s) from the config and then
# writes the config to Zookeeper. Returns whatever
# #write_config_to_zookeeper returns.
def delete_config(key)
  config_key = key.to_s
  config.delete(config_key)
  write_config_to_zookeeper
end
destroy() click to toggle source
# File lib/kazoo/topic.rb, line 108
# Requests deletion of this topic and parks the calling thread until the
# watcher callback wakes it up.
#
# Steps, in order:
#   1. register a watcher on "/brokers/topics/<name>" via a stat call;
#   2. create the "/admin/delete_topics/<name>" node;
#   3. stop the current thread unless the callback has already completed;
#   4. reset the cluster metadata.
#
# Raises Kazoo::TopicNotFound when the stat call reports ZNONODE.
# Raises Kazoo::Error when the stat call fails otherwise, when the delete
# node already exists, or when creating it returns any other result code.
def destroy
  # Keep a handle on the calling thread so the callback can resume it.
  t = Thread.current
  cb = Zookeeper::Callbacks::WatcherCallback.create do |event|
    case event.type
    when Zookeeper::Constants::ZOO_DELETED_EVENT
      # Only resume the caller if it is currently sleeping (see Thread.stop below).
      t.run if t.status == 'sleep'
    else
      # NOTE(review): this raise executes wherever the callback is invoked,
      # presumably not on the thread that called #destroy — confirm how the
      # Zookeeper client dispatches watcher callbacks.
      raise Kazoo::Error, "Unexpected Zookeeper event: #{event.type}"
    end
  end

  # The stat call both checks that the topic node exists and attaches the watcher.
  result = cluster.zk.stat(path: "/brokers/topics/#{name}", watcher: cb)
  case result.fetch(:rc)
  when Zookeeper::Constants::ZOK
    # continue
  when Zookeeper::Constants::ZNONODE
    raise Kazoo::TopicNotFound, "Topic #{name} does not exist!"
  else
    raise Kazoo::Error, "Failed to monitor topic"
  end


  result = cluster.zk.create(path: "/admin/delete_topics/#{name}")
  case result.fetch(:rc)
  when Zookeeper::Constants::ZOK
    # NOTE(review): if the callback fires between the completed? check and
    # Thread.stop, the caller is not yet 'sleep' and t.run is skipped, which
    # looks like it could leave this thread parked — confirm.
    Thread.stop unless cb.completed?
  when Zookeeper::Constants::ZNODEEXISTS
    raise Kazoo::Error, "The topic #{name} is already marked for deletion!"
  else
    raise Kazoo::Error, "Failed to delete topic #{name}. Error code: #{result.fetch(:rc)}"
  end

  cluster.reset_metadata
end
eql?(other) click to toggle source
# File lib/kazoo/topic.rb, line 47
# Two topics are equal when +other+ is a Kazoo::Topic with the same cluster
# and the same name. Anything that is not a Kazoo::Topic is never equal.
def eql?(other)
  return false unless other.kind_of?(Kazoo::Topic)

  cluster == other.cluster && name == other.name
end
Also aliased as: ==
exists?() click to toggle source
# File lib/kazoo/topic.rb, line 57
# Asks Zookeeper for the stat of "/brokers/topics/<name>" and returns what
# that stat's #exists? reports.
def exists?
  cluster.zk.stat(path: "/brokers/topics/#{name}").fetch(:stat).exists?
end
hash() click to toggle source
# File lib/kazoo/topic.rb, line 53
# Hash code derived from the [cluster, name] pair, the same two attributes
# that #eql? compares.
def hash
  identity = [cluster, name]
  identity.hash
end
inspect() click to toggle source
# File lib/kazoo/topic.rb, line 43
def inspect
  "#<Kazoo::Topic #{name}>"
end
load_config_from_zookeeper() click to toggle source
# File lib/kazoo/topic.rb, line 195
# Reads "/config/topics/<name>" from Zookeeper and returns the parsed config.
#
# Returns an empty hash when the node does not exist (ZNONODE).
# Raises Kazoo::Error for any result code other than ZOK or ZNONODE.
def load_config_from_zookeeper
  response = cluster.zk.get(path: "/config/topics/#{name}")
  rc = response.fetch(:rc)

  return {} if rc == Zookeeper::Constants::ZNONODE
  raise Kazoo::Error, "Failed to retrieve topic config" unless rc == Zookeeper::Constants::ZOK

  # Parsing stores the config on this topic; read it back through #config.
  set_config_from_json(response.fetch(:data)).config
end
load_partitions_from_zookeeper() click to toggle source
# File lib/kazoo/topic.rb, line 189
# Reads "/brokers/topics/<name>" from Zookeeper and returns the parsed
# partitions.
#
# Raises Kazoo::Error, including the result code, when the read does not
# return ZOK.
def load_partitions_from_zookeeper
  response = cluster.zk.get(path: "/brokers/topics/#{name}")
  rc = response.fetch(:rc)
  unless rc == Zookeeper::Constants::ZOK
    raise Kazoo::Error, "Failed to get list of partitions for #{name}. Result code: #{rc}"
  end

  # Parsing stores the partitions on this topic; read them back through #partitions.
  set_partitions_from_json(response.fetch(:data)).partitions
end
partition(index, **kwargs) click to toggle source
# File lib/kazoo/topic.rb, line 25
# Builds a Kazoo::Partition for this topic at the given index. Keyword
# arguments are forwarded to the partition constructor unchanged. The new
# partition is not added to the partitions list.
def partition(index, **kwargs)
  owner = self
  Kazoo::Partition.new(owner, index, **kwargs)
end
partitions() click to toggle source
# File lib/kazoo/topic.rb, line 17
# Returns the partition list, loading it through
# #load_partitions_from_zookeeper the first time and reusing the stored value
# afterwards.
def partitions
  return @partitions if @partitions

  @partitions = load_partitions_from_zookeeper
end
partitions=(ps) click to toggle source
# File lib/kazoo/topic.rb, line 21
# Replaces the stored partition list with +ps+ as given, with no copying or
# validation. Assigning nil makes the next #partitions call load from
# Zookeeper again.
def partitions=(ps)
  @partitions = ps
end
replication_factor() click to toggle source
# File lib/kazoo/topic.rb, line 35
# The smallest replication factor found among this topic's partitions
# (nil when there are no partitions).
def replication_factor
  factors = partitions.map { |part| part.replication_factor }
  factors.min
end
reset_default_config() click to toggle source
# File lib/kazoo/topic.rb, line 162
# Clears every stored config entry by replacing the config with an empty
# hash, then writes that empty config to Zookeeper. Returns whatever
# #write_config_to_zookeeper returns.
def reset_default_config
  @config = {}

  write_config_to_zookeeper
end
save() click to toggle source
# File lib/kazoo/topic.rb, line 97
# Persists a new topic: validates it, writes config and partitions to
# Zookeeper, waits for the partitions and resets the cluster metadata.
#
# Raises Kazoo::Error when the topic already exists; nothing is written in
# that case.
def save
  if exists?
    raise Kazoo::Error, "The topic #{name} already exists!"
  end

  validate
  write_config_to_zookeeper
  write_partitions_to_zookeeper
  wait_for_partitions
  cluster.reset_metadata
end
set_config(key, value) click to toggle source
# File lib/kazoo/topic.rb, line 152
# Stores a single config entry (key and value converted with #to_s) and then
# writes the config to Zookeeper. Returns whatever
# #write_config_to_zookeeper returns.
def set_config(key, value)
  config.store(key.to_s, value.to_s)
  write_config_to_zookeeper
end
set_config_from_json(json_payload) click to toggle source
# File lib/kazoo/topic.rb, line 180
# Parses a JSON config payload and stores its "config" member as this topic's
# config. Returns self so calls can be chained.
#
# Raises Kazoo::VersionNotSupported when the payload's "version" is not 1.
def set_config_from_json(json_payload)
  parsed = JSON.parse(json_payload)
  version = parsed.fetch('version')
  raise Kazoo::VersionNotSupported unless version == 1

  @config = parsed.fetch('config')
  self
end
set_partitions_from_json(json_payload) click to toggle source
# File lib/kazoo/topic.rb, line 167
# Parses a JSON partition payload and replaces this topic's partitions with
# the ones it describes, sorted by partition id. Returns self so calls can be
# chained.
#
# Each entry of the payload's "partitions" member maps a partition id to a
# list of broker ids, which are looked up in cluster.brokers.
#
# Raises Kazoo::VersionNotSupported when the payload's "version" is not 1.
def set_partitions_from_json(json_payload)
  parsed = JSON.parse(json_payload)
  version = parsed.fetch('version')
  raise Kazoo::VersionNotSupported unless version == 1

  @partitions = parsed.fetch('partitions').map do |(partition_id, broker_ids)|
    brokers = broker_ids.map { |broker_id| cluster.brokers[broker_id] }
    partition(partition_id.to_i, replicas: brokers)
  end
  @partitions.sort_by! { |part| part.id }

  self
end
under_replicated?() click to toggle source
# File lib/kazoo/topic.rb, line 39
# True when at least one of this topic's partitions reports itself as
# under-replicated.
def under_replicated?
  partitions.any? { |part| part.under_replicated? }
end
valid?() click to toggle source
# File lib/kazoo/topic.rb, line 72
# Non-raising counterpart of #validate: returns its result when validation
# passes and false when it raises Kazoo::ValidationError. Any other exception
# propagates.
def valid?
  begin
    validate
  rescue Kazoo::ValidationError
    false
  end
end
validate() click to toggle source
# File lib/kazoo/topic.rb, line 62
# Checks the topic name and its partitions, returning true when everything
# passes.
#
# Raises Kazoo::ValidationError when the name does not match
# VALID_TOPIC_NAMES, matches BLACKLISTED_TOPIC_NAMES, is longer than 255
# characters, or when the topic has no partitions. Each partition's own
# #validate is called as well.
def validate
  invalid_name = "#{name} is not a valid topic name"
  raise Kazoo::ValidationError, invalid_name unless VALID_TOPIC_NAMES =~ name
  raise Kazoo::ValidationError, invalid_name if BLACKLISTED_TOPIC_NAMES =~ name
  raise Kazoo::ValidationError, "#{name} is too long" if name.length > 255
  raise Kazoo::ValidationError, "The topic has no partitions defined" if partitions.length.zero?

  partitions.each { |part| part.validate }
  true
end
write_config_to_zookeeper() click to toggle source
# File lib/kazoo/topic.rb, line 227
# Writes the current config (keys and values converted with #to_s) as JSON to
# "/config/topics/<name>", creating the node when it does not exist yet, then
# creates a sequential "/config/changes/config_change_" node carrying the
# topic name and resets the cluster metadata.
#
# Raises Kazoo::Error when the config cannot be written or the change
# notification cannot be created.
def write_config_to_zookeeper
  stringified = config.each_with_object({}) { |(key, value), acc| acc[key.to_s] = value.to_s }
  payload = JSON.generate(version: 1, config: stringified)
  config_path = "/config/topics/#{name}"

  # Update the node in place; fall back to creating it when it is missing.
  response = cluster.zk.set(path: config_path, data: payload)
  rc = response.fetch(:rc)
  if rc == Zookeeper::Constants::ZNONODE
    response = cluster.zk.create(path: config_path, data: payload)
    create_rc = response.fetch(:rc)
    unless create_rc == Zookeeper::Constants::ZOK
      raise Kazoo::Error, "Failed to write topic config to zookeeper. Result code: #{create_rc}"
    end
  elsif rc != Zookeeper::Constants::ZOK
    raise Kazoo::Error, "Failed to write topic config to zookeeper. Result code: #{rc}"
  end

  # Announce the change through a sequential notification node.
  response = cluster.zk.create(path: "/config/changes/config_change_", data: name.inspect, sequence: true)
  unless response.fetch(:rc) == Zookeeper::Constants::ZOK
    raise Kazoo::Error, "Failed to set topic config change notification"
  end

  cluster.reset_metadata
end
write_partitions_to_zookeeper() click to toggle source
# File lib/kazoo/topic.rb, line 209
# Writes the partition assignment as JSON to "/brokers/topics/<name>",
# creating the node when it does not exist yet, then resets the cluster
# metadata.
#
# Raises Kazoo::Error, including the result code, when neither the update nor
# the create succeeds.
def write_partitions_to_zookeeper
  topic_path = "/brokers/topics/#{name}"
  payload = JSON.generate(version: 1, partitions: partitions_as_json)

  # Update the node in place; fall back to creating it when it is missing.
  response = cluster.zk.set(path: topic_path, data: payload)
  rc = response.fetch(:rc)
  if rc == Zookeeper::Constants::ZNONODE
    response = cluster.zk.create(path: topic_path, data: payload)
    create_rc = response.fetch(:rc)
    unless create_rc == Zookeeper::Constants::ZOK
      raise Kazoo::Error, "Failed to write partitions to zookeeper. Result code: #{create_rc}"
    end
  elsif rc != Zookeeper::Constants::ZOK
    raise Kazoo::Error, "Failed to write partitions to zookeeper. Result code: #{rc}"
  end

  cluster.reset_metadata
end

Protected Instance Methods

partitions_as_json() click to toggle source
# File lib/kazoo/topic.rb, line 278
# Builds the hash that is serialized into the topic's Zookeeper node: each
# partition id mapped to the ids of that partition's replicas.
def partitions_as_json
  partitions.each_with_object({}) do |part, assignment|
    assignment[part.id] = part.replicas.map { |replica| replica.id }
  end
end
wait_for_partitions() click to toggle source
# File lib/kazoo/topic.rb, line 267
# Calls #wait_for_leader on every partition, one thread per partition, and
# returns once all of those threads have finished. Returns the array of
# joined threads.
#
# abort_on_exception is enabled on each worker thread only, so a failure in
# any #wait_for_leader call is still raised in the main thread, but the
# process-wide Thread.abort_on_exception setting is left as the caller had it.
def wait_for_partitions
  threads = partitions.map do |partition|
    Thread.new do
      Thread.current.abort_on_exception = true
      partition.wait_for_leader
    end
  end
  threads.each(&:join)
end