class Kafka::BrokerPool

Public Class Methods

new(connection_builder:, logger:)
# File lib/kafka/broker_pool.rb, line 7
# Builds an empty pool.
#
# @param connection_builder the object stored and later handed to every
#   Broker this pool creates
# @param logger the logger to wrap in a TaggedLogger before it is stored
def initialize(connection_builder:, logger:)
  @connection_builder = connection_builder
  # Cache of brokers keyed by node id; populated by #connect.
  @brokers = {}
  @logger = TaggedLogger.new(logger)
end

Public Instance Methods

close()
# File lib/kafka/broker_pool.rb, line 34
# Disconnects every broker currently held in the pool, logging one info
# line per broker. Entries are left in the pool; only `disconnect` is called.
#
# @return [Hash] the pool's internal node-id => broker hash
def close
  @brokers.each_pair do |node_id, connection|
    @logger.info("Disconnecting broker #{node_id}")
    connection.disconnect
  end
end
connect(host, port, node_id: nil)
# File lib/kafka/broker_pool.rb, line 13
# Returns a broker for the given address, reusing the cached one when the
# node id is already known and its address still matches.
#
# When a cached broker exists for `node_id` but its address no longer
# matches, it is disconnected and replaced by a fresh Broker. Brokers
# created without a `node_id` are never cached.
#
# @param host the broker host
# @param port the broker port
# @param node_id the broker's node id, or nil when it is not known
# @return [Broker] the cached or newly created broker
def connect(host, port, node_id: nil)
  if @brokers.key?(node_id)
    broker = @brokers.fetch(node_id)
    return broker if broker.address_match?(host, port)

    broker.disconnect
    # Remove the stale entry outright rather than storing nil: a nil
    # placeholder would survive if Broker.new raised below and would then
    # break the next #connect or #close with a NoMethodError.
    @brokers.delete(node_id)
  end

  broker = Broker.new(
    connection_builder: @connection_builder,
    host: host,
    port: port,
    node_id: node_id,
    logger: @logger,
  )

  @brokers[node_id] = broker unless node_id.nil?

  broker
end