class Poseidon::ClusterMetadata

Encapsulates what we know about brokers, topics and partitions from Metadata API calls.

@api private

Attributes

brokers[R]
last_refreshed_at[R]
topic_metadata[R]

Public Class Methods

new() click to toggle source
# File lib/poseidon/cluster_metadata.rb, line 8
# Builds an empty cluster view: no brokers, no topic metadata, and no
# refresh timestamp recorded yet.
def initialize
  @topic_metadata    = {}
  @brokers           = {}
  @last_refreshed_at = nil
end

Public Instance Methods

broker(broker_id) click to toggle source

Provides a Broker object for broker_id. This corresponds to the broker ids in the TopicMetadata objects.

@param [Integer] broker_id Broker id

# File lib/poseidon/cluster_metadata.rb, line 46
# Looks up the broker stored under +broker_id+.
#
# @param [Integer] broker_id Broker id used as the key into the known brokers
# @return the stored broker, or nil when no broker is stored under that id
def broker(broker_id)
  @brokers.fetch(broker_id, nil)
end
have_metadata_for_topics?(topic_names) click to toggle source

Do we have metadata for these topics already?

@param [Enumerable<String>] topic_names A set of topics. @return [Boolean] true if we have metadata for all topic_names, otherwise false.

# File lib/poseidon/cluster_metadata.rb, line 30
# Reports whether every requested topic has an entry in the cached topic
# metadata.
#
# @param [Enumerable<String>] topic_names Topics to check
# @return [Boolean] true when no requested topic is missing (also true for
#   an empty collection), otherwise false
def have_metadata_for_topics?(topic_names)
  topic_names.none? { |name| !@topic_metadata[name] }
end
lead_broker_for_partition(topic_name, partition) click to toggle source

Return lead broker for topic and partition

# File lib/poseidon/cluster_metadata.rb, line 51
# Resolves the broker recorded as leader for a partition of a topic.
#
# @param [String] topic_name Key into the cached topic metadata
# @param partition Partition identifier, handed to the topic metadata's
#   +partition_leader+ (presumably an Integer id -- confirm against TopicMetadata)
# @return the broker stored under the leader's broker id
# @raise [Poseidon::Errors::UnknownTopicOrPartition] when no metadata is
#   cached for the topic, or +partition_leader+ yields no broker id
# @raise [Poseidon::Errors::LeaderNotAvailable] when the leader's broker id
#   has no entry among the known brokers
def lead_broker_for_partition(topic_name, partition)
  metadata = @topic_metadata[topic_name]
  unless metadata
    raise ::Poseidon::Errors::UnknownTopicOrPartition, "Topic not found: #{topic_name}"
  end

  leader_id = metadata.partition_leader(partition)
  unless leader_id
    raise ::Poseidon::Errors::UnknownTopicOrPartition, "Partition not found #{topic_name}/#{partition}"
  end

  leader = @brokers[leader_id]
  return leader if leader

  raise ::Poseidon::Errors::LeaderNotAvailable,
        "Lead broker #{leader_id} not present in brokers: #{@brokers.keys.inspect}"
end
metadata_for_topics(topic_names) click to toggle source

Provides metadata for each topic

@param [Enumerable<String>] topic_names Topics we should return metadata for @return [Hash<String,TopicMetadata>]

# File lib/poseidon/cluster_metadata.rb, line 38
# Collects the cached metadata for each requested topic.
#
# @param [Enumerable<String>] topic_names Topics to look up
# @return [Hash<String,TopicMetadata>] one entry per requested topic name;
#   the value is nil for a topic with no cached metadata
def metadata_for_topics(topic_names)
  topic_names.each_with_object({}) do |name, found|
    found[name] = @topic_metadata[name]
  end
end
reset() click to toggle source
# File lib/poseidon/cluster_metadata.rb, line 84
# Forgets every known broker and all cached topic metadata by replacing
# both tables with fresh empty hashes. The last-refreshed timestamp is not
# modified here.
def reset
  @brokers = {}
  @topic_metadata = {}
end
to_s() click to toggle source
# File lib/poseidon/cluster_metadata.rb, line 74
# Renders the cached topic metadata as human-readable text.
#
# Each topic produces three parts, each on its own line: a "Topic: <name>"
# header, a dashed rule, and the topic metadata's own string form. Sections
# for successive topics are separated by a newline. Previously all parts
# were appended with no separators, yielding one run-together line.
#
# @return [String] the rendered text; an empty string when no topic
#   metadata is cached
def to_s
  sections = @topic_metadata.map do |topic, metadata|
    "Topic: #{topic}\n-------------------------\n#{metadata}"
  end
  sections.join("\n")
end
topics() click to toggle source
# File lib/poseidon/cluster_metadata.rb, line 70
# Lists the names of all topics that currently have cached metadata.
#
# @return [Array] the keys of the topic metadata table, in insertion order
def topics
  @topic_metadata.each_key.to_a
end
update(topic_metadata_response) click to toggle source

Update what we know about the cluster based on MetadataResponse

@param [MetadataResponse] topic_metadata_response @return nil

# File lib/poseidon/cluster_metadata.rb, line 18
# Folds a metadata response into the cached view: brokers first, then
# topics, and finally stamps the refresh time with the current clock.
#
# @param [MetadataResponse] topic_metadata_response Response whose
#   +brokers+ and +topics+ are applied
# @return [nil]
def update(topic_metadata_response)
  response = topic_metadata_response

  update_brokers(response.brokers)
  update_topics(response.topics)
  @last_refreshed_at = Time.now

  nil
end

Private Instance Methods

update_brokers(brokers) click to toggle source
# File lib/poseidon/cluster_metadata.rb, line 98
# Records each given broker under its id, replacing any broker already
# stored under the same id. Brokers absent from the list are left in place.
#
# @param brokers Collection of objects responding to +id+
# @return the +brokers+ collection that was passed in
def update_brokers(brokers)
  brokers.each { |entry| @brokers[entry.id] = entry }
end
update_topics(topics) click to toggle source
# File lib/poseidon/cluster_metadata.rb, line 90
# Caches each given topic under its name, but only when the topic reports
# +exists?+ as truthy; other topics are skipped and any entry already
# cached under their name is left unchanged.
#
# @param topics Collection of objects responding to +exists?+ and +name+
# @return the +topics+ collection that was passed in
def update_topics(topics)
  topics.each do |entry|
    next unless entry.exists?

    @topic_metadata[entry.name] = entry
  end
end