class Kazoo::Consumergroup

Attributes

cluster[R]
name[R]

Public Class Methods

new(cluster, name) click to toggle source
# File lib/kazoo/consumergroup.rb, line 5
# Stores the owning cluster and the group name; both are exposed through
# the cluster/name readers listed under Attributes.
def initialize(cluster, name)
  @cluster = cluster
  @name = name
end

Public Instance Methods

==(other)
Alias for: eql?
active?() click to toggle source
# File lib/kazoo/consumergroup.rb, line 46
# True when the group currently has at least one registered instance.
def active?
  !instances.empty?
end
claimed_topics() click to toggle source
# File lib/kazoo/consumergroup.rb, line 92
# Lists the topics that have an entry under this group's "owners" znode.
#
# Returns an array of cluster.topic(...) objects, or [] when the owners
# znode does not exist.
# Raises Kazoo::Error for any other Zookeeper result code.
def claimed_topics
  topic_result = cluster.zk.get_children(path: "/consumers/#{name}/owners")
  rc = topic_result.fetch(:rc)

  return [] if rc == Zookeeper::Constants::ZNONODE
  unless rc == Zookeeper::Constants::ZOK
    raise Kazoo::Error, "Failed to get subscribed topics. Result code: #{topic_result.fetch(:rc)}"
  end

  topic_result.fetch(:children).map { |topic_name| cluster.topic(topic_name) }
end
clean_stored_offsets(subscription = nil) click to toggle source
# File lib/kazoo/consumergroup.rb, line 279
# Deletes the stored offsets of every topic that is not part of the given
# subscription. One worker thread is started per topic found under the
# group's "offsets" znode.
#
# subscription - passed through Kazoo::Subscription.build; when nil, the
#                group's own subscription (self.subscription) is used.
#
# Raises Kazoo::Error when the children of the offsets znode cannot be listed.
def clean_stored_offsets(subscription = nil)
  subscription = subscription.nil? ? self.subscription : Kazoo::Subscription.build(subscription)

  topics_result = cluster.zk.get_children(path: "/consumers/#{name}/offsets")
  raise Kazoo::Error, "Failed to retrieve list of topics. Error code: #{topics_result.fetch(:rc)}" if topics_result.fetch(:rc) != Zookeeper::Constants::ZOK

  threads = topics_result.fetch(:children).map do |topic_name|
    Thread.new do
      # Scope the abort flag to this worker. Assigning Thread.abort_on_exception
      # would change the process-wide default and stay set after we return.
      Thread.current.abort_on_exception = true
      topic = cluster.topic(topic_name)
      unless subscription.topics(cluster).include?(topic)
        cluster.send(:recursive_delete, path: "/consumers/#{name}/offsets/#{topic.name}")
      end
    end
  end

  threads.each(&:join)
end
clean_topic_claims(subscription = nil) click to toggle source
# File lib/kazoo/consumergroup.rb, line 264
# Deletes the "owners" subtree of every claimed topic that is not part of
# the given subscription. One worker thread is started per claimed topic.
#
# subscription - passed through Kazoo::Subscription.build; when nil, the
#                group's own subscription (self.subscription) is used.
def clean_topic_claims(subscription = nil)
  subscription = subscription.nil? ? self.subscription : Kazoo::Subscription.build(subscription)

  threads = claimed_topics.map do |topic|
    Thread.new do
      # Scope the abort flag to this worker. Assigning Thread.abort_on_exception
      # would change the process-wide default and stay set after we return.
      Thread.current.abort_on_exception = true
      unless subscription.topics(cluster).include?(topic)
        cluster.send(:recursive_delete, path: "/consumers/#{name}/owners/#{topic.name}")
      end
    end
  end

  threads.each(&:join)
