class Kafka::Protocol::ProduceRequest

A produce request sends a message set to the server.

## API Specification

ProduceRequest => RequiredAcks Timeout [TopicName [Partition MessageSetSize MessageSet]]
    RequiredAcks => int16
    Timeout => int32
    Partition => int32
    MessageSetSize => int32

MessageSet => [Offset MessageSize Message]
    Offset => int64
    MessageSize => int32

Message => Crc MagicByte Attributes Key Value
    Crc => int32
    MagicByte => int8
    Attributes => int8
    Key => bytes
    Value => bytes

Attributes

compressor[R]
messages_for_topics[R]
required_acks[R]
timeout[R]
transactional_id[R]

Public Class Methods

new(transactional_id: nil, required_acks:, timeout:, messages_for_topics:, compressor: nil) click to toggle source

@param required_acks [Integer] @param timeout [Integer] @param messages_for_topics [Hash]

# File lib/kafka/protocol/produce_request.rb, line 35
def initialize(transactional_id: nil, required_acks:, timeout:, messages_for_topics:, compressor: nil)
  @transactional_id = transactional_id
  @required_acks = required_acks
  @timeout = timeout
  @messages_for_topics = messages_for_topics
  @compressor = compressor
end

Public Instance Methods

api_key() click to toggle source
# File lib/kafka/protocol/produce_request.rb, line 43
def api_key
  PRODUCE_API
end
api_version() click to toggle source
# File lib/kafka/protocol/produce_request.rb, line 47
def api_version
  3
end
encode(encoder) click to toggle source
# File lib/kafka/protocol/produce_request.rb, line 63
def encode(encoder)
  encoder.write_string(@transactional_id)
  encoder.write_int16(@required_acks)
  encoder.write_int32(@timeout)

  encoder.write_array(@messages_for_topics) do |topic, messages_for_partition|
    encoder.write_string(topic)

    encoder.write_array(messages_for_partition) do |partition, record_batch|
      encoder.write_int32(partition)

      record_batch.fulfill_relative_data
      encoded_record_batch = compress(record_batch)
      encoder.write_bytes(encoded_record_batch)
    end
  end
end
requires_acks?() click to toggle source

Whether this request requires any acknowledgements at all. If no acknowledgements are required, the server will not send back a response at all.

@return [Boolean] true if acknowledgements are required, false otherwise.

# File lib/kafka/protocol/produce_request.rb, line 59
def requires_acks?
  @required_acks != 0
end
response_class() click to toggle source
# File lib/kafka/protocol/produce_request.rb, line 51
def response_class
  requires_acks? ? Protocol::ProduceResponse : nil
end

Private Instance Methods

compress(record_batch) click to toggle source
# File lib/kafka/protocol/produce_request.rb, line 83
def compress(record_batch)
  if @compressor.nil?
    Protocol::Encoder.encode_with(record_batch)
  else
    @compressor.compress(record_batch)
  end
end