class Kazoo::Consumergroup
Attributes
cluster[R]
name[R]
Public Class Methods
new(cluster, name)
click to toggle source
# File lib/kazoo/consumergroup.rb, line 5
# Stores the owning cluster and the group name on the new instance.
#
# @param cluster the cluster object this consumer group is looked up in
# @param name the consumer group's name
def initialize(cluster, name)
  @cluster = cluster
  @name = name
end
Public Instance Methods
active?()
click to toggle source
# File lib/kazoo/consumergroup.rb, line 46
# Tells whether the group currently has at least one instance.
#
# @return [Boolean] true when `instances` is not empty
def active?
  !instances.empty?
end
claimed_topics()
click to toggle source
# File lib/kazoo/consumergroup.rb, line 92
# Lists the topics that have an entry under this group's owners node.
#
# @return [Array] one `cluster.topic(...)` object per child of
#   /consumers/<name>/owners; empty when that node does not exist
# @raise [Kazoo::Error] when the listing fails with any other result code
def claimed_topics
  owners = cluster.zk.get_children(path: "/consumers/#{name}/owners")
  rc = owners.fetch(:rc)

  return [] if rc == Zookeeper::Constants::ZNONODE

  unless rc == Zookeeper::Constants::ZOK
    raise Kazoo::Error, "Failed to get subscribed topics. Result code: #{rc}"
  end

  owners.fetch(:children).map { |topic_name| cluster.topic(topic_name) }
end
clean_stored_offsets(subscription = nil)
click to toggle source
# File lib/kazoo/consumergroup.rb, line 279
# Deletes stored offsets for every topic that is no longer part of the
# subscription.
#
# @param subscription the subscription to keep offsets for; when nil the
#   group's current `subscription` is used, otherwise the value is passed
#   through Kazoo::Subscription.build
# @return [Array<Thread>] the (already joined) worker threads; empty when
#   there was nothing to inspect
# @raise [Kazoo::Error] when listing the offsets node fails with a result
#   code other than ZOK / ZNONODE
def clean_stored_offsets(subscription = nil)
  subscription = subscription.nil? ? self.subscription : Kazoo::Subscription.build(subscription)

  topics_result = cluster.zk.get_children(path: "/consumers/#{name}/offsets")
  rc = topics_result.fetch(:rc)

  # A missing offsets node means no offsets are stored, so there is nothing
  # to clean. The other offset readers treat ZNONODE the same way.
  return [] if rc == Zookeeper::Constants::ZNONODE
  raise Kazoo::Error, "Failed to retrieve list of topics. Error code: #{rc}" if rc != Zookeeper::Constants::ZOK

  stored_topic_names = topics_result.fetch(:children)
  return [] if stored_topic_names.empty?

  # Resolve the subscribed topics once instead of once per worker thread.
  subscribed_topics = subscription.topics(cluster)

  threads = stored_topic_names.map do |topic_name|
    Thread.new do
      # NOTE: this is the class-level setter, so it flips the flag for the
      # whole process, not only for this thread.
      Thread.abort_on_exception = true

      topic = cluster.topic(topic_name)
      unless subscribed_topics.include?(topic)
        cluster.send(:recursive_delete, path: "/consumers/#{name}/offsets/#{topic.name}")
      end
    end
  end

  threads.each(&:join)
end
clean_topic_claims(subscription = nil)
click to toggle source
# File lib/kazoo/consumergroup.rb, line 264
# Deletes the owners subtree of every claimed topic that is not part of the
# subscription.
#
# @param subscription the subscription whose topics must be kept; when nil
#   the group's current `subscription` is used, otherwise the value is passed
#   through Kazoo::Subscription.build
# @return [Array<Thread>] the (already joined) worker threads
def clean_topic_claims(subscription = nil)
  subscription = subscription.nil? ? self.subscription : Kazoo::Subscription.build(subscription)

  claimed = claimed_topics

  # Resolve the subscribed topics once, and only if there is something to
  # compare against, instead of once per worker thread.
  subscribed_topics = claimed.empty? ? [] : subscription.topics(cluster)

  threads = claimed.map do |topic|
    Thread.new do
      # NOTE: this is the class-level setter, so it flips the flag for the
      # whole process, not only for this thread.
      Thread.abort_on_exception = true

      unless subscribed_topics.include?(topic)
        cluster.send(:recursive_delete, path: "/consumers/#{name}/owners/#{topic.name}")
      end
    end
  end

  threads.each(&:join)
