class Kafka::Cluster

A cluster represents the state of a Kafka cluster. It needs to be initialized with a non-empty list of seed brokers. The first seed broker that the cluster can connect to will be asked for the cluster metadata, which allows the cluster to map topic partitions to the current leader for those partitions.

Public Class Methods

new(seed_brokers:, broker_pool:, logger:) click to toggle source

Initializes a Cluster with a set of seed brokers.

The cluster will try to fetch cluster metadata from one of the brokers.

@param seed_brokers [Array<URI>] @param broker_pool [Kafka::BrokerPool] @param logger [Logger]

# File lib/kafka/cluster.rb, line 21
# Builds a cluster handle from a list of seed brokers.
#
# No metadata is fetched here: the cached cluster info starts out empty and
# the instance is flagged as stale.
#
# @param seed_brokers [Array<URI>] must not be empty.
# @param broker_pool [Kafka::BrokerPool]
# @param logger [Logger] wrapped in a TaggedLogger before being stored.
# @raise [ArgumentError] if `seed_brokers` is empty.
def initialize(seed_brokers:, broker_pool:, logger:)
  raise ArgumentError, "At least one seed broker must be configured" if seed_brokers.empty?

  @seed_brokers = seed_brokers
  @broker_pool = broker_pool
  @logger = TaggedLogger.new(logger)

  # Nothing has been fetched yet, so there is no cached metadata and the
  # cluster is considered stale.
  @cluster_info = nil
  @stale = true

  # Topics that metadata requests are restricted to. While this set is empty,
  # metadata for all topics will be fetched.
  @target_topics = Set.new
end

Public Instance Methods

add_target_topics(topics) click to toggle source

Adds a list of topics to the target list. Only the topics on this list will be queried for metadata.

@param topics [Array<String>] @return [nil]

# File lib/kafka/cluster.rb, line 42
# Adds topics to the target list and refreshes the metadata when at least one
# of them was not on the list already.
#
# @param topics [Array<String>]
# @return [Object, nil] the result of {#refresh_metadata!} when new topics
#   were added, nil when every topic was already targeted.
def add_target_topics(topics)
  new_topics = Set.new(topics) - @target_topics
  return if new_topics.empty?

  @logger.info "New topics added to target list: #{new_topics.to_a.join(', ')}"
  @target_topics.merge(new_topics)
  refresh_metadata!
end
alter_topic(name, configs = {}) click to toggle source
# File lib/kafka/cluster.rb, line 230
# Sends an alter-configs request for a single topic to the controller broker.
#
# @param name [String] the topic name.
# @param configs [Hash] passed through as the third element of the topic
#   resource tuple.
# @return [nil]
def alter_topic(name, configs = {})
  topic_resource = [Kafka::Protocol::RESOURCE_TYPE_TOPIC, name, configs]
  broker = controller_broker

  @logger.info "Altering the config for topic `#{name}` using controller broker #{broker}"

  response = broker.alter_configs(resources: [topic_resource])

  # Every resource in the response carries its own error code and message.
  response.resources.each do |resource|
    Protocol.handle_error(resource.error_code, resource.error_message)
  end

  nil
end
api_info(api_key) click to toggle source
# File lib/kafka/cluster.rb, line 57
# Looks up the entry in {#apis} whose `api_key` equals the given key.
#
# @param api_key [Object] the key to look for.
# @return [Object, nil] the first matching entry, or nil when there is none.
def api_info(api_key)
  apis.find do |candidate|
    candidate.api_key == api_key
  end
end
apis() click to toggle source
# File lib/kafka/cluster.rb, line 72
# Returns the API list reported by a random broker, memoized in `@apis`.
#
# The broker is only queried while `@apis` is unset; the response's error
# code is passed through Protocol.handle_error before the list is cached.
#
# @return [Object] the `apis` value of the api_versions response.
def apis
  return @apis if @apis

  response = random_broker.api_versions
  Protocol.handle_error(response.error_code)

  @apis = response.apis
end
clear_target_topics() click to toggle source

