class Kafka::OffsetManager

Manages a consumer's position in each of its partitions: tracking which messages have been processed, committing those positions back to Kafka, and determining where to resume processing from.

Constants

DEFAULT_RETENTION_TIME

The default value of the broker's `offsets.retention.minutes` setting is 1440 (24 hours); this retention period is used when no explicit `offset_retention_time` is passed to the initializer.

Public Class Methods

new(cluster:, group:, fetcher:, logger:, commit_interval:, commit_threshold:, offset_retention_time:)
# File lib/kafka/offset_manager.rb, line 12
def initialize(cluster:, group:, fetcher:, logger:, commit_interval:, commit_threshold:, offset_retention_time:)
  @cluster = cluster
  @group = group
  @fetcher = fetcher
  @logger = TaggedLogger.new(logger)
  @commit_interval = commit_interval
  @commit_threshold = commit_threshold

  @uncommitted_offsets = 0
  @processed_offsets = {}
  @default_offsets = {}
  @committed_offsets = nil
  @resolved_offsets = {}
  @last_commit = Time.now
  @last_recommit = nil
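  # Re-commit positions at half the retention period so they are refreshed
  # before they can expire broker-side.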
  @recommit_interval = (offset_retention_time || DEFAULT_RETENTION_TIME) / 2
end

Public Instance Methods

clear_offsets()

Clear all stored offset information.

@return [nil]

# File lib/kafka/offset_manager.rb, line 149
def clear_offsets
  @processed_offsets.clear
  @resolved_offsets.clear

  # Clear the cached commits from the brokers.
  @committed_offsets = nil
end
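A minimal usage sketch, assuming an already-built `offset_manager` instance:

# Drop all locally tracked positions as well as the cached commits, so the
# next offset lookup fetches committed offsets from the brokers again.
offset_manager.clear_offsets
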
clear_offsets_excluding(excluded)

Clear stored offset information for all partitions except those specified in `excluded`.

offset_manager.clear_offsets_excluding("my-topic" => [1, 2, 3])

@return [nil]

# File lib/kafka/offset_manager.rb, line 163
def clear_offsets_excluding(excluded)
  # Clear all offsets that aren't in `excluded`.
  @processed_offsets.each do |topic, partitions|
    partitions.keep_if do |partition, _|
      excluded.fetch(topic, []).include?(partition)
    end
  end

  # Clear the cached commits from the brokers.
  @committed_offsets = nil
  @resolved_offsets.clear
end
commit_offsets(recommit = false)

Commit offsets of messages that have been marked as processed.

If `recommit` is set to true, we will also commit the existing positions even if no messages have been processed on a partition. This is done in order to avoid the offset information expiring in cases where messages are very rare – it's essentially a keep-alive.

@param recommit [Boolean] whether to recommit offsets that have already been committed.

@return [nil]

# File lib/kafka/offset_manager.rb, line 120
def commit_offsets(recommit = false)
  offsets = offsets_to_commit(recommit)
  unless offsets.empty?
    @logger.debug "Committing offsets#{recommit ? ' with recommit' : ''}: #{prettify_offsets(offsets)}"

    @group.commit_offsets(offsets)

    @last_commit = Time.now
    @last_recommit = Time.now if recommit

    @uncommitted_offsets = 0
    @committed_offsets = nil
  end
end
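A brief usage sketch (the `offset_manager` instance is assumed):

# Commit the positions of everything marked as processed since the last commit.
offset_manager.commit_offsets

# Keep-alive: also re-commit positions that are already committed, so that
# offsets on rarely written partitions don't expire on the broker.
offset_manager.commit_offsets(true)
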
commit_offsets_if_necessary()

Commit offsets if necessary, according to the offset commit policy specified when initializing the class.

@return [nil]

# File lib/kafka/offset_manager.rb, line 139
def commit_offsets_if_necessary
  recommit = recommit_timeout_reached?
  if recommit || commit_timeout_reached? || commit_threshold_reached?
    commit_offsets(recommit)
  end
end
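For illustration, a hypothetical processing loop that leaves the commit decision to the configured policy; `messages` and `process` are assumed placeholders:

messages.each do |message|
  process(message) # application-specific work (assumed)

  offset_manager.mark_as_processed(message.topic, message.partition, message.offset)

  # Only commits once the commit interval, the commit threshold, or the
  # recommit (keep-alive) timeout has been reached.
  offset_manager.commit_offsets_if_necessary
end
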
mark_as_processed(topic, partition, offset)

Mark a message as having been processed.

When offsets are committed, the message's offset will be stored in Kafka so that we can resume from this point at a later time.

@param topic [String] the name of the topic.
@param partition [Integer] the partition number.
@param offset [Integer] the offset of the message that should be marked as processed.
@return [nil]

# File lib/kafka/offset_manager.rb, line 52
def mark_as_processed(topic, partition, offset)
  @uncommitted_offsets += 1
  @processed_offsets[topic] ||= {}

  # The committed offset should always be the offset of the next message that the
  # application will read, thus adding one to the last message processed.
  @processed_offsets[topic][partition] = offset + 1
  @logger.debug "Marking #{topic}/#{partition}:#{offset} as processed"
end
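A small sketch of the off-by-one bookkeeping described above:

# Marking offset 41 as processed stores 42 internally, since the committed
# offset is always the offset of the next message the group should read.
offset_manager.mark_as_processed("my-topic", 0, 41)
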
next_offset_for(topic, partition)

Return the next offset that should be fetched for the specified partition.

@param topic [String] the name of the topic.
@param partition [Integer] the partition number.
@return [Integer] the next offset that should be fetched.

# File lib/kafka/offset_manager.rb, line 95
def next_offset_for(topic, partition)
  offset = @processed_offsets.fetch(topic, {}).fetch(partition) {
    committed_offset_for(topic, partition)
  }

  # A negative offset means that no offset has been committed, so we need to
  # resolve the default offset for the topic.
  if offset < 0
    resolve_offset(topic, partition)
  else
    # The stored offset is already the offset of the next message to fetch.
    offset
  end
end
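A usage sketch; the lookup order is locally processed offset, then committed offset, then the topic's default offset:

# Returns the locally processed position if there is one, otherwise the
# committed offset fetched from the brokers, otherwise the resolved default
# offset (:earliest or :latest) for the topic.
offset = offset_manager.next_offset_for("my-topic", 0)
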
seek_to(topic, partition, offset)

Move the consumer's position in the partition to the specified offset.

@param topic [String] the name of the topic.
@param partition [Integer] the partition number.
@param offset [Integer] the offset that the consumer position should be moved to.
@return [nil]

# File lib/kafka/offset_manager.rb, line 83
def seek_to(topic, partition, offset)
  @processed_offsets[topic] ||= {}
  @processed_offsets[topic][partition] = offset

  @fetcher.seek(topic, partition, offset)
end
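For example, to reposition a single partition (offset 1234 is an arbitrary illustration):

# Treat offset 1234 as the position for partition 0 of "my-topic" and tell
# the fetcher to continue fetching from that offset.
offset_manager.seek_to("my-topic", 0, 1234)
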
seek_to_default(topic, partition)

Move the consumer's position in the partition back to the configured default offset: either the earliest or the latest offset in the partition.

@param topic [String] the name of the topic.
@param partition [Integer] the partition number.
@return [nil]

# File lib/kafka/offset_manager.rb, line 68
def seek_to_default(topic, partition)
  # Remove any cached offset, in case things have changed broker-side.
  clear_resolved_offset(topic)

  offset = resolve_offset(topic, partition)

  seek_to(topic, partition, offset)
end
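For example:

# Reset partition 0 of "my-topic" to the topic's default offset (:earliest or
# :latest), re-resolving it against the brokers in case it has changed.
offset_manager.seek_to_default("my-topic", 0)
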
set_default_offset(topic, default_offset)

Set the default offset for a topic.

When the consumer is started for the first time, or when its position has to be reset (for example because the committed offset is no longer valid), it must start either with the earliest messages or with the latest, skipping to the very end of each partition.

@param topic [String] the name of the topic.
@param default_offset [Symbol] either `:earliest` or `:latest`.
@return [nil]

# File lib/kafka/offset_manager.rb, line 39
def set_default_offset(topic, default_offset)
  @default_offsets[topic] = default_offset
end
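
For example, to consume a topic from the beginning:

# Start from the oldest available messages the first time this group consumes
# "my-topic"; use :latest to skip straight to newly produced messages instead.
offset_manager.set_default_offset("my-topic", :earliest)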

Private Instance Methods

clear_resolved_offset(topic)
# File lib/kafka/offset_manager.rb, line 178
def clear_resolved_offset(topic)
  @resolved_offsets.delete(topic)
end
commit_threshold_reached?()
# File lib/kafka/offset_manager.rb, line 238
def commit_threshold_reached?
  @commit_threshold != 0 && @uncommitted_offsets >= @commit_threshold
end
commit_timeout_reached?()
# File lib/kafka/offset_manager.rb, line 234
def commit_timeout_reached?
  @commit_interval != 0 && seconds_since_last_commit >= @commit_interval
end
committed_offset_for(topic, partition)
# File lib/kafka/offset_manager.rb, line 206
def committed_offset_for(topic, partition)
  committed_offsets.offset_for(topic, partition)
end
committed_offsets()
# File lib/kafka/offset_manager.rb, line 202
def committed_offsets
  @committed_offsets ||= @group.fetch_offsets
end
fetch_resolved_offsets(topic)
# File lib/kafka/offset_manager.rb, line 187
def fetch_resolved_offsets(topic)
  default_offset = @default_offsets.fetch(topic)
  partitions = @group.assigned_partitions.fetch(topic)

  @cluster.resolve_offsets(topic, partitions, default_offset)
end
offsets_to_commit(recommit = false)
# File lib/kafka/offset_manager.rb, line 210
def offsets_to_commit(recommit = false)
  if recommit
    offsets_to_recommit.merge!(@processed_offsets) do |_topic, committed, processed|
      committed.merge!(processed)
    end
  else
    @processed_offsets
  end
end
offsets_to_recommit()
# File lib/kafka/offset_manager.rb, line 220
def offsets_to_recommit
  committed_offsets.topics.each_with_object({}) do |(topic, partition_info), offsets|
    topic_offsets = partition_info.keys.each_with_object({}) do |partition, partition_map|
      offset = committed_offsets.offset_for(topic, partition)
      partition_map[partition] = offset unless offset == -1
    end
    offsets[topic] = topic_offsets unless topic_offsets.empty?
  end
end
prettify_offsets(offsets)
# File lib/kafka/offset_manager.rb, line 242
def prettify_offsets(offsets)
  offsets.flat_map do |topic, partitions|
    partitions.map { |partition, offset| "#{topic}/#{partition}:#{offset}" }
  end.join(', ')
end
recommit_timeout_reached?()
# File lib/kafka/offset_manager.rb, line 230
def recommit_timeout_reached?
  @last_recommit.nil? || seconds_since(@last_recommit) >= @recommit_interval
end
resolve_offset(topic, partition)
# File lib/kafka/offset_manager.rb, line 182
def resolve_offset(topic, partition)
  @resolved_offsets[topic] ||= fetch_resolved_offsets(topic)
  @resolved_offsets[topic].fetch(partition)
end
seconds_since(time)
# File lib/kafka/offset_manager.rb, line 194
def seconds_since(time)
  Time.now - time
end
seconds_since_last_commit()
# File lib/kafka/offset_manager.rb, line 198
def seconds_since_last_commit
  seconds_since(@last_commit)
end