class Kafka::Cluster

A cluster represents the state of a Kafka cluster. It needs to be initialized with a non-empty list of seed brokers. The first seed broker that the cluster can connect to will be asked for the cluster metadata, which allows the cluster to map topic partitions to the current leader for those partitions.

Public Class Methods

new(seed_brokers:, broker_pool:, logger:, resolve_seed_brokers: false) click to toggle source

Initializes a Cluster with a set of seed brokers.

The cluster will try to fetch cluster metadata from one of the brokers.

@param seed_brokers [Array<URI>]
@param broker_pool [Kafka::BrokerPool]
@param logger [Logger]
@param resolve_seed_brokers [Boolean] See {Kafka::Client#initialize}

# File lib/kafka/cluster.rb, line 23
# Initializes a Cluster with a non-empty set of seed brokers.
#
# Metadata is fetched lazily from the first seed broker that responds.
#
# @param seed_brokers [Array<URI>]
# @param broker_pool [Kafka::BrokerPool]
# @param logger [Logger]
# @param resolve_seed_brokers [Boolean] see {Kafka::Client#initialize}
# @raise [ArgumentError] if no seed brokers are provided
def initialize(seed_brokers:, broker_pool:, logger:, resolve_seed_brokers: false)
  raise ArgumentError, "At least one seed broker must be configured" if seed_brokers.empty?

  @logger = TaggedLogger.new(logger)
  @seed_brokers = seed_brokers
  @broker_pool = broker_pool
  @resolve_seed_brokers = resolve_seed_brokers

  # No metadata yet; the first operation that needs it will fetch it.
  @cluster_info = nil
  @stale = true

  # The set of topics we need metadata for. When empty, metadata for
  # *all* topics will be fetched.
  @target_topics = Set.new
end

Public Instance Methods

add_target_topics(topics) click to toggle source

Adds a list of topics to the target list. Only the topics on this list will be queried for metadata.

@param topics [Array<String>]
@return [nil]

# File lib/kafka/cluster.rb, line 45
# Adds a list of topics to the target list. Only topics on this list
# will be queried for metadata. Triggers a metadata refresh when
# genuinely new topics are added.
#
# @param topics [Array<String>]
# @raise [ArgumentError] if any topic is nil or empty
# @return [nil]
def add_target_topics(topics)
  topics = Set.new(topics)
  unless topics.subset?(@target_topics)
    new_topics = topics - @target_topics

    unless new_topics.empty?
      # Use `||` rather than `or`: `or` has very low precedence and is
      # meant for control flow, not boolean expressions.
      if new_topics.any? { |topic| topic.nil? || topic.empty? }
        raise ArgumentError, "Topic must not be nil or empty"
      end

      @logger.info "New topics added to target list: #{new_topics.to_a.join(', ')}"

      @target_topics.merge(new_topics)

      refresh_metadata!
    end
  end
end
alter_configs(broker_id, configs = []) click to toggle source
# File lib/kafka/cluster.rb, line 166
# Alters cluster-level (broker) configuration entries.
#
# @param broker_id [Integer] node id of the broker to reconfigure
# @param configs [Array] config entries to apply
# @return [nil]
def alter_configs(broker_id, configs = [])
  # A cluster-level resource is addressed by the broker's node id.
  resources = [[Kafka::Protocol::RESOURCE_TYPE_CLUSTER, broker_id.to_s, configs]]

  broker_info = cluster_info.brokers.find { |b| b.node_id == broker_id }
  target = @broker_pool.connect(broker_info.host, broker_info.port, node_id: broker_info.node_id)

  response = target.alter_configs(resources: resources)

  response.resources.each do |resource|
    Protocol.handle_error(resource.error_code, resource.error_message)
  end

  nil
end
alter_topic(name, configs = {}) click to toggle source
# File lib/kafka/cluster.rb, line 271
# Alters configuration entries for a topic via the controller broker.
#
# @param name [String] topic name
# @param configs [Hash] config entries to apply
# @return [nil]
def alter_topic(name, configs = {})
  resources = [[Kafka::Protocol::RESOURCE_TYPE_TOPIC, name, configs]]

  controller = controller_broker

  @logger.info "Altering the config for topic `#{name}` using controller broker #{controller}"

  response = controller.alter_configs(resources: resources)

  response.resources.each do |resource|
    Protocol.handle_error(resource.error_code, resource.error_message)
  end

  nil
end
api_info(api_key) click to toggle source
# File lib/kafka/cluster.rb, line 64
# Looks up the version-support entry for a single API.
#
# @param api_key [Integer]
# @return [Object, nil] the matching API entry, or nil if the cluster
#   does not advertise that API.
def api_info(api_key)
  apis.find { |api| api_key == api.api_key }
end
apis() click to toggle source
# File lib/kafka/cluster.rb, line 79
# Returns the list of APIs (and supported versions) advertised by the
# cluster, asking an arbitrary broker once and caching the answer.
def apis
  return @apis if @apis

  response = random_broker.api_versions

  Protocol.handle_error(response.error_code)

  @apis = response.apis
end
clear_target_topics() click to toggle source

Clears the list of target topics.

@see add_target_topics
@return [nil]

# File lib/kafka/cluster.rb, line 94
# Clears the list of target topics and refreshes metadata, so that
# metadata for *all* topics will be fetched from now on.
#
# @see add_target_topics
def clear_target_topics
  @target_topics.clear

  refresh_metadata!
end
cluster_info() click to toggle source
# File lib/kafka/cluster.rb, line 404
# Returns the cached cluster metadata, fetching it on first access.
def cluster_info
  @cluster_info = fetch_cluster_info if @cluster_info.nil?
  @cluster_info
end
create_partitions_for(name, num_partitions:, timeout:) click to toggle source
# File lib/kafka/cluster.rb, line 310
# Adds partitions to an existing topic via the controller broker.
#
# @param name [String] topic name
# @param num_partitions [Integer] desired total partition count
# @param timeout [Integer] broker-side timeout
def create_partitions_for(name, num_partitions:, timeout:)
  # The protocol expects [topic, partition count, assignments] triples;
  # a nil assignment lets the broker pick placement.
  request_topics = [[name, num_partitions, nil]]

  controller = controller_broker

  @logger.info "Creating #{num_partitions} partition(s) for topic `#{name}` using controller broker #{controller}"

  response = controller.create_partitions(topics: request_topics, timeout: timeout)

  response.errors.each do |_topic, error_code, error_message|
    Protocol.handle_error(error_code, error_message)
  end
  mark_as_stale!

  @logger.info "Topic `#{name}` was updated"
end
create_topic(name, num_partitions:, replication_factor:, timeout:, config:) click to toggle source
# File lib/kafka/cluster.rb, line 192
# Creates a topic and waits until all of its partitions have leaders.
#
# @param name [String]
# @param num_partitions [Integer]
# @param replication_factor [Integer]
# @param timeout [Integer] broker-side timeout
# @param config [Hash] topic-level configuration entries
# @return [nil]
def create_topic(name, num_partitions:, replication_factor:, timeout:, config:)
  options = {
    topics: {
      name => {
        num_partitions: num_partitions,
        replication_factor: replication_factor,
        config: config,
      }
    },
    timeout: timeout,
  }

  broker = controller_broker

  @logger.info "Creating topic `#{name}` using controller broker #{broker}"

  response = broker.create_topics(**options)

  # Underscore the unused topic name to signal it's intentionally ignored.
  response.errors.each do |_topic, error_code|
    Protocol.handle_error(error_code)
  end

  # Wait for the partitions to get leaders assigned before declaring the
  # topic created.
  begin
    partitions_for(name).each do |info|
      Protocol.handle_error(info.partition_error_code)
    end
  rescue Kafka::LeaderNotAvailable
    @logger.warn "Leader not yet available for `#{name}`, waiting 1s..."
    sleep 1

    # NOTE(review): these retries are unbounded; a topic that never
    # converges will loop here forever. Consider a bounded retry count.
    retry
  rescue Kafka::UnknownTopicOrPartition
    @logger.warn "Topic `#{name}` not yet created, waiting 1s..."
    sleep 1

    retry
  end

  @logger.info "Topic `#{name}` was created"
end
delete_topic(name, timeout:) click to toggle source
# File lib/kafka/cluster.rb, line 233
# Deletes a topic via the controller broker.
#
# @param name [String] topic name
# @param timeout [Integer] broker-side timeout
def delete_topic(name, timeout:)
  controller = controller_broker

  @logger.info "Deleting topic `#{name}` using controller broker #{controller}"

  response = controller.delete_topics(topics: [name], timeout: timeout)

  response.errors.each do |_topic, error_code|
    Protocol.handle_error(error_code)
  end

  @logger.info "Topic `#{name}` was deleted"
end
describe_configs(broker_id, configs = []) click to toggle source
# File lib/kafka/cluster.rb, line 149
# Fetches broker-level configuration entries.
#
# @param broker_id [Integer] node id of the broker to inspect
# @param configs [Array] specific config names to fetch (empty = all)
# @return [Array] the config entries of the requested resource
def describe_configs(broker_id, configs = [])
  resources = [[Kafka::Protocol::RESOURCE_TYPE_CLUSTER, broker_id.to_s, configs]]

  broker_info = cluster_info.brokers.find { |b| b.node_id == broker_id }
  target = @broker_pool.connect(broker_info.host, broker_info.port, node_id: broker_info.node_id)

  response = target.describe_configs(resources: resources)

  response.resources.each do |resource|
    Protocol.handle_error(resource.error_code, resource.error_message)
  end

  # Only one resource was requested; surface its config entries.
  response.resources.first.configs
end
describe_group(group_id) click to toggle source
# File lib/kafka/cluster.rb, line 289
# Fetches the description of a consumer group from its coordinator.
#
# @param group_id [String]
# @return [Object] the group description entry
def describe_group(group_id)
  coordinator = get_group_coordinator(group_id: group_id)
  response = coordinator.describe_groups(group_ids: [group_id])

  group = response.groups.first
  Protocol.handle_error(group.error_code)

  group
end
describe_topic(name, configs = []) click to toggle source
# File lib/kafka/cluster.rb, line 252
# Fetches a topic's configuration entries via the controller broker.
#
# @param name [String] topic name
# @param configs [Array] specific config names to fetch (empty = all)
# @return [Hash{String => String}] config name => value
def describe_topic(name, configs = [])
  resources = [[Kafka::Protocol::RESOURCE_TYPE_TOPIC, name, configs]]
  controller = controller_broker

  @logger.info "Fetching topic `#{name}`'s configs using controller broker #{controller}"

  response = controller.describe_configs(resources: resources)

  response.resources.each do |resource|
    Protocol.handle_error(resource.error_code, resource.error_message)
  end

  # Flatten the first (only) resource's config entries into a plain
  # name => value hash.
  response.resources.first.configs.each_with_object({}) do |config, hash|
    hash[config.name] = config.value
  end
end
disconnect() click to toggle source
# File lib/kafka/cluster.rb, line 400
# Closes all connections held by the broker pool.
def disconnect
  @broker_pool.close
end
fetch_group_offsets(group_id) click to toggle source
# File lib/kafka/cluster.rb, line 296
# Fetches the committed offsets for every topic/partition of a group.
#
# @param group_id [String]
# @return [Object] the per-topic offset responses
def fetch_group_offsets(group_id)
  coordinator = get_group_coordinator(group_id: group_id)

  # `topics: nil` asks the coordinator for every committed topic.
  topics = coordinator.fetch_offsets(group_id: group_id, topics: nil).topics

  topics.each do |_topic, partitions|
    partitions.each do |_partition, response|
      Protocol.handle_error(response.error_code)
    end
  end

  topics
end
get_group_coordinator(group_id:) click to toggle source

Finds the broker acting as the coordinator of the given group.

@param group_id [String] @return [Broker] the broker that's currently coordinator.

# File lib/kafka/cluster.rb, line 125
# Finds the broker currently acting as coordinator of the given group.
#
# @param group_id [String]
# @return [Broker] the coordinating broker.
def get_group_coordinator(group_id:)
  @logger.debug "Getting group coordinator for `#{group_id}`"

  refresh_metadata_if_necessary!

  get_coordinator(Kafka::Protocol::COORDINATOR_TYPE_GROUP, group_id)
end
get_leader(topic, partition) click to toggle source

Finds the broker acting as the leader of the given topic and partition.

@param topic [String] @param partition [Integer] @return [Broker] the broker that's currently leader.

# File lib/kafka/cluster.rb, line 117
# Finds the broker currently leading the given topic partition.
#
# @param topic [String]
# @param partition [Integer]
# @return [Broker] the leading broker.
def get_leader(topic, partition)
  leader_id = get_leader_id(topic, partition)
  connect_to_broker(leader_id)
end
get_transaction_coordinator(transactional_id:) click to toggle source

Finds the broker acting as the coordinator of the given transaction.

@param transactional_id [String] @return [Broker] the broker that's currently coordinator.

# File lib/kafka/cluster.rb, line 135
# Finds the broker acting as coordinator of the given transaction.
#
# @param transactional_id [String, nil]
# @return [Broker] the coordinating broker, or an arbitrary broker when
#   no transactional id is given.
def get_transaction_coordinator(transactional_id:)
  @logger.debug "Getting transaction coordinator for `#{transactional_id}`"

  refresh_metadata_if_necessary!

  # Without a transactional id there is no designated coordinator; any
  # broker will do.
  if transactional_id.nil?
    @logger.debug "Transaction ID is not available. Choose a random broker."
    return random_broker
  end

  get_coordinator(Kafka::Protocol::COORDINATOR_TYPE_TRANSACTION, transactional_id)
end
list_groups() click to toggle source
# File lib/kafka/cluster.rb, line 391
# Lists the ids of all consumer groups known to any broker in the
# cluster, de-duplicated.
def list_groups
  refresh_metadata_if_necessary!

  group_ids = cluster_info.brokers.flat_map do |broker|
    response = connect_to_broker(broker.node_id).list_groups
    Protocol.handle_error(response.error_code)
    response.groups.map(&:group_id)
  end

  group_ids.uniq
end
list_topics() click to toggle source

Lists all topics in the cluster.

# File lib/kafka/cluster.rb, line 384
# Lists all topics in the cluster, skipping topics whose metadata
# carries a non-zero error code.
def list_topics
  metadata = random_broker.fetch_metadata(topics: nil)

  metadata.topics.each_with_object([]) do |topic, names|
    names << topic.topic_name if topic.topic_error_code == 0
  end
end
mark_as_stale!() click to toggle source
# File lib/kafka/cluster.rb, line 99
# Flags the cached metadata as stale so it will be refreshed before the
# next operation that depends on it.
def mark_as_stale!
  @stale = true
end
partitions_for(topic) click to toggle source
# File lib/kafka/cluster.rb, line 183
# Returns the partition metadata for a topic, adding the topic to the
# target list first so its metadata is fetched.
#
# @param topic [String]
def partitions_for(topic)
  add_target_topics([topic])
  refresh_metadata_if_necessary!

  cluster_info.partitions_for(topic)
rescue Kafka::ProtocolError
  # The metadata we used was likely outdated; make sure it is refreshed
  # before the caller retries.
  mark_as_stale!
  raise
end
refresh_metadata!() click to toggle source
# File lib/kafka/cluster.rb, line 103
# Discards the cached cluster metadata and immediately fetches a fresh
# copy.
def refresh_metadata!
  @cluster_info = nil

  cluster_info
end
refresh_metadata_if_necessary!() click to toggle source
# File lib/kafka/cluster.rb, line 108
# Refreshes the cluster metadata only when it has been marked stale.
def refresh_metadata_if_necessary!
  return unless @stale

  refresh_metadata!
end
resolve_offset(topic, partition, offset) click to toggle source
# File lib/kafka/cluster.rb, line 372
# Resolves the offset for a single partition of a topic.
#
# @param topic [String]
# @param partition [Integer]
# @param offset [Integer, Symbol] a concrete offset, :earliest or :latest
def resolve_offset(topic, partition, offset)
  offsets = resolve_offsets(topic, [partition], offset)
  offsets.fetch(partition)
end
resolve_offsets(topic, partitions, offset) click to toggle source
# File lib/kafka/cluster.rb, line 330
# Resolves offsets for a set of partitions of a topic, asking each
# partition's current leader.
#
# @param topic [String]
# @param partitions [Array<Integer>]
# @param offset [Integer, Symbol] a concrete offset, :earliest or :latest
# @return [Hash{Integer => Integer}] partition => resolved offset
def resolve_offsets(topic, partitions, offset)
  add_target_topics([topic])
  refresh_metadata_if_necessary!

  # Group the partitions by their current leader so only one ListOffsets
  # request is issued per broker.
  partitions_by_broker = partitions.group_by { |partition| get_leader(topic, partition) }

  # Translate the symbolic offsets into the protocol's sentinel values.
  case offset
  when :earliest then offset = -2
  when :latest then offset = -1
  end

  partitions_by_broker.each_with_object({}) do |(broker, broker_partitions), offsets|
    response = broker.list_offsets(
      topics: {
        topic => broker_partitions.map { |partition| { partition: partition, time: offset } }
      }
    )

    broker_partitions.each do |partition|
      offsets[partition] = response.offset_for(topic, partition)
    end
  end
rescue Kafka::ProtocolError
  mark_as_stale!
  raise
end
supports_api?(api_key, version = nil) click to toggle source
# File lib/kafka/cluster.rb, line 68
# Returns whether the cluster supports the given API, optionally at a
# specific version.
#
# @param api_key [Integer]
# @param version [Integer, nil] when nil, any version counts as support.
def supports_api?(api_key, version = nil)
  info = api_info(api_key)
  return false if info.nil?

  version.nil? || info.version_supported?(version)
end
topics() click to toggle source
# File lib/kafka/cluster.rb, line 376
# Lists the topics known to the cached metadata, skipping those whose
# metadata carries a non-zero error code.
def topics
  refresh_metadata_if_necessary!

  cluster_info.topics.reject { |topic| topic.topic_error_code != 0 }.map(&:topic_name)
end

Private Instance Methods

connect_to_broker(broker_id) click to toggle source
# File lib/kafka/cluster.rb, line 463
# Connects to the broker with the given node id, looking up its address
# in the cached metadata.
def connect_to_broker(broker_id)
  info = cluster_info.find_broker(broker_id)
  @broker_pool.connect(info.host, info.port, node_id: info.node_id)
end
controller_broker() click to toggle source
# File lib/kafka/cluster.rb, line 469
# Connects to the broker currently acting as the cluster controller.
def controller_broker
  controller_id = cluster_info.controller_id
  connect_to_broker(controller_id)
end
fetch_cluster_info() click to toggle source

Fetches the cluster metadata.

This is used to update the partition leadership information, among other things. The methods will go through each node listed in `seed_brokers`, connecting to the first one that is available. This node will be queried for the cluster metadata.

@raise [ConnectionError] if none of the nodes in `seed_brokers` are available.
@return [Protocol::MetadataResponse] the cluster metadata.

# File lib/kafka/cluster.rb, line 422
# Fetches the cluster metadata by trying each seed broker, in random
# order, until one responds with a non-empty broker list.
#
# When `@resolve_seed_brokers` is set, each seed hostname is first
# resolved to its addresses and those are tried in random order too.
#
# @raise [ConnectionError] if none of the seed brokers could be reached.
# @return [Protocol::MetadataResponse] the cluster metadata.
def fetch_cluster_info
  errors = []
  @seed_brokers.shuffle.each do |node|
    (@resolve_seed_brokers ? Resolv.getaddresses(node.hostname).shuffle : [node.hostname]).each do |hostname_or_ip|
      # Include the resolved address in log output when it differs from
      # the configured hostname.
      node_info = node.to_s
      node_info << " (#{hostname_or_ip})" if node.hostname != hostname_or_ip
      @logger.info "Fetching cluster metadata from #{node_info}"

      begin
        broker = @broker_pool.connect(hostname_or_ip, node.port)
        cluster_info = broker.fetch_metadata(topics: @target_topics)

        if cluster_info.brokers.empty?
          # Treated as a soft failure: fall through to the next candidate.
          @logger.error "No brokers in cluster"
        else
          @logger.info "Discovered cluster metadata; nodes: #{cluster_info.brokers.join(', ')}"

          # Only a successful fetch clears the stale flag.
          @stale = false

          return cluster_info
        end
      rescue Error => e
        # Remember the failure so the final ConnectionError can list
        # every node that was tried.
        @logger.error "Failed to fetch metadata from #{node_info}: #{e}"
        errors << [node_info, e]
      ensure
        # The metadata connection is throwaway; regular per-broker
        # connections are made through the broker pool elsewhere.
        broker.disconnect unless broker.nil?
      end
    end
  end

  error_description = errors.map {|node_info, exception| "- #{node_info}: #{exception}" }.join("\n")

  raise ConnectionError, "Could not connect to any of the seed brokers:\n#{error_description}"
end
get_coordinator(coordinator_type, coordinator_key) click to toggle source
# File lib/kafka/cluster.rb, line 473
# Asks the cluster's brokers, in turn, for the coordinator of the given
# key (a consumer group id or a transactional id) and connects to it.
#
# @param coordinator_type [Integer] group or transaction coordinator type.
# @param coordinator_key [String] the group or transactional id.
# @raise [Kafka::Error] if no broker could provide coordinator info.
# @return [Broker] a connection to the coordinator.
def get_coordinator(coordinator_type, coordinator_key)
  cluster_info.brokers.each do |broker_info|
    begin
      broker = connect_to_broker(broker_info.node_id)
      response = broker.find_coordinator(
        coordinator_type: coordinator_type,
        coordinator_key: coordinator_key
      )

      Protocol.handle_error(response.error_code, response.error_message)

      coordinator_id = response.coordinator_id

      @logger.debug "Coordinator for `#{coordinator_key}` is #{coordinator_id}. Connecting..."

      # It's possible that a new broker is introduced to the cluster and
      # becomes the coordinator before we have a chance to refresh_metadata.
      coordinator = begin
        connect_to_broker(coordinator_id)
      rescue Kafka::NoSuchBroker
        @logger.debug "Broker #{coordinator_id} missing from broker cache, refreshing"
        refresh_metadata!
        connect_to_broker(coordinator_id)
      end

      @logger.debug "Connected to coordinator: #{coordinator} for `#{coordinator_key}`"

      return coordinator
    rescue CoordinatorNotAvailable
      # NOTE(review): this retry is unbounded; a coordinator that never
      # becomes available will loop here forever.
      @logger.debug "Coordinator not available; retrying in 1s"
      sleep 1
      retry
    rescue ConnectionError => e
      # Connection-level failures move on to the next broker in the list.
      @logger.error "Failed to get coordinator info from #{broker}: #{e}"
    end
  end

  raise Kafka::Error, "Failed to find coordinator"
end
get_leader_id(topic, partition) click to toggle source
# File lib/kafka/cluster.rb, line 410
# Looks up the node id of the broker leading the given topic partition.
def get_leader_id(topic, partition)
  cluster_info.find_leader_id(topic, partition)
end
random_broker() click to toggle source
# File lib/kafka/cluster.rb, line 457
# Connects to a randomly chosen broker from the current metadata.
def random_broker
  refresh_metadata_if_necessary!

  chosen = cluster_info.brokers.sample
  connect_to_broker(chosen.node_id)
end