class Kafka::MessageBuffer

Buffers messages for specific topics/partitions.

Attributes

bytesize[R]
size[R]

Public Class Methods

new() click to toggle source
# File lib/kafka/message_buffer.rb, line 13
# Starts with an empty buffer: no topic entries, zero buffered messages and
# zero buffered bytes.
def initialize
  @buffer = {}
  @size = @bytesize = 0
end

Public Instance Methods

clear() click to toggle source

Clears messages across all topics and partitions.

@return [nil]

# File lib/kafka/message_buffer.rb, line 74
# Clears messages across all topics and partitions and resets the message
# and byte counters to zero.
#
# @return [nil]
def clear
  @buffer = {}
  @size = 0
  @bytesize = 0

  # Return nil explicitly, as documented; without this the value of the last
  # assignment (0) would be returned to the caller.
  nil
end
clear_messages(topic:, partition:) click to toggle source

Clears buffered messages for the given topic and partition.

@param topic [String] the name of the topic.
@param partition [Integer] the partition id.

@return [nil]

# File lib/kafka/message_buffer.rb, line 57
# Clears buffered messages for the given topic and partition and subtracts
# them from the size and bytesize counters. Does nothing when no entry exists
# for that topic/partition pair.
#
# @param topic [String] the name of the topic.
# @param partition [Integer] the partition id.
# @return [nil]
def clear_messages(topic:, partition:)
  partitions = @buffer[topic]
  return if partitions.nil? || !partitions.key?(partition)

  messages = partitions[partition]

  # Compute the byte total before touching any state so a failure here
  # leaves the counters and the buffer untouched.
  removed_bytes = messages.map(&:bytesize).reduce(0, :+)

  @size -= messages.count
  @bytesize -= removed_bytes

  partitions.delete(partition)
  # Drop the topic entry once its last partition is gone.
  @buffer.delete(topic) if partitions.empty?

  # Return nil explicitly, as documented; otherwise the deleted topic hash
  # would be returned whenever the last partition of a topic is removed.
  nil
end
concat(messages, topic:, partition:) click to toggle source
# File lib/kafka/message_buffer.rb, line 28
# Appends a batch of messages to the buffer for the given topic and partition
# and adds them to the size and bytesize counters.
#
# @param messages [Array] the messages to append; each must respond to
#   `bytesize`.
# @param topic [String] the name of the topic.
# @param partition [Integer] the partition id.
# @return [Integer] the total buffered bytesize after the append.
def concat(messages, topic:, partition:)
  target = buffer_for(topic, partition)
  target.concat(messages)

  @size += messages.count
  @bytesize += messages.inject(0) { |total, message| total + message.bytesize }
end
each() { |topic, partition, messages_for_partition| ... } click to toggle source
# File lib/kafka/message_buffer.rb, line 43
# Iterates over every topic/partition entry in the buffer.
#
# @yieldparam topic [String] the name of the topic.
# @yieldparam partition [Integer] the partition id.
# @yieldparam messages_for_partition [Array] the messages buffered for that
#   topic and partition.
# @return [Enumerator] when called without a block; otherwise the result of
#   iterating the underlying buffer.
def each
  # Without a block, `yield` would raise LocalJumpError on a non-empty
  # buffer; hand back an Enumerator instead, as Ruby's own iterators do.
  return enum_for(:each) unless block_given?

  @buffer.each do |topic, messages_for_topic|
    messages_for_topic.each do |partition, messages_for_partition|
      yield topic, partition, messages_for_partition
    end
  end
end
empty?() click to toggle source
# File lib/kafka/message_buffer.rb, line 39
# Tells whether the buffer holds no messages.
#
# Looking up a topic/partition (or concatenating an empty batch) leaves an
# empty entry behind in the buffer, so checking only for the absence of topic
# entries would report a message-less buffer as non-empty. Every partition is
# inspected instead.
#
# @return [Boolean] true when no partition holds any message.
def empty?
  @buffer.each_value.all? do |partitions|
    partitions.each_value.all?(&:empty?)
  end
end
messages_for(topic:, partition:) click to toggle source
# File lib/kafka/message_buffer.rb, line 67
# Returns the messages buffered for the given topic and partition.
#
# Delegates to buffer_for, so an empty entry is created for the
# topic/partition pair when none exists yet, and the array returned is the
# one stored in the buffer rather than a copy.
#
# @param topic [String] the name of the topic.
# @param partition [Integer] the partition id.
# @return [Array] the messages buffered for that topic and partition.
def messages_for(topic:, partition:)
  buffer_for(topic, partition)
end
to_h() click to toggle source
# File lib/kafka/message_buffer.rb, line 35
# Returns the buffer as a nested Hash of topic => partition => messages.
#
# This is the internal Hash itself, not a copy: changes made to it are
# visible to the buffer but are not reflected in size or bytesize.
#
# @return [Hash]
def to_h
  @buffer
end
write(value:, key:, topic:, partition:, create_time: Time.now, headers: {}) click to toggle source
# File lib/kafka/message_buffer.rb, line 19
# Wraps the given attributes in a Protocol::Record, appends it to the buffer
# for the given topic and partition, and adds it to the size and bytesize
# counters.
#
# @param value the record value.
# @param key the record key.
# @param topic [String] the name of the topic.
# @param partition [Integer] the partition id.
# @param create_time [Time] the record's creation time; defaults to the
#   time of the call.
# @param headers [Hash] the record headers; defaults to an empty Hash.
# @return [Integer] the total buffered bytesize after the write.
def write(value:, key:, topic:, partition:, create_time: Time.now, headers: {})
  record = Protocol::Record.new(
    key: key,
    value: value,
    create_time: create_time,
    headers: headers
  )

  buffer_for(topic, partition).push(record)

  @size += 1
  @bytesize += record.bytesize
end

Private Instance Methods

buffer_for(topic, partition) click to toggle source
# File lib/kafka/message_buffer.rb, line 82
# Looks up the message array for a topic/partition pair, creating the topic
# entry and the partition array on first access.
#
# @param topic [String] the name of the topic.
# @param partition [Integer] the partition id.
# @return [Array] the array stored in the buffer for that pair.
def buffer_for(topic, partition)
  partitions = (@buffer[topic] ||= {})
  partitions[partition] ||= []
end