end
commit_offset(partition, offset)
click to toggle source
# File lib/kazoo/consumergroup.rb, line 245
# Writes `offset + 1` (as a string) to the partition's offset node. When the
# first write reports ZNONODE the node is created and the write is retried
# once.
#
# @param partition object responding to `topic.name` and `id`
# @param offset value that supports `+ 1`
# @return [nil]
# @raise [Kazoo::Error] when the final write does not report ZOK
def commit_offset(partition, offset)
  offset_path = "/consumers/#{name}/offsets/#{partition.topic.name}/#{partition.id}"
  payload = (offset + 1).to_s
  write_offset = -> { cluster.zk.set(path: offset_path, data: payload) }

  outcome = write_offset.call
  if outcome.fetch(:rc) == Zookeeper::Constants::ZNONODE
    cluster.send(:recursive_create, path: offset_path)
    outcome = write_offset.call
  end

  return if outcome.fetch(:rc) == Zookeeper::Constants::ZOK

  raise Kazoo::Error, "Failed to commit offset #{offset} for partition #{partition.topic.name}/#{partition.id}. Error code: #{outcome.fetch(:rc)}"
end
create()
click to toggle source
# File lib/kazoo/consumergroup.rb, line 9
# Creates the group's `ids` and `owners` nodes (in that order) and then
# resets the cluster's metadata.
#
# @return whatever `cluster.reset_metadata` returns
def create
  %w[ids owners].each do |child|
    cluster.send(:recursive_create, path: "/consumers/#{name}/#{child}")
  end

  cluster.reset_metadata
end
created_at()
click to toggle source
# File lib/kazoo/consumergroup.rb, line 25
# Returns the creation time of the group's node.
#
# Reads `ctime` from the node's stat; `mtime` would instead report the last
# modification of the node and can drift away from the creation time.
#
# @return [Time] the stat's ctime, interpreted as milliseconds since the epoch
# @raise [Kazoo::Error] when the stat call does not report ZOK
def created_at
  result = cluster.zk.stat(path: "/consumers/#{name}")
  rc = result.fetch(:rc)
  raise Kazoo::Error, "Failed to get consumer details. Error code: #{rc}" if rc != Zookeeper::Constants::ZOK

  # The value is divided by 1000.0 to turn milliseconds into seconds.
  Time.at(result.fetch(:stat).ctime / 1000.0)
end
destroy()
click to toggle source
# File lib/kazoo/consumergroup.rb, line 15
# Recursively deletes the group's node and then resets the cluster's
# metadata.
#
# @return whatever `cluster.reset_metadata` returns
def destroy
  group_path = "/consumers/#{name}"
  cluster.send(:recursive_delete, path: group_path)
  cluster.reset_metadata
end
eql?(other)
click to toggle source
# File lib/kazoo/consumergroup.rb, line 302
# Two consumer groups are equal when they share the same cluster and name.
#
# @param other the object to compare with
# @return [Boolean]
def eql?(other)
  return false unless other.kind_of?(Kazoo::Consumergroup)

  cluster == other.cluster && name == other.name
end
Also aliased as: ==
exists?()
click to toggle source
# File lib/kazoo/consumergroup.rb, line 20
# Reports whether the group's node exists, as answered by the `exists?`
# method of the stat fetched for /consumers/<name>.
def exists?
  cluster.zk.stat(path: "/consumers/#{name}").fetch(:stat).exists?
end
hash()
click to toggle source
# File lib/kazoo/consumergroup.rb, line 308
# Hash code derived from the same pair (cluster, name) that `eql?` compares.
#
# @return [Integer]
def hash
  identity = [cluster, name]
  identity.hash
end
inspect()
click to toggle source
# File lib/kazoo/consumergroup.rb, line 298
# Short textual representation that shows only the group name.
#
# @return [String]
def inspect
  description = "#<Kazoo::Consumergroup name=#{name}>"
  description
end
instances()
click to toggle source
# File lib/kazoo/consumergroup.rb, line 50
# Lists the instances registered under /consumers/<name>/ids.
#
# @return [Array] the result of `instances_with_subscription` for the
#   registered ids; empty when the ids node does not exist
# @raise [Kazoo::Error] when the listing fails with any other result code
def instances
  listing = cluster.zk.get_children(path: "/consumers/#{name}/ids")
  rc = listing.fetch(:rc)

  return [] if rc == Zookeeper::Constants::ZNONODE

  unless rc == Zookeeper::Constants::ZOK
    raise Kazoo::Error, "Failed getting a list of runniong instances for #{name}. Error code: #{rc}"
  end

  instances_with_subscription(listing.fetch(:children))
