class Poseidon::SyncProducer
Used by Producer
for sending messages to the kafka cluster.
You should not use this interface directly
Fetches metadata at appropriate times. Builds MessagesToSend
Handle MessageBatchToSend lifecyle
Who is responsible for fetching metadata from broker seed list?
Do we want to be fetching from real live brokers eventually?
@api private
Constants
- OPTION_DEFAULTS
Attributes
ack_timeout_ms[R]
client_id[R]
max_send_retries[R]
metadata_refresh_interval_ms[R]
required_acks[R]
retry_backoff_ms[R]
socket_timeout_ms[R]
Public Class Methods
new(client_id, seed_brokers, options = {})
click to toggle source
# File lib/poseidon/sync_producer.rb, line 29 def initialize(client_id, seed_brokers, options = {}) @client_id = client_id handle_options(options.dup) @cluster_metadata = ClusterMetadata.new @message_conductor = MessageConductor.new(@cluster_metadata, @partitioner) @broker_pool = BrokerPool.new(client_id, seed_brokers, socket_timeout_ms) end
Public Instance Methods
close()
click to toggle source
# File lib/poseidon/sync_producer.rb, line 74 def close @broker_pool.close end
Also aliased as: shutdown
send_messages(messages)
click to toggle source
# File lib/poseidon/sync_producer.rb, line 39 def send_messages(messages) return if messages.empty? messages_to_send = MessagesToSend.new(messages, @cluster_metadata) if refresh_interval_elapsed? refresh_metadata(messages_to_send.topic_set) end ensure_metadata_available_for_topics(messages_to_send) (@max_send_retries+1).times do messages_to_send.messages_for_brokers(@message_conductor).each do |messages_for_broker| if sent = send_to_broker(messages_for_broker) messages_to_send.successfully_sent(sent) end end if !messages_to_send.pending_messages? || @max_send_retries == 0 break else Poseidon.logger.debug { "retrying sending #{messages_to_send.messages} messages" } Kernel.sleep retry_backoff_ms / 1000.0 reset_metadata ensure_metadata_available_for_topics(messages_to_send) end end if messages_to_send.pending_messages? raise Errors::MessageDeliveryError, "Failed to send all messages: #{messages_to_send.messages} remaining" else true end end
Private Instance Methods
ensure_metadata_available_for_topics(messages_to_send)
click to toggle source
# File lib/poseidon/sync_producer.rb, line 82 def ensure_metadata_available_for_topics(messages_to_send) return if !messages_to_send.needs_metadata? Poseidon.logger.debug { "Fetching metadata for #{messages_to_send.topic_set.inspect}. (Attempt 1)" } refresh_metadata(messages_to_send.topic_set) return if !messages_to_send.needs_metadata? 2.times do |n| sleep 5 Poseidon.logger.debug { "Fetching metadata for #{messages_to_send.topic_set.inspect}. (Attempt #{n+2})" } refresh_metadata(messages_to_send.topic_set) return if !messages_to_send.needs_metadata? end raise Errors::UnableToFetchMetadata end
handle_option(options, sym)
click to toggle source
# File lib/poseidon/sync_producer.rb, line 119 def handle_option(options, sym) options.delete(sym) || OPTION_DEFAULTS[sym] end
handle_options(options)
click to toggle source
# File lib/poseidon/sync_producer.rb, line 99 def handle_options(options) @ack_timeout_ms = handle_option(options, :ack_timeout_ms) @socket_timeout_ms = handle_option(options, :socket_timeout_ms) @retry_backoff_ms = handle_option(options, :retry_backoff_ms) @metadata_refresh_interval_ms = handle_option(options, :metadata_refresh_interval_ms) @required_acks = handle_option(options, :required_acks) @max_send_retries = handle_option(options, :max_send_retries) @compression_config = ProducerCompressionConfig.new( handle_option(options, :compression_codec), handle_option(options, :compressed_topics)) @partitioner = handle_option(options, :partitioner) raise ArgumentError, "Unknown options: #{options.keys.inspect}" if options.keys.any? end
refresh_interval_elapsed?()
click to toggle source
# File lib/poseidon/sync_producer.rb, line 123 def refresh_interval_elapsed? @cluster_metadata.last_refreshed_at.nil? || (Time.now - @cluster_metadata.last_refreshed_at) * 1000 > metadata_refresh_interval_ms end
refresh_metadata(topics)
click to toggle source
# File lib/poseidon/sync_producer.rb, line 128 def refresh_metadata(topics) topics_to_refresh = topics.dup @cluster_metadata.topics.each do |topic| topics_to_refresh.add(topic) end @cluster_metadata.update(@broker_pool.fetch_metadata(topics_to_refresh)) @broker_pool.update_known_brokers(@cluster_metadata.brokers) end
reset_metadata()
click to toggle source
# File lib/poseidon/sync_producer.rb, line 139 def reset_metadata Poseidon.logger.debug { "Resetting metdata" } @cluster_metadata.reset @broker_pool.close end
send_to_broker(messages_for_broker)
click to toggle source
# File lib/poseidon/sync_producer.rb, line 145 def send_to_broker(messages_for_broker) return false if messages_for_broker.broker_id == -1 to_send = messages_for_broker.build_protocol_objects(@compression_config) Poseidon.logger.debug { "Sending messages to broker #{messages_for_broker.broker_id}" } response = @broker_pool.execute_api_call(messages_for_broker.broker_id, :produce, required_acks, ack_timeout_ms, to_send) if required_acks == 0 messages_for_broker.messages else messages_for_broker.successfully_sent(response) end rescue Connection::ConnectionFailedError => ex Poseidon.logger.warn { "Failed to send messages to #{messages_for_broker.broker_id} due to connection failure: message=#{ex.message}" } false end