Clears the list of target topics.

@see add_target_topics @return [nil]

# File lib/kafka/cluster.rb, line 87
# Empties the target topic list and then refreshes the metadata.
#
# @see add_target_topics
# @return [Object] whatever {#refresh_metadata!} returns.
def clear_target_topics
  @target_topics.clear

  refresh_metadata!
end
cluster_info() click to toggle source
# File lib/kafka/cluster.rb, line 349
# Returns the cached cluster metadata, fetching it first when nothing is
# cached.
#
# @return [Object] the value produced by {#fetch_cluster_info}.
def cluster_info
  @cluster_info = fetch_cluster_info unless @cluster_info
  @cluster_info
end
create_partitions_for(name, num_partitions:, timeout:) click to toggle source
# File lib/kafka/cluster.rb, line 255
# Asks the controller broker to create partitions for a topic and marks the
# cached metadata as stale afterwards.
#
# @param name [String] the topic name.
# @param num_partitions [Integer] sent as the second element of the topic
#   tuple.
# @param timeout [Object] forwarded unchanged to the broker request.
# @return [Object] the result of the final log call.
def create_partitions_for(name, num_partitions:, timeout:)
  broker = controller_broker

  @logger.info "Creating #{num_partitions} partition(s) for topic `#{name}` using controller broker #{broker}"

  response = broker.create_partitions(
    topics: [[name, num_partitions, nil]],
    timeout: timeout
  )

  response.errors.each do |_topic, error_code, error_message|
    Protocol.handle_error(error_code, error_message)
  end

  # The partition layout changed, so cached metadata is flagged for refresh.
  mark_as_stale!

  @logger.info "Topic `#{name}` was updated"
end
create_topic(name, num_partitions:, replication_factor:, timeout:, config:) click to toggle source
# File lib/kafka/cluster.rb, line 151
# Asks the controller broker to create a topic, then waits until partition
# metadata for it can be read without a partition error.
#
# @param name [String] the topic name.
# @param num_partitions [Integer]
# @param replication_factor [Integer]
# @param timeout [Object] forwarded unchanged to the broker request.
# @param config [Object] forwarded unchanged as the topic's `config` entry.
# @return [Object] the result of the final log call.
def create_topic(name, num_partitions:, replication_factor:, timeout:, config:)
  topic_spec = {
    num_partitions: num_partitions,
    replication_factor: replication_factor,
    config: config,
  }

  broker = controller_broker

  @logger.info "Creating topic `#{name}` using controller broker #{broker}"

  response = broker.create_topics(topics: { name => topic_spec }, timeout: timeout)

  response.errors.each { |_topic, error_code| Protocol.handle_error(error_code) }

  # Poll the partition metadata until it is readable without errors. Both
  # rescue branches sleep one second and retry.
  # NOTE(review): the retry count is unbounded, so this loops for as long as
  # either error keeps being raised.
  begin
    partitions_for(name).each { |info| Protocol.handle_error(info.partition_error_code) }
  rescue Kafka::LeaderNotAvailable
    @logger.warn "Leader not yet available for `#{name}`, waiting 1s..."
    sleep 1

    retry
  rescue Kafka::UnknownTopicOrPartition
    @logger.warn "Topic `#{name}` not yet created, waiting 1s..."
    sleep 1

    retry
  end

  @logger.info "Topic `#{name}` was created"
end
delete_topic(name, timeout:) click to toggle source
# File lib/kafka/cluster.rb, line 192
# Asks the controller broker to delete a topic.
#
# @param name [String] the topic name.
# @param timeout [Object] forwarded unchanged to the broker request.
# @return [Object] the result of the final log call.
def delete_topic(name, timeout:)
  broker = controller_broker

  @logger.info "Deleting topic `#{name}` using controller broker #{broker}"

  response = broker.delete_topics(topics: [name], timeout: timeout)

  response.errors.each { |_topic, error_code| Protocol.handle_error(error_code) }

  @logger.info "Topic `#{name}` was deleted"
