class Poseidon::MessagesToSend

A set of messages that we need to send to the cluster. May be used across multiple send attempts.

If a custom partitioner is not used, then messages are distributed in round-robin fashion to each partition with an available leader.
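The round-robin behaviour described above can be illustrated with a minimal sketch. This is not Poseidon's actual partitioning code: the `Partition` struct and the `round_robin_assign` helper are hypothetical names introduced only for this example, and a partition is assumed to be unavailable when its leader is `nil`.

```ruby
# Hypothetical stand-in for partition metadata; not a Poseidon class.
Partition = Struct.new(:id, :leader)

# Assigns each message to the next partition that currently has a leader,
# wrapping around to the first available partition when the list runs out.
def round_robin_assign(messages, partitions)
  available = partitions.reject { |p| p.leader.nil? }
  raise ArgumentError, "no partition has an available leader" if available.empty?

  messages.each_with_index.map do |message, i|
    [message, available[i % available.size].id]
  end
end

partitions = [
  Partition.new(0, "broker-1"),
  Partition.new(1, nil),          # no leader, so this partition is skipped
  Partition.new(2, "broker-2")
]

assignments = round_robin_assign(%w[a b c d], partitions)
assignments.each { |message, partition_id| puts "#{message} -> partition #{partition_id}" }
```

In this sketch the messages alternate between partitions 0 and 2, because partition 1 has no leader and is filtered out before the cycle starts.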

@api private

Attributes

messages[R]
topic_set[R]

Public Class Methods

new(messages, cluster_metadata)

Create a new MessagesToSend object.

@param [Array<Message>] messages List of messages we want to send.
@param [ClusterMetadata] cluster_metadata

# File lib/poseidon/messages_to_send.rb, line 17
def initialize(messages, cluster_metadata)
  @messages         = messages
  @cluster_metadata = cluster_metadata

  build_topic_set
end

Public Instance Methods

messages_for_brokers(message_conductor)
# File lib/poseidon/messages_to_send.rb, line 28
def messages_for_brokers(message_conductor)
  topic_metadatas = @cluster_metadata.metadata_for_topics(topic_set)
  MessagesToSendBatch.new(@messages, message_conductor).messages_for_brokers
end

needs_metadata?()
# File lib/poseidon/messages_to_send.rb, line 24
def needs_metadata?
  !@cluster_metadata.have_metadata_for_topics?(topic_set)
end

pending_messages?()
# File lib/poseidon/messages_to_send.rb, line 37
def pending_messages?
  @messages.any?
end

successfully_sent(messages_sent)
# File lib/poseidon/messages_to_send.rb, line 33
def successfully_sent(messages_sent)
  @messages -= messages_sent
end
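
Since the set may be used across multiple send attempts, the following sketch shows how the public methods above might fit together in a retry loop. It is an illustration under stated assumptions, not Poseidon's producer code: `Msg`, `FakeClusterMetadata`, and `PendingSet` are hypothetical stand-ins, `PendingSet` is a trimmed copy of the methods listed here without broker routing, and each attempt is assumed to deliver only one message.

```ruby
require "set"

# Hypothetical stand-ins used only for this sketch.
Msg = Struct.new(:topic, :value)

class FakeClusterMetadata
  def initialize(known_topics)
    @known_topics = Set.new(known_topics)
  end

  # True when every requested topic is already known.
  def have_metadata_for_topics?(topic_set)
    topic_set.subset?(@known_topics)
  end
end

# Trimmed copy of the methods documented above, without messages_for_brokers.
class PendingSet
  attr_reader :messages, :topic_set

  def initialize(messages, cluster_metadata)
    @messages         = messages
    @cluster_metadata = cluster_metadata
    @topic_set        = Set.new(messages.map(&:topic))
  end

  def needs_metadata?
    !@cluster_metadata.have_metadata_for_topics?(topic_set)
  end

  def pending_messages?
    @messages.any?
  end

  def successfully_sent(messages_sent)
    @messages -= messages_sent
  end
end

msgs    = [Msg.new("events", "a"), Msg.new("events", "b"), Msg.new("logs", "c")]
pending = PendingSet.new(msgs, FakeClusterMetadata.new(%w[events logs]))

attempts = 0
while pending.pending_messages? && attempts < 3
  attempts += 1
  # Assumption: each attempt delivers only the first remaining message.
  pending.successfully_sent([pending.messages.first])
end

puts "attempts: #{attempts}, remaining: #{pending.messages.size}"
```

Because `successfully_sent` uses array difference, every element equal to a sent message is dropped. With the value-equal `Struct` used in this sketch, that would also remove duplicates of a sent message.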

Private Instance Methods

build_topic_set()
# File lib/poseidon/messages_to_send.rb, line 42
def build_topic_set
  @topic_set = Set.new
  @messages.each { |m| @topic_set.add(m.topic) }
end