class Poseidon::SyncProducer

Used by Producer for sending messages to the kafka cluster.

You should not use this interface directly

Fetches metadata at appropriate times. Builds MessagesToSend. Handles the MessageBatchToSend lifecycle.

Who is responsible for fetching metadata from broker seed list?

Do we want to be fetching from real live brokers eventually?

@api private

Constants

OPTION_DEFAULTS

Attributes

ack_timeout_ms[R]
client_id[R]
max_send_retries[R]
metadata_refresh_interval_ms[R]
required_acks[R]
retry_backoff_ms[R]
socket_timeout_ms[R]

Public Class Methods

new(client_id, seed_brokers, options = {}) click to toggle source
# File lib/poseidon/sync_producer.rb, line 29
# Builds a producer and wires up its collaborators.
#
# @param client_id [Object] identifier stored on the producer and handed to the broker pool
# @param seed_brokers [Object] passed straight through to BrokerPool.new
# @param options [Hash] producer settings; a copy is consumed by handle_options,
#   so the caller's hash is left untouched
# @raise [ArgumentError] (from handle_options) when options contains unknown keys
def initialize(client_id, seed_brokers, options = {})
  @client_id = client_id

  # Options must be processed first: @partitioner and socket_timeout_ms
  # are read while constructing the collaborators below.
  handle_options(options.dup)

  metadata = ClusterMetadata.new
  @cluster_metadata  = metadata
  @message_conductor = MessageConductor.new(metadata, @partitioner)
  @broker_pool       = BrokerPool.new(client_id, seed_brokers, socket_timeout_ms)
end

Public Instance Methods

close() click to toggle source
# File lib/poseidon/sync_producer.rb, line 74
# Closes the producer by delegating to @broker_pool.close.
# Returns whatever the broker pool's close returns.
def close
  @broker_pool.close
end
Also aliased as: shutdown
send_messages(messages) click to toggle source
# File lib/poseidon/sync_producer.rb, line 39
# Sends a batch of messages, retrying up to @max_send_retries additional times.
#
# Between attempts the producer sleeps for retry_backoff_ms, resets its
# metadata and makes sure metadata is available again for the batch's topics.
#
# @param messages [#empty?] the messages to deliver
# @return [nil] when +messages+ is empty
# @return [true] when every message was sent
# @raise [Errors::MessageDeliveryError] when messages are still pending after
#   all attempts
def send_messages(messages)
  return if messages.empty?

  batch = MessagesToSend.new(messages, @cluster_metadata)

  refresh_metadata(batch.topic_set) if refresh_interval_elapsed?
  ensure_metadata_available_for_topics(batch)

  attempts = @max_send_retries + 1
  attempts.times do
    batch.messages_for_brokers(@message_conductor).each do |broker_batch|
      delivered = send_to_broker(broker_batch)
      batch.successfully_sent(delivered) if delivered
    end

    # Stop as soon as nothing is pending, or when retries are disabled.
    break if !batch.pending_messages? || @max_send_retries == 0

    Poseidon.logger.debug { "retrying sending #{batch.messages} messages" }
    Kernel.sleep retry_backoff_ms / 1000.0
    reset_metadata
    ensure_metadata_available_for_topics(batch)
  end

  if batch.pending_messages?
    raise Errors::MessageDeliveryError, "Failed to send all messages: #{batch.messages} remaining"
  end

  true
end
shutdown()
Alias for: close

Private Instance Methods

ensure_metadata_available_for_topics(messages_to_send) click to toggle source
# File lib/poseidon/sync_producer.rb, line 82
# Makes sure metadata is available for the topics of +messages_to_send+.
#
# Performs up to three metadata refreshes, sleeping 5 seconds before the
# second and third, and returns as soon as the batch no longer reports
# needing metadata.
#
# @param messages_to_send [#needs_metadata?, #topic_set]
# @return [nil]
# @raise [Errors::UnableToFetchMetadata] when metadata is still needed after
#   the third refresh
def ensure_metadata_available_for_topics(messages_to_send)
  return unless messages_to_send.needs_metadata?

  1.upto(3) do |attempt|
    # No wait before the first attempt; back off before each retry.
    sleep 5 if attempt > 1

    Poseidon.logger.debug { "Fetching metadata for #{messages_to_send.topic_set.inspect}. (Attempt #{attempt})" }
    refresh_metadata(messages_to_send.topic_set)
    return unless messages_to_send.needs_metadata?
  end

  raise Errors::UnableToFetchMetadata