end
describe_group(group_id) click to toggle source
# File lib/kafka/cluster.rb, line 248
# Fetches the description of a consumer group from its coordinator.
#
# @param group_id [String]
# @return [Object] the first group entry of the describe_groups response,
#   after its error code has been passed through Protocol.handle_error.
def describe_group(group_id)
  coordinator = get_group_coordinator(group_id: group_id)
  description = coordinator.describe_groups(group_ids: [group_id]).groups.first

  Protocol.handle_error(description.error_code)

  description
end
describe_topic(name, configs = []) click to toggle source
# File lib/kafka/cluster.rb, line 211
# Fetches config values for a topic from the controller broker.
#
# @param name [String] the topic name.
# @param configs [Array] passed through as the third element of the topic
#   resource tuple.
# @return [Hash] config name => config value, built from the first resource
#   in the response.
def describe_topic(name, configs = [])
  topic_resource = [Kafka::Protocol::RESOURCE_TYPE_TOPIC, name, configs]
  broker = controller_broker

  @logger.info "Fetching topic `#{name}`'s configs using controller broker #{broker}"

  response = broker.describe_configs(resources: [topic_resource])

  response.resources.each do |resource|
    Protocol.handle_error(resource.error_code, resource.error_message)
  end

  # Only one resource was requested, so only the first entry is read.
  entries = response.resources.first.configs
  entries.map { |entry| [entry.name, entry.value] }.to_h
end
disconnect() click to toggle source
# File lib/kafka/cluster.rb, line 345
# Closes the broker pool this cluster was initialized with.
#
# @return [Object] whatever the pool's `close` returns.
def disconnect
  @broker_pool.close
end
get_group_coordinator(group_id:) click to toggle source

Finds the broker acting as the coordinator of the given group.

@param group_id: [String] @return [Broker] the broker that's currently coordinator.

# File lib/kafka/cluster.rb, line 118
# Finds the broker acting as coordinator for the given consumer group.
#
# Metadata is refreshed first if it is stale.
#
# @param group_id [String]
# @return [Broker] the broker that's currently coordinator.
def get_group_coordinator(group_id:)
  @logger.debug "Getting group coordinator for `#{group_id}`"

  refresh_metadata_if_necessary!

  coordinator_type = Kafka::Protocol::COORDINATOR_TYPE_GROUP
  get_coordinator(coordinator_type, group_id)
end
get_leader(topic, partition) click to toggle source

Finds the broker acting as the leader of the given topic and partition.

@param topic [String] @param partition [Integer] @return [Broker] the broker that's currently leader.

# File lib/kafka/cluster.rb, line 110
# Finds the broker acting as leader for the given topic partition and
# returns a connection to it.
#
# @param topic [String]
# @param partition [Integer]
# @return [Broker] the broker that's currently leader.
def get_leader(topic, partition)
  leader_id = get_leader_id(topic, partition)
  connect_to_broker(leader_id)
end
get_transaction_coordinator(transactional_id:) click to toggle source

Finds the broker acting as the coordinator of the given transaction.

@param transactional_id: [String] @return [Broker] the broker that's currently coordinator.

# File lib/kafka/cluster.rb, line 128
# Finds the broker acting as coordinator for the given transaction.
#
# Metadata is refreshed first if it is stale. When no transactional id is
# given, a random broker is returned instead of a coordinator.
#
# @param transactional_id [String, nil]
# @return [Broker] the coordinator, or a random broker when the id is nil.
def get_transaction_coordinator(transactional_id:)
  @logger.debug "Getting transaction coordinator for `#{transactional_id}`"

  refresh_metadata_if_necessary!

  unless transactional_id.nil?
    return get_coordinator(Kafka::Protocol::COORDINATOR_TYPE_TRANSACTION, transactional_id)
  end

  # Without an id there is no coordinator to look up.
  @logger.debug "Transaction ID is not available. Choose a random broker."
  random_broker
