class Kazoo::Consumergroup::Instance
Attributes
group[R]
id[R]
subscription[R]
Public Class Methods
generate_id()
click to toggle source
# File lib/kazoo/consumergroup.rb, line 332
# Builds a new instance identifier of the form "<hostname>:<uuid>", combining
# the local hostname with a freshly generated random UUID.
def self.generate_id
  [Socket.gethostname, SecureRandom.uuid].join(":")
end
new(group, id: nil, subscription: nil)
click to toggle source
# File lib/kazoo/consumergroup.rb, line 338
# Sets up a consumer instance that belongs to +group+.
#
# group        - stored as given.
# id           - identifier for this instance; when nil (or false) a new one
#                is obtained from generate_id.
# subscription - when not nil, it is passed through Kazoo::Subscription.build
#                and the result is stored; when nil, no subscription is set.
def initialize(group, id: nil, subscription: nil)
  @group = group
  @id = id || self.class.generate_id

  unless subscription.nil?
    @subscription = Kazoo::Subscription.build(subscription)
  end
end
Public Instance Methods
claim_partition(partition)
click to toggle source
# File lib/kazoo/consumergroup.rb, line 393
# Claims +partition+ for this instance by creating an ephemeral owner node
# for it whose data is this instance's id.
#
# Returns true when the node was created.
# Raises Kazoo::PartitionAlreadyClaimed when the node already exists, and
# Kazoo::Error for any other return code.
def claim_partition(partition)
  response = cluster.zk.create(
    path: "/consumers/#{group.name}/owners/#{partition.topic.name}/#{partition.id}",
    ephemeral: true,
    data: id
  )
  code = response.fetch(:rc)

  return true if code == Zookeeper::Constants::ZOK

  if code == Zookeeper::Constants::ZNODEEXISTS
    raise Kazoo::PartitionAlreadyClaimed, "Partition #{partition.topic.name}/#{partition.id} is already claimed!"
  end

  raise Kazoo::Error, "Failed to claim partition #{partition.topic.name}/#{partition.id}. Error code: #{code}"
end
created_at()
click to toggle source
# File lib/kazoo/consumergroup.rb, line 376
# Returns a Time derived from the stat of this instance's registration node.
#
# Raises Kazoo::Error when the stat call does not return ZOK.
def created_at
  response = cluster.zk.stat(path: "/consumers/#{group.name}/ids/#{id}")
  code = response.fetch(:rc)

  unless code == Zookeeper::Constants::ZOK
    raise Kazoo::Error, "Failed to get instance details. Error code: #{code}"
  end

  # NOTE(review): this reads the node's mtime rather than a creation
  # timestamp; presumably the two match because the node is never rewritten
  # after registration -- confirm.
  # The value is divided by 1000.0, i.e. it is treated as milliseconds.
  modified_ms = response.fetch(:stat).mtime
  Time.at(modified_ms / 1000.0)
end
deregister()
click to toggle source
# File lib/kazoo/consumergroup.rb, line 384
# Deletes this instance's registration node from its consumer group.
#
# Returns self so calls can be chained.
# Raises Kazoo::Error when the delete does not return ZOK.
def deregister
  response = cluster.zk.delete(path: "/consumers/#{group.name}/ids/#{id}")
  code = response.fetch(:rc)

  unless code == Zookeeper::Constants::ZOK
    raise Kazoo::Error, "Failed to deregister instance #{id} for consumer group #{group.name}! Error code: #{code}"
  end

  self
end
eql?(other)
click to toggle source
# File lib/kazoo/consumergroup.rb, line 425
# Two instances are equal when +other+ is also a
# Kazoo::Consumergroup::Instance and both its group and its id match ours.
def eql?(other)
  return false unless other.kind_of?(Kazoo::Consumergroup::Instance)

  group == other.group && id == other.id
