class Kazoo::Broker

Kazoo::Broker represents a Kafka broker in a Kafka cluster.

Attributes

cluster[R]
host[R]
id[R]
jmx_port[R]
port[R]

Public Class Methods

from_json(cluster, id, json) click to toggle source

Instantiates a Kazoo::Broker instance based on the Broker metadata that is stored in Zookeeper under `/brokers/<id>`. TODO: add support for endpoints in Kafka 0.9+

# File lib/kazoo/broker.rb, line 83
# Builds a broker from the metadata hash registered for it in Zookeeper.
#
# cluster - the cluster object the broker belongs to; passed through to +new+.
# id      - the broker id; coerced with +to_i+.
# json    - the parsed broker metadata. Must contain 'version', 'host' and
#           'port'; 'jmx_port' is optional and defaults to nil.
#
# Raises Kazoo::VersionNotSupported when 'version' is not 1, 2, 3 or 4, and
# KeyError (from Hash#fetch) when a required key is missing.
def self.from_json(cluster, id, json)
  version = json.fetch('version')
  raise Kazoo::VersionNotSupported unless [1, 2, 3, 4].include?(version)

  new(cluster, id.to_i, json.fetch('host'), json.fetch('port'), jmx_port: json.fetch('jmx_port', nil))
end
new(cluster, id, host, port, jmx_port: nil) click to toggle source
# File lib/kazoo/broker.rb, line 7
# Stores the broker's identity and connection details.
#
# cluster  - the cluster object this broker belongs to.
# id       - the broker id.
# host     - the broker's host.
# port     - the broker's port.
# jmx_port - optional JMX port (defaults to nil).
def initialize(cluster, id, host, port, jmx_port: nil)
  @cluster = cluster
  @id = id
  @host = host
  @port = port
  @jmx_port = jmx_port
end

Public Instance Methods

==(other)
Alias for: eql?
addr() click to toggle source

Returns the address of this broker, i.e. the hostname plus the port to connect to.

# File lib/kazoo/broker.rb, line 62
# Returns the "host:port" string for this broker.
def addr
  [host, port].join(':')
end
critical?(replicas: 1) click to toggle source

Returns whether this broker is currently considered critical.

A broker is considered critical if it is the only in-sync replica of any of the partitions it hosts. This means that if this broker were to go down, the partition would become unavailable for writes, and may also lose data depending on the configuration and settings.

# File lib/kazoo/broker.rb, line 47
# Returns whether this broker is currently considered critical.
#
# For every partition returned by +replicated_partitions+, the in-sync
# replica list (+partition.isr+) is inspected with this broker removed. The
# broker is critical when, for at least one partition, fewer than +replicas+
# other in-sync replicas remain.
#
# replicas - minimum number of *other* in-sync replicas each partition must
#            have; anything accepted by Kernel#Integer (defaults to 1).
#
# Returns true or false. Raises ArgumentError/TypeError when +replicas+ cannot
# be converted to an Integer.
def critical?(replicas: 1)
  # Convert once, in the calling thread, so a bad argument fails here rather
  # than inside a worker thread.
  required = Integer(replicas)

  # One thread per partition so the ISR lookups can run concurrently.
  checks = replicated_partitions.map do |partition|
    Thread.new do
      # Scope the flag to this worker only; assigning Thread.abort_on_exception
      # would permanently change the setting for the whole process.
      Thread.current.abort_on_exception = true
      others_in_sync = partition.isr.reject { |replica| replica == self }
      others_in_sync.length < required
    end
  end

  # map (rather than any? directly on the threads) so every worker is joined
  # before returning.
  checks.map(&:value).any?
end
eql?(other) click to toggle source
# File lib/kazoo/broker.rb, line 66
# Two brokers are equal when +other+ is a Kazoo::Broker with the same cluster
# and the same id.
def eql?(other)
  return false unless other.is_a?(Kazoo::Broker)

  other.cluster == cluster && other.id == id
end
Also aliased as: ==
hash() click to toggle source
# File lib/kazoo/broker.rb, line 72
# Hash code derived from the cluster and id, the same two attributes that
# eql? compares.
def hash
  [cluster, id].hash
end
inspect() click to toggle source
# File lib/kazoo/broker.rb, line 76
def inspect
  "#<Kazoo::Broker id=#{id} addr=#{addr}>"
end
led_partitions() click to toggle source

Returns a list of all partitions that are currently led by this broker.

# File lib/kazoo/broker.rb, line 14
# Returns a list of all partitions that are currently led by this broker.
#
# Every partition in +cluster.partitions+ is checked in its own thread
# (+partition.leader == self+). The returned array keeps the order of
# +cluster.partitions+, regardless of which lookup finishes first.
def led_partitions
  lookups = cluster.partitions.map do |partition|
    Thread.new do
      # Scope the flag to this worker only; assigning Thread.abort_on_exception
      # would permanently change the setting for the whole process.
      Thread.current.abort_on_exception = true
      partition.leader == self ? [partition] : []
    end
  end

  # Thread#value joins each worker in creation order, which keeps the result
  # ordering deterministic and removes the need for a shared array and mutex.
  lookups.flat_map(&:value)
end
replicated_partitions() click to toggle source

Returns a list of all partitions that host a replica on this broker.

# File lib/kazoo/broker.rb, line 28
# Returns a list of all partitions that host a replica on this broker.
#
# Every partition in +cluster.partitions+ is checked in its own thread
# (+partition.replicas.include?(self)+). The returned array keeps the order of
# +cluster.partitions+, regardless of which lookup finishes first.
def replicated_partitions
  lookups = cluster.partitions.map do |partition|
    Thread.new do
      # Scope the flag to this worker only; assigning Thread.abort_on_exception
      # would permanently change the setting for the whole process.
      Thread.current.abort_on_exception = true
      partition.replicas.include?(self) ? [partition] : []
    end
  end

  # Thread#value joins each worker in creation order, which keeps the result
  # ordering deterministic and removes the need for a shared array and mutex.
  lookups.flat_map(&:value)
end