end
commit_offset(partition, offset) click to toggle source
# File lib/kazoo/consumergroup.rb, line 245
# Stores offset + 1 (as a string) in the partition's offset znode for this
# group. If the znode does not exist yet it is created recursively and the
# write is attempted once more.
#
# Returns nil.
# Raises Kazoo::Error when the final write does not return ZOK.
def commit_offset(partition, offset)
  partition_offset_path = "/consumers/#{name}/offsets/#{partition.topic.name}/#{partition.id}"
  next_offset_data = (offset + 1).to_s
  write = -> { cluster.zk.set(path: partition_offset_path, data: next_offset_data) }

  result = write.call
  if result.fetch(:rc) == Zookeeper::Constants::ZNONODE
    cluster.send(:recursive_create, path: partition_offset_path)
    result = write.call
  end

  return if result.fetch(:rc) == Zookeeper::Constants::ZOK

  raise Kazoo::Error, "Failed to commit offset #{offset} for partition #{partition.topic.name}/#{partition.id}. Error code: #{result.fetch(:rc)}"
end
create() click to toggle source
# File lib/kazoo/consumergroup.rb, line 9
# Creates the "ids" and "owners" znodes for this group (recursively), then
# resets the cluster's metadata. Returns whatever cluster.reset_metadata returns.
def create
  ids_path = "/consumers/#{name}/ids"
  owners_path = "/consumers/#{name}/owners"

  [ids_path, owners_path].each do |znode|
    cluster.send(:recursive_create, path: znode)
  end
  cluster.reset_metadata
end
created_at() click to toggle source
# File lib/kazoo/consumergroup.rb, line 25
# Returns the creation time of the group's znode as a Time.
#
# The stat's timestamp is divided by 1000.0 before being handed to Time.at,
# i.e. it is treated as milliseconds since the epoch.
#
# Raises Kazoo::Error when the stat call does not return ZOK.
def created_at
  result = cluster.zk.stat(path: "/consumers/#{name}")
  raise Kazoo::Error, "Failed to get consumer details. Error code: #{result.fetch(:rc)}" if result.fetch(:rc) != Zookeeper::Constants::ZOK

  # ctime is the znode's creation time; mtime would report the last data
  # modification instead, which is not what "created_at" promises.
  Time.at(result.fetch(:stat).ctime / 1000.0)
end
destroy() click to toggle source
# File lib/kazoo/consumergroup.rb, line 15
# Recursively deletes this group's znode and then resets the cluster's
# metadata. Returns whatever cluster.reset_metadata returns.
def destroy
  group_root = "/consumers/#{name}"
  cluster.send(:recursive_delete, path: group_root)
  cluster.reset_metadata
end
eql?(other) click to toggle source
# File lib/kazoo/consumergroup.rb, line 302
# Two consumer groups are equal when other is a Kazoo::Consumergroup with
# the same cluster and the same name.
def eql?(other)
  return false unless other.kind_of?(Kazoo::Consumergroup)

  cluster == other.cluster && name == other.name
end
Also aliased as: ==
exists?() click to toggle source
# File lib/kazoo/consumergroup.rb, line 20
# Asks Zookeeper for the stat of the group's znode and returns the result
# of calling exists? on it.
def exists?
  cluster.zk.stat(path: "/consumers/#{name}").fetch(:stat).exists?
end
hash() click to toggle source
# File lib/kazoo/consumergroup.rb, line 308
# Hash code derived from the same pair (cluster, name) that eql? compares.
def hash
  identity = [cluster, name]
  identity.hash
end
inspect() click to toggle source
# File lib/kazoo/consumergroup.rb, line 298
def inspect
  "#<Kazoo::Consumergroup name=#{name}>"
end
instances() click to toggle source
# File lib/kazoo/consumergroup.rb, line 50
# Lists the instances registered under this group's "ids" znode.
#
# Returns the result of instances_with_subscription for the child ids, or
# [] when the ids znode does not exist.
# Raises Kazoo::Error for any other Zookeeper result code.
def instances
  result = cluster.zk.get_children(path: "/consumers/#{name}/ids")
  case result.fetch(:rc)
  when Zookeeper::Constants::ZOK
    instances_with_subscription(result.fetch(:children))
  when Zookeeper::Constants::ZNONODE
    []
  else
    raise Kazoo::Error, "Failed getting a list of running instances for #{name}. Error code: #{result.fetch(:rc)}"
  end
end
instantiate(id: nil, subscription: nil) click to toggle source
# File lib/kazoo/consumergroup.rb, line 32
# Builds an Instance that belongs to this group, forwarding the optional
# id and subscription keywords unchanged.
def instantiate(id: nil, subscription: nil)
  options = { id: id, subscription: subscription }
  Instance.new(self, **options)