end
list_groups() click to toggle source
# File lib/kafka/cluster.rb, line 336
# Collects the ids of all consumer groups by asking every broker listed in
# the cluster metadata.
#
# Metadata is refreshed first if it is stale. Each broker's response error
# code is passed through Protocol.handle_error.
#
# @return [Array] the distinct group ids, in first-seen order.
def list_groups
  refresh_metadata_if_necessary!

  group_ids = cluster_info.brokers.flat_map do |broker_info|
    response = connect_to_broker(broker_info.node_id).list_groups
    Protocol.handle_error(response.error_code)
    response.groups.map(&:group_id)
  end

  group_ids.uniq
end
list_topics() click to toggle source

Lists all topics in the cluster.

# File lib/kafka/cluster.rb, line 329
# Lists all topics in the cluster by requesting metadata from a random
# broker with `topics: nil`.
#
# @return [Array] names of the topics whose `topic_error_code` is 0.
def list_topics
  metadata = random_broker.fetch_metadata(topics: nil)
  healthy_topics = metadata.topics.select { |topic| topic.topic_error_code == 0 }

  healthy_topics.map(&:topic_name)
end
mark_as_stale!() click to toggle source
# File lib/kafka/cluster.rb, line 92
# Flags the cluster metadata as stale. {#refresh_metadata_if_necessary!}
# refreshes only while this flag is set.
#
# @return [true]
def mark_as_stale!
  @stale = true
end
partitions_for(topic) click to toggle source
# File lib/kafka/cluster.rb, line 142
# Returns the partition metadata for a topic.
#
# The topic is added to the target list and stale metadata is refreshed
# before the lookup.
#
# @param topic [String]
# @return [Object] whatever the cluster metadata's `partitions_for` returns.
# @raise [Kafka::ProtocolError] re-raised after the metadata has been marked
#   as stale.
def partitions_for(topic)
  add_target_topics([topic])
  refresh_metadata_if_necessary!

  metadata = cluster_info
  metadata.partitions_for(topic)
rescue Kafka::ProtocolError
  # Flag the metadata for refresh before handing the error to the caller.
  mark_as_stale!
  raise
end
refresh_metadata!() click to toggle source
# File lib/kafka/cluster.rb, line 96
# Discards the cached cluster metadata and reads it again through
# {#cluster_info}.
#
# @return [Object] the value returned by {#cluster_info}.
def refresh_metadata!
  # Clearing the cache first forces the cluster_info call below to refetch.
  @cluster_info = nil

  cluster_info
end
refresh_metadata_if_necessary!() click to toggle source
# File lib/kafka/cluster.rb, line 101
# Refreshes the cluster metadata only while it is flagged as stale.
#
# @return [Object, nil] the result of {#refresh_metadata!}, or nil when no
#   refresh was needed.
def refresh_metadata_if_necessary!
  return unless @stale

  refresh_metadata!
end
resolve_offset(topic, partition, offset) click to toggle source
# File lib/kafka/cluster.rb, line 317
# Resolves an offset for a single partition by delegating to
# {#resolve_offsets}.
#
# @param topic [String]
# @param partition [Integer]
# @param offset [Object] forwarded unchanged to {#resolve_offsets}.
# @return [Object] the resolved offset for `partition`.
# @raise [KeyError] if the result has no entry for `partition`.
def resolve_offset(topic, partition, offset)
  offsets_by_partition = resolve_offsets(topic, [partition], offset)
  offsets_by_partition.fetch(partition)
end
resolve_offsets(topic, partitions, offset) click to toggle source
# File lib/kafka/cluster.rb, line 275
# Resolves an offset for several partitions of one topic.
#
# Partitions are grouped by their leader broker and one list_offsets request
# is sent per broker. The symbols :earliest and :latest are sent as -2 and
# -1; any other value is sent unchanged.
#
# @param topic [String]
# @param partitions [Array<Integer>]
# @param offset [Symbol, Object] :earliest, :latest, or a value sent as is.
# @return [Hash] partition => resolved offset.
# @raise [Kafka::ProtocolError] re-raised after the metadata has been marked
#   as stale.
def resolve_offsets(topic, partitions, offset)
  add_target_topics([topic])
  refresh_metadata_if_necessary!

  partitions_by_leader = partitions.group_by { |partition| get_leader(topic, partition) }

  time =
    case offset
    when :earliest then -2
    when :latest then -1
    else offset
    end

  partitions_by_leader.each_with_object({}) do |(leader, leader_partitions), resolved|
    requests = leader_partitions.map { |partition| { partition: partition, time: time } }
    response = leader.list_offsets(topics: { topic => requests })

    leader_partitions.each do |partition|
      resolved[partition] = response.offset_for(topic, partition)
    end
  end