end
instantiate(id: nil, subscription: nil)
click to toggle source
# File lib/kazoo/consumergroup.rb, line 32
# Builds an Instance that belongs to this group.
#
# @param id forwarded to Instance.new as `id:` (default nil)
# @param subscription forwarded to Instance.new as `subscription:` (default nil)
# @return [Instance]
def instantiate(id: nil, subscription: nil)
  instance_options = { id: id, subscription: subscription }
  Instance.new(self, **instance_options)
end
partition_claims()
click to toggle source
# File lib/kazoo/consumergroup.rb, line 127
# Builds a map of claimed partitions to the instance that claims them, by
# walking /consumers/<name>/owners/<topic>/<partition>. One thread is started
# per topic and, inside it, one per partition.
#
# Nodes may disappear between the listing and the read that follows it; a
# ZNONODE at the topic or partition level is therefore treated as "not
# claimed" instead of an error.
#
# @return [Hash] partition => instance (built with `instantiate`); empty when
#   the owners node does not exist
# @raise [Kazoo::Error] when any lookup fails with a result code other than
#   ZOK / ZNONODE (surfaced when the worker threads are joined)
def partition_claims
  topic_result = cluster.zk.get_children(path: "/consumers/#{name}/owners")
  case topic_result.fetch(:rc)
  when Zookeeper::Constants::ZOK
    # continue below
  when Zookeeper::Constants::ZNONODE
    return {}
  else
    raise Kazoo::Error, "Failed to get partition claims. Result code: #{topic_result.fetch(:rc)}"
  end

  claims = {}
  mutex = Mutex.new

  threads = topic_result.fetch(:children).map do |topic_name|
    Thread.new do
      topic = cluster.topic(topic_name)
      partition_result = cluster.zk.get_children(path: "/consumers/#{name}/owners/#{topic.name}")

      partition_ids =
        case partition_result.fetch(:rc)
        when Zookeeper::Constants::ZOK
          partition_result.fetch(:children)
        when Zookeeper::Constants::ZNONODE
          # The topic's owners node vanished after it was listed: no claims.
          []
        else
          raise Kazoo::Error, "Failed to get partition claims. Result code: #{partition_result.fetch(:rc)}"
        end

      partition_threads = partition_ids.map do |partition_id|
        Thread.new do
          partition = topic.partition(partition_id.to_i)
          claim_result = cluster.zk.get(path: "/consumers/#{name}/owners/#{topic.name}/#{partition.id}")

          case claim_result.fetch(:rc)
          when Zookeeper::Constants::ZOK
            owner = instantiate(id: claim_result.fetch(:data))
            mutex.synchronize { claims[partition] = owner }
          when Zookeeper::Constants::ZNONODE
            # The claim was released after it was listed: leave it out.
          else
            raise Kazoo::Error, "Failed to get partition claims. Result code: #{claim_result.fetch(:rc)}"
          end
        end
      end

      partition_threads.each(&:join)
    end
  end

  threads.each(&:join)
  claims
end
partitions()
click to toggle source
# File lib/kazoo/consumergroup.rb, line 110
# Collects the partitions of every topic returned by `topics`, fetching each
# topic's partitions in its own thread.
#
# @return [Array] all partitions; the order depends on thread completion
def partitions
  lock = Mutex.new
  collected = []

  workers = topics.map do |topic|
    Thread.new do
      found = topic.partitions
      lock.synchronize { collected.concat(found) }
    end
  end

  workers.each(&:join)
  collected
end
reset_all_offsets()
click to toggle source
# File lib/kazoo/consumergroup.rb, line 260
# Recursively deletes the group's whole offsets subtree.
def reset_all_offsets
  offsets_path = "/consumers/#{name}/offsets"
  cluster.send(:recursive_delete, path: offsets_path)