end
partition_claims() click to toggle source
# File lib/kazoo/consumergroup.rb, line 127
# Builds a hash mapping each claimed partition to the instance that claims
# it, by walking /consumers/<name>/owners/<topic>/<partition>. One thread
# is started per topic, and inside it one thread per partition.
#
# Returns {} when the owners znode does not exist.
# Raises Kazoo::Error when any listing or read returns an unexpected code.
def partition_claims
  topic_result = cluster.zk.get_children(path: "/consumers/#{name}/owners")
  owners_rc = topic_result.fetch(:rc)
  return {} if owners_rc == Zookeeper::Constants::ZNONODE
  unless owners_rc == Zookeeper::Constants::ZOK
    raise Kazoo::Error, "Failed to get partition claims. Result code: #{topic_result.fetch(:rc)}"
  end

  claims = {}
  lock = Mutex.new

  topic_workers = topic_result.fetch(:children).map do |topic_name|
    Thread.new do
      topic = cluster.topic(topic_name)

      partition_result = cluster.zk.get_children(path: "/consumers/#{name}/owners/#{topic.name}")
      unless partition_result.fetch(:rc) == Zookeeper::Constants::ZOK
        raise Kazoo::Error, "Failed to get partition claims. Result code: #{partition_result.fetch(:rc)}"
      end

      partition_workers = partition_result.fetch(:children).map do |partition_id|
        Thread.new do
          partition = topic.partition(partition_id.to_i)

          claim_result = cluster.zk.get(path: "/consumers/#{name}/owners/#{topic.name}/#{partition.id}")
          unless claim_result.fetch(:rc) == Zookeeper::Constants::ZOK
            raise Kazoo::Error, "Failed to get partition claims. Result code: #{claim_result.fetch(:rc)}"
          end

          # The result hash is shared by all workers, so writes are serialized.
          lock.synchronize { claims[partition] = instantiate(id: claim_result.fetch(:data)) }
        end
      end
      partition_workers.each(&:join)
    end
  end

  topic_workers.each(&:join)
  claims
end
partitions() click to toggle source
# File lib/kazoo/consumergroup.rb, line 110
# Collects the partitions of every subscribed topic into one array,
# fetching each topic's partitions in its own thread. The order of the
# result depends on the order in which the threads append.
def partitions
  collected = []
  lock = Mutex.new

  workers = topics.map do |topic|
    Thread.new do
      found = topic.partitions
      lock.synchronize { collected.concat(found) }
    end
  end

  workers.each(&:join)
  collected
end
reset_all_offsets() click to toggle source
# File lib/kazoo/consumergroup.rb, line 260
# Recursively deletes this group's "offsets" znode.
def reset_all_offsets
  offsets_root = "/consumers/#{name}/offsets"
  cluster.send(:recursive_delete, path: offsets_root)
end
retrieve_all_offsets() click to toggle source
# File lib/kazoo/consumergroup.rb, line 171
# Reads every offset stored under /consumers/<name>/offsets, for all topics
# and partitions that have a znode there. One thread is started per topic,
# and inside it one thread per partition.
#
# Returns a hash of partition => Integer offset (nil when the partition's
# znode disappeared between listing and reading), or {} when the offsets
# znode does not exist.
# Raises Kazoo::Error when any listing or read returns an unexpected code.
def retrieve_all_offsets
  topic_result = cluster.zk.get_children(path: "/consumers/#{name}/offsets")
  case topic_result.fetch(:rc)
    when Zookeeper::Constants::ZOK; # continue
    when Zookeeper::Constants::ZNONODE; return {}
    else raise Kazoo::Error, "Failed to get topic offsets. Result code: #{topic_result.fetch(:rc)}"
  end

  offsets, mutex = {}, Mutex.new
  topic_threads = topic_result.fetch(:children).map do |topic_name|
    Thread.new do
      # Scope the abort flag to this worker. Assigning Thread.abort_on_exception
      # would change the process-wide default and stay set after we return.
      Thread.current.abort_on_exception = true

      topic = cluster.topic(topic_name)
      partition_result = cluster.zk.get_children(path: "/consumers/#{name}/offsets/#{topic.name}")
      raise Kazoo::Error, "Failed to get partition offsets. Result code: #{partition_result.fetch(:rc)}" if partition_result.fetch(:rc) != Zookeeper::Constants::ZOK

      partition_threads = partition_result.fetch(:children).map do |partition_id|
        Thread.new do
          Thread.current.abort_on_exception = true

          partition = topic.partition(partition_id.to_i)
          offset_result = cluster.zk.get(path: "/consumers/#{name}/offsets/#{topic.name}/#{partition.id}")
          offset = case offset_result.fetch(:rc)
          when Zookeeper::Constants::ZOK
            offset_result.fetch(:data).to_i
          when Zookeeper::Constants::ZNONODE
            nil
          else
            raise Kazoo::Error, "Failed to retrieve offset for #{partition.key}. Error code: #{offset_result.fetch(:rc)}"
          end
          # The result hash is shared by all workers, so writes are serialized.
          mutex.synchronize { offsets[partition] = offset }
        end
      end
      partition_threads.each(&:join)
    end
  end

  topic_threads.each(&:join)
  return offsets