rescue Kafka::ProtocolError
  # Flag the metadata for refresh before handing the error to the caller.
  mark_as_stale!
  raise
end
supports_api?(api_key, version = nil) click to toggle source
# File lib/kafka/cluster.rb, line 61
# Tells whether an API, optionally at a specific version, is available
# according to {#api_info}.
#
# @param api_key [Object] the key looked up through {#api_info}.
# @param version [Object, nil] when nil, only the presence of the API is
#   checked.
# @return [Boolean] false when the API is unknown, true when it is known and
#   no version was given, otherwise the result of `version_supported?`.
def supports_api?(api_key, version = nil)
  info = api_info(api_key)

  return false if info.nil?
  return true if version.nil?

  info.version_supported?(version)
end
topics() click to toggle source
# File lib/kafka/cluster.rb, line 321
# Lists the topics present in the cached cluster metadata, refreshing the
# metadata first if it is stale.
#
# @return [Array] names of the topics whose `topic_error_code` is 0.
def topics
  refresh_metadata_if_necessary!

  healthy_topics = cluster_info.topics.select { |topic| topic.topic_error_code == 0 }
  healthy_topics.map(&:topic_name)
end

Private Instance Methods

connect_to_broker(broker_id) click to toggle source
# File lib/kafka/cluster.rb, line 405
# Looks up a broker in the cluster metadata by id and asks the broker pool
# for a connection to its host and port.
#
# @param broker_id [Object] the id passed to the metadata's `find_broker`.
# @return [Object] whatever the broker pool's `connect` returns.
def connect_to_broker(broker_id)
  broker_info = cluster_info.find_broker(broker_id)

  @broker_pool.connect(broker_info.host, broker_info.port, node_id: broker_info.node_id)
end
controller_broker() click to toggle source
# File lib/kafka/cluster.rb, line 411
# Returns a connection to the broker that the cluster metadata names as
# controller.
#
# @return [Object] the result of {#connect_to_broker} for the controller id.
def controller_broker
  controller_id = cluster_info.controller_id
  connect_to_broker(controller_id)
end
fetch_cluster_info() click to toggle source

Fetches the cluster metadata.

This is used to update the partition leadership information, among other things. This method goes through the nodes listed in `seed_brokers` in random order, connecting to the first one that is available. This node will be queried for the cluster metadata.

@raise [ConnectionError] if none of the nodes in `seed_brokers` are available. @return [Protocol::MetadataResponse] the cluster metadata.

# File lib/kafka/cluster.rb, line 367
# Fetches the cluster metadata from the seed brokers.
#
# The seed brokers are tried in random order. The first one that returns
# metadata with a non-empty broker list wins: the stale flag is cleared and
# the metadata is returned. The connection to each seed broker is closed
# again once it has been tried, whether it succeeded or not.
#
# A seed broker that answers with an empty broker list is recorded as a
# failure, so the final error message says why that seed was skipped.
#
# @raise [ConnectionError] if no seed broker yielded usable metadata.
# @return [Protocol::MetadataResponse] the cluster metadata.
def fetch_cluster_info
  errors = []

  @seed_brokers.shuffle.each do |node|
    @logger.info "Fetching cluster metadata from #{node}"

    begin
      broker = @broker_pool.connect(node.hostname, node.port)
      cluster_info = broker.fetch_metadata(topics: @target_topics)

      if cluster_info.brokers.empty?
        @logger.error "No brokers in cluster"
        # Keep a record for this seed so the failure summary is not silent
        # about brokers that responded without any usable metadata.
        errors << [node, "No brokers in cluster"]
      else
        @logger.info "Discovered cluster metadata; nodes: #{cluster_info.brokers.join(', ')}"

        @stale = false

        return cluster_info
      end
    rescue Error => e
      @logger.error "Failed to fetch metadata from #{node}: #{e}"
      errors << [node, e]
    ensure
      # `broker` is still nil here when the connect call itself raised.
      broker.disconnect unless broker.nil?
    end
  end

  error_description = errors.map { |node, reason| "- #{node}: #{reason}" }.join("\n")

  raise ConnectionError, "Could not connect to any of the seed brokers:\n#{error_description}"
