class Kafkat::Interface::Zookeeper

Attributes

zk_path[R]

Public Class Methods

new(config) click to toggle source
# File lib/kafkat/interface/zookeeper.rb, line 12
# Builds the interface from a configuration object.
#
# @param config [#zk_path] configuration object; only +zk_path+ is read here.
def initialize(config)
  # Stored so the +zk_path+ reader can hand it to the ZK client constructors.
  @zk_path = config.zk_path
end

Public Instance Methods

get_broker(id) click to toggle source
# File lib/kafkat/interface/zookeeper.rb, line 38
# Looks up a single broker by id.
#
# Reads the broker's node, parses its payload as JSON and builds a Broker
# from the 'host' and 'port' entries.
#
# @param id broker id, used both for the node path and for the Broker
# @return [Broker]
# @raise [NotFoundError] when ZooKeeper reports that the node does not exist
def get_broker(id)
  raw = zk.get(broker_path(id)).first
  info = JSON.parse(raw)
  Broker.new(id, info['host'], info['port'])
rescue ZK::Exceptions::NoNode
  raise NotFoundError
end
get_broker_ids() click to toggle source
# File lib/kafkat/interface/zookeeper.rb, line 16
# Lists the children of the brokers path in ZooKeeper.
#
# @return the value of +zk.children(brokers_path)+, unmodified (entries are
#   not converted to integers here).
def get_broker_ids
  zk.children(brokers_path)
end
get_brokers(ids=nil) click to toggle source
# File lib/kafkat/interface/zookeeper.rb, line 20
# Loads several brokers concurrently, one thread per broker id.
#
# Lookups are best-effort: a broker whose lookup raises a StandardError is
# left out of the result instead of failing the whole call.
#
# @param ids [Array<#to_i>, nil] broker ids to load; when nil, the ids
#   returned by get_broker_ids are used.
# @return [Hash{Integer => Broker}] brokers keyed by integer id, in the same
#   order as +ids+.
def get_brokers(ids=nil)
  ids ||= get_broker_ids

  lookups = ids.map do |id|
    id = id.to_i
    worker = Thread.new do
      begin
        get_broker(id)
      rescue StandardError
        # Deliberate best-effort: an unreachable broker is skipped.
        nil
      end
    end
    [id, worker]
  end

  # Results are gathered on the calling thread, so the Hash is never mutated
  # concurrently and its key order does not depend on thread scheduling.
  lookups.each_with_object({}) do |(id, worker), brokers|
    broker = worker.value
    brokers[id] = broker if broker
  end
end
get_controller() click to toggle source
# File lib/kafkat/interface/zookeeper.rb, line 117
# Resolves the broker referenced by the controller node.
#
# The controller node's payload is parsed as JSON and its 'brokerid' entry
# is passed to get_broker.
#
# @return the result of get_broker for the controller's broker id
# @raise [NotFoundError] when ZooKeeper reports that the node does not exist
def get_controller
  raw = zk.get(controller_path).first
  broker_id = JSON.parse(raw)['brokerid']
  get_broker(broker_id)
rescue ZK::Exceptions::NoNode
  raise NotFoundError
end
get_topic(name) click to toggle source
# File lib/kafkat/interface/zookeeper.rb, line 80
# Loads one topic together with the state of each of its partitions.
#
# The topic node and the list of partition ids are read first; each
# partition's state node is then read on its own thread through the
# connection pool.
#
# @param name topic name, used to build the node paths
# @return [Topic] topic whose partitions are sorted by id
# @raise [NotFoundError] when ZooKeeper reports a missing node, including one
#   raised inside a partition thread (re-raised here when the thread is joined)
def get_topic(name)
  topic_node = topic_path(name)
  partitions_node = topic_partitions_path(name)

  topic_raw = pool.with_connection { |cnx| cnx.get(topic_node).first }
  partition_ids = pool.with_connection { |cnx| cnx.children(partitions_node) }
  assignments = JSON.parse(topic_raw)

  workers = partition_ids.map do |raw_id|
    partition_id = raw_id.to_i

    Thread.new do
      state_node = topic_partition_state_path(name, partition_id)
      state_raw = pool.with_connection { |cnx| cnx.get(state_node).first }
      state = JSON.parse(state_raw)

      Partition.new(
        name,
        partition_id,
        assignments['partitions'][partition_id.to_s],
        state['leader'],
        state['isr']
      )
    end
  end

  # Thread#value joins each worker and re-raises whatever it raised.
  partitions = workers.map(&:value).sort_by(&:id)
  Topic.new(name, partitions)
rescue ZK::Exceptions::NoNode
  raise NotFoundError
end
get_topic_names() click to toggle source
# File lib/kafkat/interface/zookeeper.rb, line 48
# Lists the children of the topics path in ZooKeeper.
#
# @return the value of +zk.children(topics_path)+, unmodified
def get_topic_names
  zk.children(topics_path)