end
retrieve_offset(partition) click to toggle source
# File lib/kazoo/consumergroup.rb, line 162
# Reads the offset stored for a single partition.
#
# Returns the stored data converted with to_i, or nil when the partition's
# offset znode does not exist.
# Raises Kazoo::Error for any other Zookeeper result code.
def retrieve_offset(partition)
  result = cluster.zk.get(path: "/consumers/#{name}/offsets/#{partition.topic.name}/#{partition.id}")
  rc = result.fetch(:rc)

  if rc == Zookeeper::Constants::ZOK
    result.fetch(:data).to_i
  elsif rc == Zookeeper::Constants::ZNONODE
    nil
  else
    raise Kazoo::Error, "Failed to retrieve offset for partition #{partition.topic.name}/#{partition.id}. Error code: #{result.fetch(:rc)}"
  end
end
retrieve_offsets(subscription = self.subscription) click to toggle source
# File lib/kazoo/consumergroup.rb, line 213
# Reads the stored offset of every partition of every topic in the given
# subscription. One thread is started per topic, and inside it one thread
# per partition.
#
# subscription - passed through Kazoo::Subscription.build; defaults to the
#                group's own subscription.
#
# Returns a hash of partition => Integer offset, with nil for partitions
# that have no offset znode.
# Raises Kazoo::Error when a read returns an unexpected result code.
def retrieve_offsets(subscription = self.subscription)
  subscription = Kazoo::Subscription.build(subscription)

  offsets, mutex = {}, Mutex.new
  topic_threads = subscription.topics(cluster).map do |topic|
    Thread.new do
      # Scope the abort flag to this worker. Assigning Thread.abort_on_exception
      # would change the process-wide default and stay set after we return.
      Thread.current.abort_on_exception = true

      partition_threads = topic.partitions.map do |partition|
        Thread.new do
          Thread.current.abort_on_exception = true

          offset_result = cluster.zk.get(path: "/consumers/#{name}/offsets/#{topic.name}/#{partition.id}")
          offset = case offset_result.fetch(:rc)
          when Zookeeper::Constants::ZOK
            offset_result.fetch(:data).to_i
          when Zookeeper::Constants::ZNONODE
            nil
          else
            raise Kazoo::Error, "Failed to retrieve offset for #{partition.key}. Error code: #{offset_result.fetch(:rc)}"
          end
          # The result hash is shared by all workers, so writes are serialized.
          mutex.synchronize { offsets[partition] = offset }
        end
      end
      partition_threads.each(&:join)
    end
  end

  topic_threads.each(&:join)
  return offsets
end
subscribed_topics() click to toggle source
# File lib/kazoo/consumergroup.rb, line 104
# Topics covered by the group's current subscription, resolved against
# this group's cluster.
def subscribed_topics
  current = subscription
  current.topics(cluster)
