class Kazoo::Consumergroup::Instance

Attributes

group[R]
id[R]
subscription[R]

Public Class Methods

generate_id() click to toggle source
# File lib/kazoo/consumergroup.rb, line 332
def self.generate_id
  "#{Socket.gethostname}:#{SecureRandom.uuid}"
end
new(group, id: nil, subscription: nil) click to toggle source
# File lib/kazoo/consumergroup.rb, line 338
def initialize(group, id: nil, subscription: nil)
  @group = group
  @id = id || self.class.generate_id
  @subscription = Kazoo::Subscription.build(subscription) unless subscription.nil?
end

Public Instance Methods

==(other)
Alias for: eql?
claim_partition(partition) click to toggle source
# File lib/kazoo/consumergroup.rb, line 393
def claim_partition(partition)
  result = cluster.zk.create(
    path: "/consumers/#{group.name}/owners/#{partition.topic.name}/#{partition.id}",
    ephemeral: true,
    data: id,
  )

  case result.fetch(:rc)
  when Zookeeper::Constants::ZOK
    return true
  when Zookeeper::Constants::ZNODEEXISTS
    raise Kazoo::PartitionAlreadyClaimed, "Partition #{partition.topic.name}/#{partition.id} is already claimed!"
  else
    raise Kazoo::Error, "Failed to claim partition #{partition.topic.name}/#{partition.id}. Error code: #{result.fetch(:rc)}"
  end
end
created_at() click to toggle source
# File lib/kazoo/consumergroup.rb, line 376
def created_at
  result = cluster.zk.stat(path: "/consumers/#{group.name}/ids/#{id}")
  raise Kazoo::Error, "Failed to get instance details. Error code: #{result.fetch(:rc)}" if result.fetch(:rc) != Zookeeper::Constants::ZOK

  Time.at(result.fetch(:stat).mtime / 1000.0)
end
deregister() click to toggle source
# File lib/kazoo/consumergroup.rb, line 384
def deregister
  result = cluster.zk.delete(path: "/consumers/#{group.name}/ids/#{id}")
  if result.fetch(:rc) != Zookeeper::Constants::ZOK
    raise Kazoo::Error, "Failed to deregister instance #{id} for consumer group #{group.name}! Error code: #{result.fetch(:rc)}"
  end

  return self
end
eql?(other) click to toggle source
# File lib/kazoo/consumergroup.rb, line 425
def eql?(other)
  other.kind_of?(Kazoo::Consumergroup::Instance) && group == other.group && id == other.id
end
Also aliased as: ==
hash() click to toggle source
# File lib/kazoo/consumergroup.rb, line 421
def hash
  [group, id].hash
end
inspect() click to toggle source
# File lib/kazoo/consumergroup.rb, line 417
def inspect
  "#<Kazoo::Consumergroup::Instance group=#{group.name} id=#{id}>"
end
register(subscription_deprecated = nil) click to toggle source
# File lib/kazoo/consumergroup.rb, line 349
def register(subscription_deprecated = nil)
  # Don't provide the subscription here, but provide it when instantiating the consumer instance.
  @subscription = Kazoo::Subscription.build(subscription_deprecated) unless subscription_deprecated.nil?

  result = cluster.zk.create(
    path: "/consumers/#{group.name}/ids/#{id}",
    ephemeral: true,
    data: subscription.to_json,
  )

  if result.fetch(:rc) != Zookeeper::Constants::ZOK
    raise Kazoo::ConsumerInstanceRegistrationFailed, "Failed to register instance #{id} for consumer group #{group.name}! Error code: #{result.fetch(:rc)}"
  end

  subscription.topics(cluster).each do |topic|
    stat = cluster.zk.stat(path: "/consumers/#{group.name}/owners/#{topic.name}")
    unless stat.fetch(:stat).exists?
      result = cluster.zk.create(path: "/consumers/#{group.name}/owners/#{topic.name}")
      if result.fetch(:rc) != Zookeeper::Constants::ZOK
        raise Kazoo::ConsumerInstanceRegistrationFailed, "Failed to register subscription of #{topic.name} for consumer group #{group.name}! Error code: #{result.fetch(:rc)}"
      end
    end
  end

  return self
end
registered?() click to toggle source
# File lib/kazoo/consumergroup.rb, line 344
def registered?
  stat = cluster.zk.stat(path: "/consumers/#{group.name}/ids/#{id}")
  stat.fetch(:stat).exists?
end
release_partition(partition) click to toggle source
# File lib/kazoo/consumergroup.rb, line 410
def release_partition(partition)
  result = cluster.zk.delete(path: "/consumers/#{group.name}/owners/#{partition.topic.name}/#{partition.id}")
  if result.fetch(:rc) != Zookeeper::Constants::ZOK
    raise Kazoo::Error, "Failed to release partition #{partition.topic.name}/#{partition.id}. Error code: #{result.fetch(:rc)}"
  end
end

Private Instance Methods

cluster() click to toggle source
# File lib/kazoo/consumergroup.rb, line 433
def cluster
  group.cluster
end