class Poseidon::BrokerPool

BrokerPool lets you send API calls to a broker's Connection.

@api private

Public Class Methods

new(client_id, seed_brokers, socket_timeout_ms)

@param [String] client_id

# File lib/poseidon/broker_pool.rb, line 18
def initialize(client_id, seed_brokers, socket_timeout_ms)
  @connections = {}
  @brokers = {}
  @client_id = client_id
  @seed_brokers = seed_brokers
  @socket_timeout_ms = socket_timeout_ms
end
open(client_id, seed_brokers, socket_timeout_ms) { |broker_pool| ... }

@yieldparam [BrokerPool]

# File lib/poseidon/broker_pool.rb, line 9
def self.open(client_id, seed_brokers, socket_timeout_ms, &block)
  broker_pool = new(client_id, seed_brokers, socket_timeout_ms)

  yield broker_pool
ensure
  broker_pool.close
end
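
Because `close` sits in an `ensure` clause, the block form releases the pool even when the block raises. The following is a minimal sketch of that pattern only. `FakePool` is a hypothetical stand-in, not Poseidon's class, so the example runs without a Kafka broker.

```ruby
# Hypothetical stand-in for BrokerPool; only the open/close lifecycle is modelled.
class FakePool
  attr_reader :closed

  def initialize
    @closed = false
  end

  def close
    @closed = true
  end

  # Same shape as the method above: build, yield, always close.
  def self.open
    pool = new
    yield pool
  ensure
    # Guard against `new` itself failing, in which case pool is still nil.
    pool.close if pool
  end
end

leaked = nil
begin
  FakePool.open do |pool|
    leaked = pool
    raise "simulated failure inside the block"
  end
rescue RuntimeError
  # The error still reaches the caller.
end

puts leaked.closed # the pool was closed on the way out
```

The value of the block is also the return value of `open`, since an `ensure` clause does not change what the method returns.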

Public Instance Methods

close()

Closes all open connections to brokers

# File lib/poseidon/broker_pool.rb, line 57
def close
  @brokers.values(&:close)
  @brokers = {}
end
Also aliased as: shutdown
execute_api_call(broker_id, api_call, *args)

Executes an api call on the connection

@param [Integer] broker_id id of the broker we want to execute it on

@param [Symbol] api_call the api call we want to execute (:produce, :fetch, etc.)
# File lib/poseidon/broker_pool.rb, line 52
def execute_api_call(broker_id, api_call, *args)
  connection(broker_id).send(api_call, *args)
end
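
The call is forwarded with `send`, so the symbol names a method on the connection and the remaining arguments pass through unchanged. A minimal sketch of that dispatch follows. `FakeConnection` and `FakeDispatcher` are hypothetical stand-ins, and the `produce`/`fetch` signatures are invented for illustration rather than taken from Poseidon.

```ruby
# Hypothetical stand-in for a connection; method names echo the :produce and
# :fetch symbols mentioned above, but the argument lists are made up.
class FakeConnection
  def produce(topic, message)
    "produced #{message} to #{topic}"
  end

  def fetch(topic)
    "fetched from #{topic}"
  end
end

class FakeDispatcher
  def initialize
    @connections = {}
  end

  # Symbol plus splat arguments are forwarded as-is to the connection.
  def execute_api_call(broker_id, api_call, *args)
    connection(broker_id).send(api_call, *args)
  end

  private

  # One connection per broker id, created lazily on first use.
  def connection(broker_id)
    @connections[broker_id] ||= FakeConnection.new
  end
end

dispatcher = FakeDispatcher.new
puts dispatcher.execute_api_call(1, :produce, "events", "hello")
puts dispatcher.execute_api_call(1, :fetch, "events")
```

An unknown symbol raises `NoMethodError` from `send`, so callers probably want to restrict `api_call` to a known set.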
fetch_metadata(topics)
# File lib/poseidon/broker_pool.rb, line 26
def fetch_metadata(topics)
  @seed_brokers.each do |broker|
    if metadata = fetch_metadata_from_broker(broker, topics)
      Poseidon.logger.debug { "Fetched metadata from #{broker}:\n" + metadata.to_s }
      return metadata
    end
  end
  raise Errors::UnableToFetchMetadata
end
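
The method tries each seed broker in order, returns the first successful response, and raises only when every seed has failed. A minimal sketch of that control flow, with a hypothetical `UnableToFetch` error and a lambda standing in for the network call:

```ruby
# Hypothetical error class; only the control flow mirrors fetch_metadata.
class UnableToFetch < StandardError; end

# Tries each seed in order and returns the first non-nil result.
# `fetcher` is any callable that returns metadata, or nil on failure.
def first_available(seeds, fetcher)
  seeds.each do |seed|
    result = fetcher.call(seed)
    return result if result
  end
  raise UnableToFetch, "no seed broker responded: #{seeds.inspect}"
end

# Simulate two unreachable brokers followed by a healthy one.
fetcher = ->(seed) { seed == "kafka3:9092" ? { :source => seed } : nil }
metadata = first_available(%w[kafka1:9092 kafka2:9092 kafka3:9092], fetcher)
p metadata
```

Listing several seed brokers therefore buys availability at startup: one reachable seed is enough.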
shutdown()
Alias for: close
update_known_brokers(brokers)

Update the brokers we know about

TODO break connection when a broker's info changes?

@param [Hash<Integer,Hash>] brokers Hash of broker_id => { :host => host, :port => port }
# File lib/poseidon/broker_pool.rb, line 42
def update_known_brokers(brokers)
  @brokers.update(brokers)
  nil
end
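
`Hash#update` merges in place: ids already present are overwritten, new ids are added, and ids missing from the argument are kept. A small sketch of the expected argument shape, using a hypothetical free function `apply_broker_update` and made-up host names:

```ruby
# Hypothetical free function mirroring update_known_brokers on a plain Hash.
def apply_broker_update(known, incoming)
  known.update(incoming) # Hash#update is an alias of Hash#merge! and mutates `known`
  nil
end

known = { 1 => { :host => "kafka1", :port => 9092 } }
apply_broker_update(known, {
  1 => { :host => "kafka1-moved", :port => 9093 }, # existing id: replaced
  2 => { :host => "kafka2", :port => 9092 }        # new id: added
})
p known
```

As the quoted source is written, only the broker table changes here. A connection already cached for that id is not touched, which is the situation the TODO above asks about.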

Private Instance Methods

connection(broker_id)
# File lib/poseidon/broker_pool.rb, line 74
def connection(broker_id)
  @connections[broker_id] ||= new_connection(broker_id)
end
fetch_metadata_from_broker(broker, topics)
# File lib/poseidon/broker_pool.rb, line 65
def fetch_metadata_from_broker(broker, topics)
  host, port = broker.split(":")
  Connection.open(host, port, @client_id, @socket_timeout_ms) do |connection|
    connection.topic_metadata(topics)
  end
rescue Connection::ConnectionFailedError
  return nil
end
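
Seed brokers are given as "host:port" strings, and `split(":")` leaves the port as a String such as "9092". Whether `Connection` converts it is not shown here. The sketch below is a hypothetical helper, not part of Poseidon, that validates the format and returns an Integer port:

```ruby
# Hypothetical helper: splits "host:port" and converts the port to an Integer,
# raising ArgumentError on malformed input.
def parse_seed_broker(broker)
  host, port = broker.split(":", 2)
  if host.to_s.empty? || port.nil?
    raise ArgumentError, "expected host:port, got #{broker.inspect}"
  end
  [host, Integer(port, 10)]
end

host, port = parse_seed_broker("localhost:9092")
puts "#{host} #{port}"
```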
new_connection(broker_id)
# File lib/poseidon/broker_pool.rb, line 78
def new_connection(broker_id)
  info = @brokers[broker_id]
  if info.nil?
    raise UnknownBroker
  end
  Connection.new(info[:host], info[:port], @client_id, @socket_timeout_ms)
end