class Kafka::RoundRobinAssignmentStrategy

A consumer group partition assignment strategy that assigns partitions to consumers in a round-robin fashion.

Public Class Methods

new(cluster:) click to toggle source
# File lib/kafka/round_robin_assignment_strategy.rb, line 10
def initialize(cluster:)
  @cluster = cluster
end

Public Instance Methods

assign(members:, topics:) click to toggle source

Assign the topic partitions to the group members.

@param members [Array<String>] member ids @param topics [Array<String>] topics @return [Hash<String, Protocol::MemberAssignment>] a hash mapping member

ids to assignments.
# File lib/kafka/round_robin_assignment_strategy.rb, line 20
def assign(members:, topics:)
  group_assignment = {}

  members.each do |member_id|
    group_assignment[member_id] = Protocol::MemberAssignment.new
  end

  topic_partitions = topics.flat_map do |topic|
    begin
      partitions = @cluster.partitions_for(topic).map(&:partition_id)
    rescue UnknownTopicOrPartition
      raise UnknownTopicOrPartition, "unknown topic #{topic}"
    end
    Array.new(partitions.count) { topic }.zip(partitions)
  end

  partitions_per_member = topic_partitions.group_by.with_index do |_, index|
    index % members.count
  end.values

  members.zip(partitions_per_member).each do |member_id, member_partitions|
    unless member_partitions.nil?
      member_partitions.each do |topic, partition|
        group_assignment[member_id].assign(topic, [partition])
      end
    end
  end

  group_assignment
rescue Kafka::LeaderNotAvailable
  sleep 1
  retry
end