class Kafka::Compressor

Compresses message sets using a specified codec.

A message set is only compressed if its size meets the defined threshold.

## Instrumentation

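Whenever a message set is compressed, the notification `compress.compressor.kafka` will be emitted with the following payload:

* `message_count` – the number of messages in the message set or record batch.
* `uncompressed_bytesize` – the byte size of the encoded data before compression (set only for the deprecated message set format).
* `compressed_bytesize` – the byte size of the data after compression.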

Public Class Methods

new(codec_name: nil, threshold: 1, instrumenter:) click to toggle source

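@param codec_name [Symbol, nil] the name of the codec to use; if nil, nothing is compressed.

@param threshold [Integer] the minimum number of messages in a message set that will trigger compression.

@param instrumenter [#instrument] the object that receives the `compress.compressor` notification whenever data is compressed.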
# File lib/kafka/compressor.rb, line 25
# Builds a compressor for the given codec.
#
# @param codec_name [Symbol, nil] name passed to `Compression.find_codec`;
#   when nil no lookup is performed and nothing will be compressed.
# @param threshold [Integer] the minimum number of messages a message set
#   must contain before it is compressed.
# @param instrumenter [#instrument] the object notified whenever data is
#   compressed.
def initialize(codec_name: nil, threshold: 1, instrumenter:)
  @instrumenter = instrumenter
  @threshold = threshold

  # Only resolve a codec when a name was given; a nil name stays nil, which
  # disables compression.
  @codec = codec_name
  @codec &&= Compression.find_codec(codec_name)
end

Public Instance Methods

compress(record_batch, offset: -1) click to toggle source

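@param record_batch [Protocol::RecordBatch] the record batch to compress; any other object is treated as a message set in the deprecated format.

@param offset [Integer] used to simulate broker behaviour in tests.

@return the encoded record batch (the result of `Protocol::Encoder.encode_with`) when given a `Protocol::RecordBatch`; otherwise a message set — the original one if compression was skipped, or a new `Protocol::MessageSet` wrapping the compressed data.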

# File lib/kafka/compressor.rb, line 36
# Compresses a record batch, or a message set in the deprecated format.
#
# @param record_batch [Protocol::RecordBatch] the batch to compress; any
#   other object is handled as a deprecated message set.
# @param offset [Integer] forwarded to the message set path only; used to
#   simulate broker behaviour in tests.
# @return whatever the selected helper returns: `compress_record_batch` for
#   a `Protocol::RecordBatch`, `compress_message_set` for anything else.
def compress(record_batch, offset: -1)
  return compress_record_batch(record_batch) if record_batch.is_a?(Protocol::RecordBatch)

  # Anything that is not a record batch uses the deprecated message set format.
  compress_message_set(record_batch, offset)
end

Private Instance Methods

compress_message_set(message_set, offset) click to toggle source
# File lib/kafka/compressor.rb, line 47
# Compresses a message set in the deprecated message set format.
#
# The set is returned untouched when no codec is configured or when it holds
# fewer messages than the threshold. Otherwise the encoded set is compressed
# and wrapped in a single message inside a new message set.
#
# @param message_set the message set to compress; must respond to `size`.
# @param offset [Integer] the offset given to the wrapper message.
# @return the original `message_set`, or a new `Protocol::MessageSet` holding
#   one wrapper message with the compressed data.
def compress_message_set(message_set, offset)
  return message_set if @codec.nil?
  return message_set if message_set.size < @threshold

  encoded = Protocol::Encoder.encode_with(message_set)
  compressed = @codec.compress(encoded)

  # Compression has already happened above; the block only fills in the
  # notification payload.
  @instrumenter.instrument("compress.compressor") do |payload|
    payload[:message_count] = message_set.size
    payload[:uncompressed_bytesize] = encoded.bytesize
    payload[:compressed_bytesize] = compressed.bytesize
  end

  Protocol::MessageSet.new(
    messages: [
      Protocol::Message.new(value: compressed, codec_id: @codec.codec_id, offset: offset)
    ]
  )
end
compress_record_batch(record_batch) click to toggle source
# File lib/kafka/compressor.rb, line 68
# Encodes a record batch, marking it with the configured codec when the
# batch is large enough to be compressed.
#
# The batch's `codec_id` is set to 0 when no codec is configured or the
# batch holds fewer records than the threshold, and to the codec's id
# otherwise. This method never calls the codec itself.
# NOTE(review): presumably the encoder compresses based on `codec_id` — confirm.
#
# @param record_batch [Protocol::RecordBatch] the batch to encode; its
#   `codec_id` is mutated.
# @return the result of `Protocol::Encoder.encode_with` for the batch.
def compress_record_batch(record_batch)
  skip_compression = @codec.nil? || record_batch.size < @threshold

  record_batch.codec_id = skip_compression ? 0 : @codec.codec_id
  encoded = Protocol::Encoder.encode_with(record_batch)

  # Only batches that were marked for compression emit a notification.
  unless skip_compression
    @instrumenter.instrument("compress.compressor") do |payload|
      payload[:message_count] = record_batch.size
      payload[:compressed_bytesize] = encoded.bytesize
    end
  end

  encoded
end