class Kazoo::Subscription

A Kazoo::Subscription describes interest in a set of topics of a Kafka cluster.

Use Kazoo::Subscription.build to instantiate a subscription. It will return one of the two known subclasses of Kazoo::Subscription: a Kazoo::StaticSubscription for a static list of topics, or a Kazoo::PatternSUbscription for a dynamic list based on a regular expression that serves as a white list or black list.

Attributes

timestamp[R]
version[R]

Public Class Methods

build(subscription, pattern: :white_list, timestamp: Time.now) click to toggle source

Instantiates a Kazoo::Subscription based on the subscription argument.

  • If the subscription argument is the name of a topic, a Kazoo::Topic, or an array of those, it will create a static subscription for the provided topic.

  • If the subscription argument is a regular expression, it will create a pattern subscription. The `pattern` argument will determine whether it is a white_list (default), or black_list.

  • If the subscription argument is a Kazoo::Subscription, it will return the argument itself.

# File lib/kazoo/subscription.rb, line 24
def self.build(subscription, pattern: :white_list, timestamp: Time.now)
  case subscription
  when Kazoo::Subscription
    subscription
  when String, Symbol, Kazoo::Topic, Array
    topic_names = Array(subscription).map { |t| topic_name(t) }
    Kazoo::StaticSubscription.new(topic_names, timestamp: timestamp)
  when Regexp
    Kazoo::PatternSubscription.new(subscription, pattern: pattern, timestamp: timestamp)
  else
    raise ArgumentError, "Don't know how to create a subscription from #{subscription.inspect}"
  end
end
everything() click to toggle source

Instantiates a whitelist subscription that matches every topic.

# File lib/kazoo/subscription.rb, line 13
def self.everything
  build(/.*/)
end
from_json(json_payload) click to toggle source

Instantiates a Kazoo::Subscription based on a JSON payload as it is stored in Zookeeper.

This method will raise Kazoo::InvalidSubscription if the JSON payload cannot be parsed. Only version 1 payloads are supported.

# File lib/kazoo/subscription.rb, line 42
def self.from_json(json_payload)
  json = JSON.parse(json_payload)
  version, timestamp = json.fetch('version'), json.fetch('timestamp')
  raise Kazoo::InvalidSubscription, "Only version 1 subscriptions are supported, found version #{version}!" unless version == 1

  time = Time.at(BigDecimal.new(timestamp) / BigDecimal.new(1000))

  pattern, subscription = json.fetch('pattern'), json.fetch('subscription')
  raise Kazoo::InvalidSubscription, "Only subscriptions with a single stream are supported" unless subscription.values.all? { |streams| streams == 1 }

  case pattern
  when 'static'
    topic_names = subscription.keys
    Kazoo::StaticSubscription.new(topic_names, version: version, timestamp: time)

  when 'white_list', 'black_list'
    raise Kazoo::InvalidSubscription, "Only pattern subscriptions with a single expression are supported" unless subscription.keys.length == 1
    regexp = Regexp.new(subscription.keys.first.tr(',', '|'))
    Kazoo::PatternSubscription.new(regexp, pattern: pattern.to_sym, version: version, timestamp: time)

  else
    raise Kazoo::InvalidSubscription, "Unrecognized subscription pattern #{pattern.inspect}"
  end

rescue JSON::ParserError, KeyError => e
  raise Kazoo::InvalidSubscription.new(e.message)
end
new(timestamp: Time.now, version: 1) click to toggle source

Subclasses should call super(**kwargs) in their initializer.

# File lib/kazoo/subscription.rb, line 120
def initialize(timestamp: Time.now, version: 1)
  @timestamp, @version = timestamp, version
end
topic_name(topic) click to toggle source

Returns a topic name based on various inputs. Helper method used by Kazoo::Subscription.build

# File lib/kazoo/subscription.rb, line 73
def self.topic_name(topic)
  case topic
    when String, Symbol; topic.to_s
    when Kazoo::Topic;   topic.name
    else raise ArgumentError, "Cannot get topic name from #{topic.inspect}"
  end
end

Public Instance Methods

==(other)
Alias for: eql?
eql?(other) click to toggle source
# File lib/kazoo/subscription.rb, line 103
def eql?(other)
  other.kind_of?(Kazoo::Subscription) && other.pattern == pattern && other.subscription == subscription
end
Also aliased as: ==
has_topic?(topic) click to toggle source

has_topic? should return true if a given Kazoo::Topic is part of this subscription.

# File lib/kazoo/subscription.rb, line 94
def has_topic?(topic)
  raise NotImplementedError
end
hash() click to toggle source
# File lib/kazoo/subscription.rb, line 109
def hash
  [pattern, subscription].hash
end
inspect() click to toggle source
# File lib/kazoo/subscription.rb, line 113
def inspect
  "#<#{self.class.name} pattern=#{pattern} subscription=#{subscription.inspect}>"
end
partitions(cluster) click to toggle source

Returns an array of all Kazoo::Partition instances in the given Kafka cluster that are matched by this subscription.

# File lib/kazoo/subscription.rb, line 89
def partitions(cluster)
  topics(cluster).flat_map { |topic| topic.partitions }
end
to_json(options = {}) click to toggle source

Returns the JSON representation of this subscription that can be stored in Zookeeper.

# File lib/kazoo/subscription.rb, line 99
def to_json(options = {})
  JSON.dump(as_json(options))
end
topics(cluster) click to toggle source

Returns an array of all Kazoo::Topic instances in the given Kafka cluster that are matched by this subscription.

# File lib/kazoo/subscription.rb, line 83
def topics(cluster)
  cluster.topics.values.select { |topic| has_topic?(topic) }
end

Protected Instance Methods

pattern() click to toggle source

Should return the name of the pattern, i.e. static, white_list, or black_list

# File lib/kazoo/subscription.rb, line 131
def pattern
  raise NotImplementedError
end
subscription() click to toggle source

Subclasses should return a hash that can be converted to JSON to represent the subscription in Zookeeper.

# File lib/kazoo/subscription.rb, line 126
def subscription
  raise NotImplementedError
end

Private Instance Methods

as_json(options = {}) click to toggle source
# File lib/kazoo/subscription.rb, line 137
def as_json(options = {})
  {
    version:      version,
    pattern:      pattern,
    timestamp:    msec_timestamp,
    subscription: subscription,
  }
end
msec_timestamp() click to toggle source
# File lib/kazoo/subscription.rb, line 146
def msec_timestamp
  (timestamp.to_i * 1000) + (timestamp.nsec / 1_000_000)
end