class Kazoo::Subscription
A Kazoo::Subscription
describes interest in a set of topics of a Kafka cluster.
Use Kazoo::Subscription.build
to instantiate a subscription. It will return one of the two known subclasses of Kazoo::Subscription: a Kazoo::StaticSubscription
for a static list of topics, or a Kazoo::PatternSUbscription for a dynamic list based on a regular expression that serves as a white list or black list.
Attributes
Public Class Methods
Instantiates a Kazoo::Subscription
based on the subscription argument.
-
If the subscription argument is the name of a topic, a
Kazoo::Topic
, or an array of those, it will create a static subscription for the provided topic. -
If the subscription argument is a regular expression, it will create a pattern subscription. The `pattern` argument will determine whether it is a white_list (default), or black_list.
-
If the subscription argument is a
Kazoo::Subscription
, it will return the argument itself.
# File lib/kazoo/subscription.rb, line 24 def self.build(subscription, pattern: :white_list, timestamp: Time.now) case subscription when Kazoo::Subscription subscription when String, Symbol, Kazoo::Topic, Array topic_names = Array(subscription).map { |t| topic_name(t) } Kazoo::StaticSubscription.new(topic_names, timestamp: timestamp) when Regexp Kazoo::PatternSubscription.new(subscription, pattern: pattern, timestamp: timestamp) else raise ArgumentError, "Don't know how to create a subscription from #{subscription.inspect}" end end
Instantiates a whitelist subscription that matches every topic.
# File lib/kazoo/subscription.rb, line 13 def self.everything build(/.*/) end
Instantiates a Kazoo::Subscription
based on a JSON payload as it is stored in Zookeeper.
This method will raise Kazoo::InvalidSubscription if the JSON payload cannot be parsed. Only version 1 payloads are supported.
# File lib/kazoo/subscription.rb, line 42 def self.from_json(json_payload) json = JSON.parse(json_payload) version, timestamp = json.fetch('version'), json.fetch('timestamp') raise Kazoo::InvalidSubscription, "Only version 1 subscriptions are supported, found version #{version}!" unless version == 1 time = Time.at(BigDecimal.new(timestamp) / BigDecimal.new(1000)) pattern, subscription = json.fetch('pattern'), json.fetch('subscription') raise Kazoo::InvalidSubscription, "Only subscriptions with a single stream are supported" unless subscription.values.all? { |streams| streams == 1 } case pattern when 'static' topic_names = subscription.keys Kazoo::StaticSubscription.new(topic_names, version: version, timestamp: time) when 'white_list', 'black_list' raise Kazoo::InvalidSubscription, "Only pattern subscriptions with a single expression are supported" unless subscription.keys.length == 1 regexp = Regexp.new(subscription.keys.first.tr(',', '|')) Kazoo::PatternSubscription.new(regexp, pattern: pattern.to_sym, version: version, timestamp: time) else raise Kazoo::InvalidSubscription, "Unrecognized subscription pattern #{pattern.inspect}" end rescue JSON::ParserError, KeyError => e raise Kazoo::InvalidSubscription.new(e.message) end
Subclasses should call super(**kwargs) in their initializer.
# File lib/kazoo/subscription.rb, line 120 def initialize(timestamp: Time.now, version: 1) @timestamp, @version = timestamp, version end
Returns a topic name based on various inputs. Helper method used by Kazoo::Subscription.build
# File lib/kazoo/subscription.rb, line 73 def self.topic_name(topic) case topic when String, Symbol; topic.to_s when Kazoo::Topic; topic.name else raise ArgumentError, "Cannot get topic name from #{topic.inspect}" end end
Public Instance Methods
# File lib/kazoo/subscription.rb, line 103 def eql?(other) other.kind_of?(Kazoo::Subscription) && other.pattern == pattern && other.subscription == subscription end
has_topic? should return true if a given Kazoo::Topic
is part of this subscription.
# File lib/kazoo/subscription.rb, line 94 def has_topic?(topic) raise NotImplementedError end
# File lib/kazoo/subscription.rb, line 109 def hash [pattern, subscription].hash end
# File lib/kazoo/subscription.rb, line 113 def inspect "#<#{self.class.name} pattern=#{pattern} subscription=#{subscription.inspect}>" end
Returns an array of all Kazoo::Partition
instances in the given Kafka cluster that are matched by this subscription.
# File lib/kazoo/subscription.rb, line 89 def partitions(cluster) topics(cluster).flat_map { |topic| topic.partitions } end
Returns the JSON representation of this subscription that can be stored in Zookeeper.
# File lib/kazoo/subscription.rb, line 99 def to_json(options = {}) JSON.dump(as_json(options)) end
Returns an array of all Kazoo::Topic
instances in the given Kafka cluster that are matched by this subscription.
# File lib/kazoo/subscription.rb, line 83 def topics(cluster) cluster.topics.values.select { |topic| has_topic?(topic) } end
Protected Instance Methods
Should return the name of the pattern, i.e. static, white_list, or black_list
# File lib/kazoo/subscription.rb, line 131 def pattern raise NotImplementedError end
Subclasses should return a hash that can be converted to JSON to represent the subscription in Zookeeper.
# File lib/kazoo/subscription.rb, line 126 def subscription raise NotImplementedError end
Private Instance Methods
# File lib/kazoo/subscription.rb, line 137 def as_json(options = {}) { version: version, pattern: pattern, timestamp: msec_timestamp, subscription: subscription, } end
# File lib/kazoo/subscription.rb, line 146 def msec_timestamp (timestamp.to_i * 1000) + (timestamp.nsec / 1_000_000) end