class Kafka::MessageBuffer
Buffers messages for specific topics/partitions.
Attributes
bytesize[R]
size[R]
Public Class Methods
new()
click to toggle source
# File lib/kafka/message_buffer.rb, line 13 def initialize @buffer = {} @size = 0 @bytesize = 0 end
Public Instance Methods
clear()
click to toggle source
Clears messages across all topics and partitions.
@return [nil]
# File lib/kafka/message_buffer.rb, line 74 def clear @buffer = {} @size = 0 @bytesize = 0 end
clear_messages(topic:, partition:)
click to toggle source
Clears buffered messages for the given topic and partition.
@param topic [String] the name of the topic. @param partition [Integer] the partition id.
@return [nil]
# File lib/kafka/message_buffer.rb, line 57 def clear_messages(topic:, partition:) return unless @buffer.key?(topic) && @buffer[topic].key?(partition) @size -= @buffer[topic][partition].count @bytesize -= @buffer[topic][partition].map(&:bytesize).reduce(0, :+) @buffer[topic].delete(partition) @buffer.delete(topic) if @buffer[topic].empty? end
concat(messages, topic:, partition:)
click to toggle source
# File lib/kafka/message_buffer.rb, line 28 def concat(messages, topic:, partition:) buffer_for(topic, partition).concat(messages) @size += messages.count @bytesize += messages.map(&:bytesize).reduce(0, :+) end
each() { |topic, partition, messages_for_partition| ... }
click to toggle source
# File lib/kafka/message_buffer.rb, line 43 def each @buffer.each do |topic, messages_for_topic| messages_for_topic.each do |partition, messages_for_partition| yield topic, partition, messages_for_partition end end end
empty?()
click to toggle source
# File lib/kafka/message_buffer.rb, line 39 def empty? @buffer.empty? end
messages_for(topic:, partition:)
click to toggle source
# File lib/kafka/message_buffer.rb, line 67 def messages_for(topic:, partition:) buffer_for(topic, partition) end
to_h()
click to toggle source
# File lib/kafka/message_buffer.rb, line 35 def to_h @buffer end
write(value:, key:, topic:, partition:, create_time: Time.now, headers: {})
click to toggle source
# File lib/kafka/message_buffer.rb, line 19 def write(value:, key:, topic:, partition:, create_time: Time.now, headers: {}) message = Protocol::Record.new(key: key, value: value, create_time: create_time, headers: headers) buffer_for(topic, partition) << message @size += 1 @bytesize += message.bytesize end
Private Instance Methods
buffer_for(topic, partition)
click to toggle source
# File lib/kafka/message_buffer.rb, line 82 def buffer_for(topic, partition) @buffer[topic] ||= {} @buffer[topic][partition] ||= [] end