end
Also aliased as: topics
subscription() click to toggle source
# File lib/kazoo/consumergroup.rb, line 36
# Determines the single subscription shared by all running instances.
#
# Raises NoRunningInstances when no instance reports a subscription, and
# InconsistentSubscriptions when the instances report more than one
# distinct subscription.
def subscription
  reported = instances.map { |instance| instance.subscription }.compact
  raise NoRunningInstances, "Consumergroup #{name} has no running instances; cannot determine subscription" if reported.empty?

  distinct = reported.uniq
  raise InconsistentSubscriptions, "Subscriptions of running instances are different from each other" unless distinct.length == 1

  distinct.first
end
topics()
Alias for: subscribed_topics
unclaimed_partitions() click to toggle source
# File lib/kazoo/consumergroup.rb, line 123
# Partitions of the subscribed topics that do not appear as a key in
# partition_claims.
def unclaimed_partitions
  all_partitions = partitions
  claimed = partition_claims.keys
  all_partitions - claimed
end
watch_instances(&block) click to toggle source
# File lib/kazoo/consumergroup.rb, line 62
# Lists the group's instances while passing a watcher callback, built from
# the given block, to the get_children call on the "ids" znode.
#
# Returns a two-element array [instances, callback]; instances is [] when
# the ids znode does not exist.
# Raises Kazoo::Error for any other Zookeeper result code.
def watch_instances(&block)
  cb = Zookeeper::Callbacks::WatcherCallback.create(&block)
  result = cluster.zk.get_children(path: "/consumers/#{name}/ids", watcher: cb)
  running = case result.fetch(:rc)
  when Zookeeper::Constants::ZOK
    instances_with_subscription(result.fetch(:children))
  when Zookeeper::Constants::ZNONODE
    []
  else
    raise Kazoo::Error, "Failed getting a list of running instances for #{name}. Error code: #{result.fetch(:rc)}"
  end

  [running, cb]
end
watch_partition_claim(partition, &block) click to toggle source
# File lib/kazoo/consumergroup.rb, line 77
# Reads the claim znode of a partition while passing a watcher callback,
# built from the given block, to the get call.
#
# Returns [instance, callback] when the partition is claimed, or
# [nil, nil] when the claim znode does not exist.
# Raises Kazoo::Error for any other Zookeeper result code.
def watch_partition_claim(partition, &block)
  cb = Zookeeper::Callbacks::WatcherCallback.create(&block)

  result = cluster.zk.get(path: "/consumers/#{name}/owners/#{partition.topic.name}/#{partition.id}", watcher: cb)
  rc = result.fetch(:rc)

  # Nobody is claiming this partition yet
  return [nil, nil] if rc == Zookeeper::Constants::ZNONODE

  unless rc == Zookeeper::Constants::ZOK
    raise Kazoo::Error, "Failed to set watch for partition claim of #{partition.topic.name}/#{partition.id}. Error code: #{result.fetch(:rc)}"
  end

  owner = Kazoo::Consumergroup::Instance.new(self, id: result.fetch(:data))
  [owner, cb]
end

Protected Instance Methods

instances_with_subscription(instance_ids) click to toggle source
# File lib/kazoo/consumergroup.rb, line 314
# Builds an Instance for every given instance id, reading each instance's
# znode under "ids" in its own thread and parsing its data with
# Kazoo::Subscription.from_json.
#
# instance_ids - the child names of /consumers/<name>/ids.
#
# Returns an array of Instance objects; its order depends on the order in
# which the threads append.
# Raises Kazoo::Error when an instance znode cannot be read.
def instances_with_subscription(instance_ids)
  instances, threads, mutex = [], [], Mutex.new
  instance_ids.each do |id|
    threads << Thread.new do
      # Scope the abort flag to this worker. Assigning Thread.abort_on_exception
      # would change the process-wide default and stay set after we return.
      Thread.current.abort_on_exception = true

      subscription_result = cluster.zk.get(path: "/consumers/#{name}/ids/#{id}")
      raise Kazoo::Error, "Failed to retrieve subscription for instance. Error code: #{subscription_result.fetch(:rc)}" if subscription_result.fetch(:rc) != Zookeeper::Constants::ZOK
      subscription = Kazoo::Subscription.from_json(subscription_result.fetch(:data))
      # The result array is shared by all workers, so appends are serialized.
      mutex.synchronize { instances << Instance.new(self, id: id, subscription: subscription) }
    end
  end
  threads.each(&:join)
  instances
end