end
retrieve_all_offsets()
click to toggle source
# File lib/kazoo/consumergroup.rb, line 171
# Reads every offset stored under /consumers/<name>/offsets, using one thread
# per topic and, inside it, one thread per partition.
#
# @return [Hash] partition => Integer offset, or nil when the partition's
#   offset node does not exist; empty when the offsets node does not exist
# @raise [Kazoo::Error] when a lookup fails with an unexpected result code
def retrieve_all_offsets
  listing = cluster.zk.get_children(path: "/consumers/#{name}/offsets")
  listing_rc = listing.fetch(:rc)

  return {} if listing_rc == Zookeeper::Constants::ZNONODE

  if listing_rc != Zookeeper::Constants::ZOK
    raise Kazoo::Error, "Failed to get topic offsets. Result code: #{listing_rc}"
  end

  collected = {}
  lock = Mutex.new

  workers = listing.fetch(:children).map do |topic_name|
    Thread.new do
      # Class-level setter: applies process-wide.
      Thread.abort_on_exception = true

      topic = cluster.topic(topic_name)
      children = cluster.zk.get_children(path: "/consumers/#{name}/offsets/#{topic.name}")
      unless children.fetch(:rc) == Zookeeper::Constants::ZOK
        raise Kazoo::Error, "Failed to get partition offsets. Result code: #{children.fetch(:rc)}"
      end

      readers = children.fetch(:children).map do |partition_id|
        Thread.new do
          Thread.abort_on_exception = true

          partition = topic.partition(partition_id.to_i)
          reply = cluster.zk.get(path: "/consumers/#{name}/offsets/#{topic.name}/#{partition.id}")
          rc = reply.fetch(:rc)

          stored =
            if rc == Zookeeper::Constants::ZOK
              reply.fetch(:data).to_i
            elsif rc == Zookeeper::Constants::ZNONODE
              nil
            else
              raise Kazoo::Error, "Failed to retrieve offset for #{partition.key}. Error code: #{rc}"
            end

          lock.synchronize { collected[partition] = stored }
        end
      end

      readers.each(&:join)
    end
  end

  workers.each(&:join)
  collected
end
retrieve_offset(partition)
click to toggle source
# File lib/kazoo/consumergroup.rb, line 162
# Reads the stored offset of a single partition.
#
# @param partition object responding to `topic.name` and `id`
# @return [Integer, nil] the stored data converted with `to_i`, or nil when
#   the offset node does not exist
# @raise [Kazoo::Error] when the read fails with any other result code
def retrieve_offset(partition)
  reply = cluster.zk.get(path: "/consumers/#{name}/offsets/#{partition.topic.name}/#{partition.id}")
  rc = reply.fetch(:rc)

  return reply.fetch(:data).to_i if rc == Zookeeper::Constants::ZOK
  return nil if rc == Zookeeper::Constants::ZNONODE

  raise Kazoo::Error, "Failed to retrieve offset for partition #{partition.topic.name}/#{partition.id}. Error code: #{rc}"
end
retrieve_offsets(subscription = self.subscription)
click to toggle source
# File lib/kazoo/consumergroup.rb, line 213
# Reads the stored offset of every partition of every topic in the given
# subscription, using one thread per topic and one per partition.
#
# @param subscription passed through Kazoo::Subscription.build; defaults to
#   the group's current `subscription`
# @return [Hash] partition => Integer offset, or nil when no offset node
#   exists for that partition
# @raise [Kazoo::Error] when a read fails with an unexpected result code
def retrieve_offsets(subscription = self.subscription)
  subscription = Kazoo::Subscription.build(subscription)

  collected = {}
  lock = Mutex.new

  workers = subscription.topics(cluster).map do |topic|
    Thread.new do
      # Class-level setter: applies process-wide.
      Thread.abort_on_exception = true

      readers = topic.partitions.map do |partition|
        Thread.new do
          Thread.abort_on_exception = true

          reply = cluster.zk.get(path: "/consumers/#{name}/offsets/#{topic.name}/#{partition.id}")
          rc = reply.fetch(:rc)

          stored =
            if rc == Zookeeper::Constants::ZOK
              reply.fetch(:data).to_i
            elsif rc == Zookeeper::Constants::ZNONODE
              nil
            else
              raise Kazoo::Error, "Failed to retrieve offset for #{partition.key}. Error code: #{rc}"
            end

          lock.synchronize { collected[partition] = stored }
        end
      end

      readers.each(&:join)
    end
  end

  workers.each(&:join)
  collected
end
subscribed_topics()
click to toggle source
# File lib/kazoo/consumergroup.rb, line 104
# Resolves the group's current subscription into topics of this cluster.
#
# @return the result of `subscription.topics(cluster)`
def subscribed_topics
  current_subscription = subscription
  current_subscription.topics(cluster)