end
Also aliased as: ==
hash()
click to toggle source
# File lib/kazoo/consumergroup.rb, line 421
# Hash code computed from the same two fields that eql? compares
# (group and id).
def hash
  identity = [group, id]
  identity.hash
end
inspect()
click to toggle source
# File lib/kazoo/consumergroup.rb, line 417
# Short human-readable representation showing the group name and the
# instance id.
def inspect
  "#<Kazoo::Consumergroup::Instance group=#{group.name} id=#{id}>"
end
register(subscription_deprecated = nil)
click to toggle source
# File lib/kazoo/consumergroup.rb, line 349
# Registers this instance with its consumer group.
#
# Creates an ephemeral node for the instance whose data is the subscription
# serialized with to_json, then makes sure an owners node exists for every
# topic returned by subscription.topics(cluster).
#
# subscription_deprecated - optional; when not nil it replaces the stored
#                           subscription (built via Kazoo::Subscription.build).
#
# Returns self.
# Raises ArgumentError when no subscription is available, and
# Kazoo::ConsumerInstanceRegistrationFailed when a node cannot be created.
def register(subscription_deprecated = nil)
  # Don't provide the subscription here, but provide it when instantiating the consumer instance.
  @subscription = Kazoo::Subscription.build(subscription_deprecated) unless subscription_deprecated.nil?

  # Fail before writing anything: without this check the instance node would
  # be created first and the topic loop below would then blow up on nil,
  # leaving the instance registered with no subscription.
  if subscription.nil?
    raise ArgumentError, "Cannot register instance #{id} for consumer group #{group.name} without a subscription"
  end

  result = cluster.zk.create(
    path: "/consumers/#{group.name}/ids/#{id}",
    ephemeral: true,
    data: subscription.to_json,
  )

  if result.fetch(:rc) != Zookeeper::Constants::ZOK
    raise Kazoo::ConsumerInstanceRegistrationFailed, "Failed to register instance #{id} for consumer group #{group.name}! Error code: #{result.fetch(:rc)}"
  end

  subscription.topics(cluster).each do |topic|
    stat = cluster.zk.stat(path: "/consumers/#{group.name}/owners/#{topic.name}")
    next if stat.fetch(:stat).exists?

    result = cluster.zk.create(path: "/consumers/#{group.name}/owners/#{topic.name}")
    rc = result.fetch(:rc)

    # Another instance can create the node between our stat and our create.
    # ZNODEEXISTS then means the node is there, which is all we need, so it
    # is not treated as a registration failure.
    next if rc == Zookeeper::Constants::ZOK || rc == Zookeeper::Constants::ZNODEEXISTS

    raise Kazoo::ConsumerInstanceRegistrationFailed, "Failed to register subscription of #{topic.name} for consumer group #{group.name}! Error code: #{rc}"
  end

  self
end
registered?()
click to toggle source
# File lib/kazoo/consumergroup.rb, line 344
# Reports whether this instance's registration node currently exists,
# according to the stat returned for its path.
def registered?
  response = cluster.zk.stat(path: "/consumers/#{group.name}/ids/#{id}")
  node_stat = response.fetch(:stat)
  node_stat.exists?
end
release_partition(partition)
click to toggle source
# File lib/kazoo/consumergroup.rb, line 410
# Releases +partition+ by deleting its owner node for this consumer group.
#
# Returns nil on success.
# Raises Kazoo::Error when the delete does not return ZOK.
def release_partition(partition)
  response = cluster.zk.delete(path: "/consumers/#{group.name}/owners/#{partition.topic.name}/#{partition.id}")
  code = response.fetch(:rc)
  return if code == Zookeeper::Constants::ZOK

  raise Kazoo::Error, "Failed to release partition #{partition.topic.name}/#{partition.id}. Error code: #{code}"
end
Private Instance Methods
cluster()
click to toggle source
# File lib/kazoo/consumergroup.rb, line 433
# Convenience accessor: the cluster is obtained from this instance's group.
def cluster
  group.cluster
end