class Poseidon::ClusterMetadata
Encapsulates what we known about brokers, topics and partitions from Metadata API calls.
@api private
Attributes
brokers[R]
last_refreshed_at[R]
topic_metadata[R]
Public Class Methods
new()
click to toggle source
# File lib/poseidon/cluster_metadata.rb, line 8 def initialize @brokers = {} @topic_metadata = {} @last_refreshed_at = nil end
Public Instance Methods
broker(broker_id)
click to toggle source
Provides a Broker object for broker_id
. This corresponds to the broker ids in the TopicMetadata
objects.
@param [Integer] broker_id Broker id
# File lib/poseidon/cluster_metadata.rb, line 46 def broker(broker_id) @brokers[broker_id] end
have_metadata_for_topics?(topic_names)
click to toggle source
Do we have metadata for these topics already?
@param [Enumberable<String>] topic_names A set of topics. @return [Boolean] true if we have metadata for all topic_names
, otherwise false.
# File lib/poseidon/cluster_metadata.rb, line 30 def have_metadata_for_topics?(topic_names) topic_names.all? { |topic| @topic_metadata[topic] } end
lead_broker_for_partition(topic_name, partition)
click to toggle source
Return lead broker for topic and partition
# File lib/poseidon/cluster_metadata.rb, line 51 def lead_broker_for_partition(topic_name, partition) unless @topic_metadata[topic_name] raise ::Poseidon::Errors::UnknownTopicOrPartition, "Topic not found: #{topic_name}" end unless (broker_id = @topic_metadata[topic_name].partition_leader(partition)) raise ::Poseidon::Errors::UnknownTopicOrPartition, "Partition not found #{topic_name}/#{partition}" end unless (broker = @brokers[broker_id]) raise ::Poseidon::Errors::LeaderNotAvailable, "Lead broker #{broker_id} not present in brokers: #{@brokers.keys.inspect}" end broker end
metadata_for_topics(topic_names)
click to toggle source
Provides metadata for each topic
@param [Enumerable<String>] topic_names Topics we should return metadata for @return [Hash<String,TopicMetadata>]
# File lib/poseidon/cluster_metadata.rb, line 38 def metadata_for_topics(topic_names) Hash[topic_names.map { |name| [name, @topic_metadata[name]] }] end
reset()
click to toggle source
# File lib/poseidon/cluster_metadata.rb, line 84 def reset @brokers = {} @topic_metadata = {} end
to_s()
click to toggle source
# File lib/poseidon/cluster_metadata.rb, line 74 def to_s out = "" @topic_metadata.each do |topic, metadata| out << "Topic: #{topic}" out << "-------------------------" out << metadata.to_s end out end
topics()
click to toggle source
# File lib/poseidon/cluster_metadata.rb, line 70 def topics @topic_metadata.keys end
update(topic_metadata_response)
click to toggle source
Update what we know about the cluter based on MetadataResponse
@param [MetadataResponse] topic_metadata_response @return nil
# File lib/poseidon/cluster_metadata.rb, line 18 def update(topic_metadata_response) update_brokers(topic_metadata_response.brokers) update_topics(topic_metadata_response.topics) @last_refreshed_at = Time.now nil end
Private Instance Methods
update_brokers(brokers)
click to toggle source
# File lib/poseidon/cluster_metadata.rb, line 98 def update_brokers(brokers) brokers.each do |broker| @brokers[broker.id] = broker end end
update_topics(topics)
click to toggle source
# File lib/poseidon/cluster_metadata.rb, line 90 def update_topics(topics) topics.each do |topic| if topic.exists? @topic_metadata[topic.name] = topic end end end