class Kazoo::CLI::Consumers

Public Instance Methods

clean_stored_offsets(name) click to toggle source
# File lib/kazoo/cli/consumers.rb, line 118
def clean_stored_offsets(name)
  validate_class_options!

  cg = kafka_cluster.consumergroup(name)
  raise Kazoo::Error, "Consumergroup #{cg.name} is not registered in Zookeeper" unless cg.exists?
  raise Kazoo::Error, "Cannot clean offsets for #{cg.name} if it is not running" unless cg.active?

  cg.clean_stored_offsets
end
clean_topic_claims(name) click to toggle source
# File lib/kazoo/cli/consumers.rb, line 94
def clean_topic_claims(name)
  validate_class_options!

  cg = kafka_cluster.consumergroup(name)
  raise Kazoo::Error, "Consumergroup #{cg.name} is not registered in Zookeeper" unless cg.exists?
  raise Kazoo::Error, "Cannot cleanup consumergroup #{cg.name} if it is not running" unless cg.active?

  subscribed_topics = cg.subscribed_topics
  claimed_topics    = cg.claimed_topics
  to_clean          = claimed_topics - subscribed_topics

  if to_clean.empty?
    puts "The consumer group does not have any lingering topic claims."
  else
    puts "The following topics were once claimed, but are no longer part of #{cg.name}'s subscriptions:"
    to_clean.each do |topic|
      puts "- #{topic.name}"
    end

    cg.clean_topic_claims
  end
end
delete(name) click to toggle source
# File lib/kazoo/cli/consumers.rb, line 72
def delete(name)
  validate_class_options!

  cg = kafka_cluster.consumergroup(name)
  raise Kazoo::Error, "Consumergroup #{cg.name} is not registered in Zookeeper" unless cg.exists?
  raise Kazoo::Error, "Cannot remove consumergroup #{cg.name} because it's still active" if cg.active?

  cg.destroy
end
list() click to toggle source
# File lib/kazoo/cli/consumers.rb, line 7
def list
  validate_class_options!

  kafka_cluster.consumergroups.sort_by(&:name).each do |group|
    instances = group.instances
    if instances.length == 0
      puts "- #{group.name}: inactive"
    else
      puts "- #{group.name}: #{instances.length} running instances"
    end
  end
end
reset(name) click to toggle source
# File lib/kazoo/cli/consumers.rb, line 83
def reset(name)
  validate_class_options!

  cg = kafka_cluster.consumergroup(name)
  raise Kazoo::Error, "Consumergroup #{cg.name} is not registered in Zookeeper" unless cg.exists?
  raise Kazoo::Error, "Cannot remove consumergroup #{cg.name} because it's still active" if cg.active?

  cg.reset_all_offsets
end
show(name) click to toggle source
# File lib/kazoo/cli/consumers.rb, line 21
def show(name)
  validate_class_options!

  cg = kafka_cluster.consumergroup(name)
  raise Kazoo::Error, "Consumergroup #{cg.name} is not registered in Zookeeper" unless cg.exists?

  topics = cg.subscribed_topics.sort_by(&:name)

  puts "Consumer name: #{cg.name}"
  puts "Created on: #{cg.created_at}"
  puts "Topics (#{topics.length}): #{topics.map(&:name).join(', ')}"

  instances = cg.instances
  if instances.length > 0

    puts
    puts "Running instances (#{instances.length}):"
    instances.each do |instance|
      puts "- #{instance.id}\t(created on #{instance.created_at})"
    end

    partition_claims = cg.partition_claims
    if partition_claims.length > 0
      partitions = partition_claims.keys.sort_by { |p| [p.topic.name, p.id] }

      puts
      puts "Partition claims (#{partition_claims.length}):"
      partitions.each do |partition|
        instance = partition_claims[partition]
        puts "- #{partition.key}: #{instance.id}"
      end
    else
      puts
      puts "WARNING: this consumer group is active but hasn't claimed any partitions"
    end

    unclaimed_partitions = (cg.partitions - partition_claims.keys).sort_by { |p| [p.topic.name, p.id] }

    if unclaimed_partitions.length > 0
      puts
      puts "WARNING: this consumergroup has #{unclaimed_partitions.length} unclaimed partitions:"
      unclaimed_partitions.each do |partition|
        puts "- #{partition.key}"
      end
    end
  else
    puts "This consumer group is inactive."
  end
end