end
handle_option(options, sym) click to toggle source
# File lib/poseidon/sync_producer.rb, line 119
# Removes +sym+ from +options+ and returns its value, falling back to
# OPTION_DEFAULTS[sym] when the removed value is nil or false (which
# includes the key being absent).
#
# @param options [Hash] mutated: the key +sym+ is deleted
# @param sym [Symbol] option name
# @return [Object] the supplied value or the default
def handle_option(options, sym)
  supplied = options.delete(sym)
  return supplied if supplied

  OPTION_DEFAULTS[sym]
end
handle_options(options) click to toggle source
# File lib/poseidon/sync_producer.rb, line 99
# Consumes the recognised keys of +options+ into instance state and rejects
# whatever is left over.
#
# Each recognised key is resolved through handle_option, which deletes it
# from +options+ and supplies the default when no value was given.
#
# @param options [Hash] mutated: every recognised key is removed
# @return [nil]
# @raise [ArgumentError] when keys remain after all known options are consumed
def handle_options(options)
  # Plain settings that map one-to-one onto an instance variable.
  [
    :ack_timeout_ms,
    :socket_timeout_ms,
    :retry_backoff_ms,
    :metadata_refresh_interval_ms,
    :required_acks,
    :max_send_retries
  ].each do |name|
    instance_variable_set("@#{name}", handle_option(options, name))
  end

  codec  = handle_option(options, :compression_codec)
  topics = handle_option(options, :compressed_topics)
  @compression_config = ProducerCompressionConfig.new(codec, topics)

  @partitioner = handle_option(options, :partitioner)

  leftover = options.keys
  raise ArgumentError, "Unknown options: #{leftover.inspect}" unless leftover.none?
end
refresh_interval_elapsed?() click to toggle source
# File lib/poseidon/sync_producer.rb, line 123
# Reports whether cluster metadata is due for a refresh.
#
# @return [Boolean] true when metadata has never been refreshed, or when more
#   than metadata_refresh_interval_ms milliseconds have passed since
#   @cluster_metadata.last_refreshed_at
def refresh_interval_elapsed?
  last_refresh = @cluster_metadata.last_refreshed_at
  return true if last_refresh.nil?

  # Time subtraction yields seconds; the interval is in milliseconds.
  elapsed_ms = (Time.now - last_refresh) * 1000
  elapsed_ms > metadata_refresh_interval_ms
end
refresh_metadata(topics) click to toggle source
# File lib/poseidon/sync_producer.rb, line 128
# Fetches metadata for +topics+ plus every topic already present in
# @cluster_metadata, applies it to @cluster_metadata, then hands the
# resulting broker list to the broker pool.
#
# @param topics [#dup] collection of topics; it is copied, not mutated, and
#   the copy must respond to #add
# @return [Object] the result of @broker_pool.update_known_brokers
def refresh_metadata(topics)
  # Work on a copy so the caller's collection is left as it was.
  wanted = topics.dup.tap do |all_topics|
    @cluster_metadata.topics.each { |known| all_topics.add(known) }
  end

  fetched = @broker_pool.fetch_metadata(wanted)
  @cluster_metadata.update(fetched)
  @broker_pool.update_known_brokers(@cluster_metadata.brokers)
end
reset_metadata() click to toggle source
# File lib/poseidon/sync_producer.rb, line 139
# Resets the cached cluster metadata and closes the broker pool.
# send_messages calls this between send attempts, before it re-checks that
# metadata is available.
def reset_metadata
  Poseidon.logger.debug { "Resetting metdata" }
  @cluster_metadata.reset
  # NOTE(review): presumably closing the pool forces connections to be
  # re-established on the next call — confirm against BrokerPool.
  @broker_pool.close
end
send_to_broker(messages_for_broker) click to toggle source
# File lib/poseidon/sync_producer.rb, line 145
# Issues one produce call for the messages assigned to a single broker.
#
# @param messages_for_broker [#broker_id, #build_protocol_objects, #messages, #successfully_sent]
# @return [false] when the broker id is -1, or when the connection fails
# @return [Object] messages_for_broker.messages when required_acks is 0
#   (no response is inspected); otherwise the result of
#   messages_for_broker.successfully_sent(response)
def send_to_broker(messages_for_broker)
  target = messages_for_broker.broker_id
  return false if target == -1

  payload = messages_for_broker.build_protocol_objects(@compression_config)

  Poseidon.logger.debug { "Sending messages to broker #{target}" }
  response = @broker_pool.execute_api_call(target, :produce, required_acks, ack_timeout_ms, payload)

  # With required_acks == 0 the whole batch is reported as sent.
  return messages_for_broker.messages if required_acks == 0

  messages_for_broker.successfully_sent(response)
rescue Connection::ConnectionFailedError => error
  Poseidon.logger.warn { "Failed to send messages to #{messages_for_broker.broker_id} due to connection failure: message=#{error.message}" }
  false
end