end
get_coordinator(coordinator_type, coordinator_key) click to toggle source
# File lib/kafka/cluster.rb, line 415
# Finds the coordinator broker for the given key and connects to it.
#
# Each broker listed in the cluster metadata is asked in turn with a
# find_coordinator request; the first one that yields a coordinator id ends
# the search.
#
# @param coordinator_type [Object] forwarded to the find_coordinator request.
# @param coordinator_key [Object] forwarded to the find_coordinator request.
# @return [Object] the connection returned by {#connect_to_broker} for the
#   coordinator id.
# @raise [Kafka::Error] if the loop finishes without returning a coordinator.
def get_coordinator(coordinator_type, coordinator_key)
  cluster_info.brokers.each do |broker_info|
    begin
      broker = connect_to_broker(broker_info.node_id)
      response = broker.find_coordinator(
        coordinator_type: coordinator_type,
        coordinator_key: coordinator_key
      )

      # Errors not rescued below propagate to the caller.
      Protocol.handle_error(response.error_code, response.error_message)

      coordinator_id = response.coordinator_id

      @logger.debug "Coordinator for `#{coordinator_key}` is #{coordinator_id}. Connecting..."

      # It's possible that a new broker is introduced to the cluster and
      # becomes the coordinator before we have a chance to refresh_metadata.
      coordinator = begin
        connect_to_broker(coordinator_id)
      rescue Kafka::NoSuchBroker
        @logger.debug "Broker #{coordinator_id} missing from broker cache, refreshing"
        refresh_metadata!
        # A second NoSuchBroker here is not rescued and reaches the caller.
        connect_to_broker(coordinator_id)
      end

      @logger.debug "Connected to coordinator: #{coordinator} for `#{coordinator_key}`"

      return coordinator
    rescue CoordinatorNotAvailable
      # `retry` re-runs the begin block above for the same broker_info.
      # NOTE(review): the number of retries is unbounded — confirm that is
      # intended.
      @logger.debug "Coordinator not available; retrying in 1s"
      sleep 1
      retry
    rescue ConnectionError => e
      # Logged only; iteration then continues with the next broker.
      # NOTE(review): `broker` looks like it can still be nil here if
      # connect_to_broker itself raised — confirm what the message shows then.
      @logger.error "Failed to get coordinator info from #{broker}: #{e}"
    end
  end

  raise Kafka::Error, "Failed to find coordinator"
end
get_leader_id(topic, partition) click to toggle source
# File lib/kafka/cluster.rb, line 355
# Looks up the id of the leader for a topic partition in the cluster
# metadata.
#
# @param topic [String]
# @param partition [Integer]
# @return [Object] whatever the metadata's `find_leader_id` returns.
def get_leader_id(topic, partition)
  metadata = cluster_info
  metadata.find_leader_id(topic, partition)
end
random_broker() click to toggle source
# File lib/kafka/cluster.rb, line 399
# Connects to a broker sampled at random from the cluster metadata,
# refreshing the metadata first if it is stale.
#
# @return [Object] the result of {#connect_to_broker} for the sampled node.
def random_broker
  refresh_metadata_if_necessary!

  chosen = cluster_info.brokers.sample
  connect_to_broker(chosen.node_id)
end