end
get_topics(names=nil) click to toggle source
# File lib/kafkat/interface/zookeeper.rb, line 52
# Loads several topics concurrently, one thread per topic name.
#
# If any lookup raises a StandardError, all collected errors are printed to
# STDERR and the process exits with status 1.
#
# @param names [Array, nil] topic names to load; when nil, the children of
#   the topics path are read through the connection pool.
# @return [Hash] topics keyed by name, in the same order as +names+
def get_topics(names=nil)
  names ||= pool.with_connection { |cnx| cnx.children(topics_path) }

  workers = names.map do |name|
    Thread.new do
      begin
        [name, get_topic(name), nil]
      rescue StandardError => e
        [name, nil, e]
      end
    end
  end

  # Outcomes are gathered on the calling thread, so neither Hash is mutated
  # concurrently and their order does not depend on thread scheduling.
  topics = {}
  error_msgs = {}
  workers.each do |worker|
    name, topic, error = worker.value
    if error
      error_msgs[name] = error
    else
      topics[name] = topic
    end
  end

  unless error_msgs.empty?
    STDERR.print "ERROR: zk cmds failed on get_topics: \n#{error_msgs.values.join("\n")}\n"
    exit 1
  end
  topics
end
write_leader(partition, broker_id) click to toggle source
# File lib/kafkat/interface/zookeeper.rb, line 126
# Rewrites the 'leader' entry of a partition's state node.
#
# The node is read, its JSON payload updated and written back conditionally
# on the version that was read, so a concurrent modification is detected
# instead of being overwritten.
#
# @param partition [#topic_name, #id] partition whose state node is updated
# @param broker_id value stored under the 'leader' key
# @return [nil]
# @raise [ChangedDuringUpdateError] when the node changed between the read
#   and the write
def write_leader(partition, broker_id)
  path = topic_partition_state_path(partition.topic_name, partition.id)
  string, stat = zk.get(path)

  partition_json = JSON.parse(string)
  partition_json['leader'] = broker_id
  new_string = JSON.dump(partition_json)

  begin
    updated = zk.set(path, new_string, version: stat.version)
  rescue ZK::Exceptions::BadVersion
    # The zk client reports a version conflict by raising, not by returning
    # a falsy value, so translate it into this interface's own error.
    raise ChangedDuringUpdateError
  end

  # Kept for clients that report a failed write with a falsy return value.
  raise ChangedDuringUpdateError unless updated
end

Private Instance Methods

broker_path(id) click to toggle source
# File lib/kafkat/interface/zookeeper.rb, line 153
# Path of the node for a single broker.
#
# @param id broker id; its string form becomes the last path segment
# @return [String] "/brokers/ids/<id>"
def broker_path(id)
  format('/brokers/ids/%s', id)
end
brokers_path() click to toggle source
# File lib/kafkat/interface/zookeeper.rb, line 149
# Path of the node whose children are listed to obtain broker ids.
#
# @return [String]
def brokers_path
  '/brokers/ids'
end
controller_path() click to toggle source
# File lib/kafkat/interface/zookeeper.rb, line 173
# Path of the controller node.
#
# @return [String]
def controller_path
  '/controller'
end
pool() click to toggle source
# File lib/kafkat/interface/zookeeper.rb, line 141
# Memoized ZooKeeper connection pool for +zk_path+.
#
# The pool is created on first use; the options below are passed to
# ZK.new_pool as-is (their exact meaning is defined by the zk library).
#
# @return the object returned by ZK.new_pool
def pool
  @pool ||= ZK.new_pool(
    zk_path,
    min_clients: 10,
    max_clients: 300,
    timeout: 1
  )
end
topic_partition_state_path(name, id) click to toggle source
# File lib/kafkat/interface/zookeeper.rb, line 169
# Path of the state node for one partition of a topic.
#
# @param name topic name
# @param id partition id
# @return [String] "/brokers/topics/<name>/partitions/<id>/state"
def topic_partition_state_path(name, id)
  format('/brokers/topics/%s/partitions/%s/state', name, id)
end
topic_partitions_path(name) click to toggle source
# File lib/kafkat/interface/zookeeper.rb, line 165
# Path of the node whose children are a topic's partition ids.
#
# @param name topic name
# @return [String] "/brokers/topics/<name>/partitions"
def topic_partitions_path(name)
  format('/brokers/topics/%s/partitions', name)
end
topic_path(name) click to toggle source
# File lib/kafkat/interface/zookeeper.rb, line 161
# Path of the node for a single topic.
#
# @param name topic name; its string form becomes the last path segment
# @return [String] "/brokers/topics/<name>"
def topic_path(name)
  format('/brokers/topics/%s', name)
end
topics_path() click to toggle source
# File lib/kafkat/interface/zookeeper.rb, line 157
# Path of the node whose children are listed to obtain topic names.
#
# @return [String]
def topics_path
  '/brokers/topics'
end
zk() click to toggle source
# File lib/kafkat/interface/zookeeper.rb, line 145
# Memoized single ZooKeeper client for +zk_path+, created on first use.
#
# @return the object returned by ZK.new
def zk
  return @zk if @zk

  @zk = ZK.new(zk_path)
end