class Poseidon::MessagesToSend
A set of messages that we need to send to the cluster. May be used across multiple send attempts.
If a custom partitioner is not used than a messages are distributed in round robin fasion to each partition with an available leader.
@api private
Attributes
messages[R]
topic_set[R]
Public Class Methods
new(messages, cluster_metadata)
click to toggle source
Create a new messages to send object.
@param [Array<Message>] messages List of messages we want to send. @param [ClusterMetadta] cluster_metadata
# File lib/poseidon/messages_to_send.rb, line 17 def initialize(messages, cluster_metadata) @messages = messages @cluster_metadata = cluster_metadata build_topic_set end
Public Instance Methods
messages_for_brokers(message_conductor)
click to toggle source
# File lib/poseidon/messages_to_send.rb, line 28 def messages_for_brokers(message_conductor) topic_metadatas = @cluster_metadata.metadata_for_topics(topic_set) MessagesToSendBatch.new(@messages, message_conductor).messages_for_brokers end
needs_metadata?()
click to toggle source
# File lib/poseidon/messages_to_send.rb, line 24 def needs_metadata? !@cluster_metadata.have_metadata_for_topics?(topic_set) end
pending_messages?()
click to toggle source
# File lib/poseidon/messages_to_send.rb, line 37 def pending_messages? @messages.any? end
successfully_sent(messages_sent)
click to toggle source
# File lib/poseidon/messages_to_send.rb, line 33 def successfully_sent(messages_sent) @messages -= messages_sent end
Private Instance Methods
build_topic_set()
click to toggle source
# File lib/poseidon/messages_to_send.rb, line 42 def build_topic_set @topic_set = Set.new @messages.each { |m| @topic_set.add(m.topic) } end