end
Also aliased as: topics
subscription()
click to toggle source
# File lib/kazoo/consumergroup.rb, line 36
# Determines the single subscription shared by all running instances.
#
# @return the one distinct non-nil subscription among `instances`
# @raise [NoRunningInstances] when no instance reports a subscription
# @raise [InconsistentSubscriptions] when instances report different ones
def subscription
  reported = instances.map(&:subscription).compact
  raise NoRunningInstances, "Consumergroup #{name} has no running instances; cannot determine subscription" if reported.empty?

  distinct = reported.uniq
  raise InconsistentSubscriptions, "Subscriptions of running instances are different from each other" if distinct.length != 1

  distinct.first
end
unclaimed_partitions()
click to toggle source
# File lib/kazoo/consumergroup.rb, line 123
# Partitions of the subscribed topics that have no entry in
# `partition_claims`.
#
# @return [Array]
def unclaimed_partitions
  every_partition = partitions
  claimed = partition_claims.keys
  every_partition - claimed
end
watch_instances(&block)
click to toggle source
# File lib/kazoo/consumergroup.rb, line 62
# Lists the group's instances while registering a watcher on the ids node.
#
# @param block wrapped in a Zookeeper::Callbacks::WatcherCallback
# @return [Array] a pair `[instances, callback]`; `instances` is empty when
#   the ids node does not exist
# @raise [Kazoo::Error] when the listing fails with any other result code
def watch_instances(&block)
  watcher = Zookeeper::Callbacks::WatcherCallback.create(&block)
  listing = cluster.zk.get_children(path: "/consumers/#{name}/ids", watcher: watcher)
  rc = listing.fetch(:rc)

  found =
    if rc == Zookeeper::Constants::ZOK
      instances_with_subscription(listing.fetch(:children))
    elsif rc == Zookeeper::Constants::ZNONODE
      []
    else
      raise Kazoo::Error, "Failed getting a list of runniong instances for #{name}. Error code: #{rc}"
    end

  [found, watcher]
end
watch_partition_claim(partition, &block)
click to toggle source
# File lib/kazoo/consumergroup.rb, line 77
# Reads the claim on a partition while registering a watcher on its owner
# node.
#
# @param partition object responding to `topic.name` and `id`
# @param block wrapped in a Zookeeper::Callbacks::WatcherCallback
# @return [Array] `[instance, callback]` when the partition is claimed, or
#   `[nil, nil]` when the owner node does not exist
# @raise [Kazoo::Error] when the read fails with any other result code
def watch_partition_claim(partition, &block)
  watcher = Zookeeper::Callbacks::WatcherCallback.create(&block)
  reply = cluster.zk.get(path: "/consumers/#{name}/owners/#{partition.topic.name}/#{partition.id}", watcher: watcher)
  rc = reply.fetch(:rc)

  # Nobody is claiming this partition yet.
  return [nil, nil] if rc == Zookeeper::Constants::ZNONODE

  if rc == Zookeeper::Constants::ZOK
    return [Kazoo::Consumergroup::Instance.new(self, id: reply.fetch(:data)), watcher]
  end

  raise Kazoo::Error, "Failed to set watch for partition claim of #{partition.topic.name}/#{partition.id}. Error code: #{rc}"
end
Protected Instance Methods
instances_with_subscription(instance_ids)
click to toggle source
# File lib/kazoo/consumergroup.rb, line 314
# Builds an Instance for each id, reading its subscription from
# /consumers/<name>/ids/<id> in a dedicated thread.
#
# @param instance_ids collection of instance ids
# @return [Array<Instance>] order depends on thread completion
# @raise [Kazoo::Error] when reading an instance's node does not report ZOK
def instances_with_subscription(instance_ids)
  lock = Mutex.new
  built = []

  workers = instance_ids.map do |id|
    Thread.new do
      # Class-level setter: applies process-wide.
      Thread.abort_on_exception = true

      reply = cluster.zk.get(path: "/consumers/#{name}/ids/#{id}")
      unless reply.fetch(:rc) == Zookeeper::Constants::ZOK
        raise Kazoo::Error, "Failed to retrieve subscription for instance. Error code: #{reply.fetch(:rc)}"
      end

      subscription = Kazoo::Subscription.from_json(reply.fetch(:data))
      lock.synchronize { built << Instance.new(self, id: id, subscription: subscription) }
    end
  end

  workers.each(&:join)
  built
end