class DruidDB::ZK
Attributes
client[R]
config[R]
listeners[R]
registry[RW]
Public Class Methods
new(config)
click to toggle source
# File lib/druiddb/zk.rb, line 6 def initialize(config) @client = ::ZK.new(config.zookeeper) @config = config @listeners = [] @registry = {} register end
Public Instance Methods
register_listener(object, method)
click to toggle source
# File lib/druiddb/zk.rb, line 14 def register_listener(object, method) listeners << ->(*args) { object.send(method, *args) } end
Private Instance Methods
announce(service)
click to toggle source
# File lib/druiddb/zk.rb, line 20 def announce(service) listeners.each { |listener| listener.call(service) } end
register()
click to toggle source
# File lib/druiddb/zk.rb, line 24 def register register_service("#{config.discovery_path}/druid:broker") register_service("#{config.discovery_path}/druid:coordinator") register_service("#{config.discovery_path}/druid:overlord") register_service(config.kafka_broker_path.to_s) end
register_service(service)
click to toggle source
# File lib/druiddb/zk.rb, line 31 def register_service(service) subscribe_to_service(service) renew_service_instances(service) end
renew_service_instances(service)
click to toggle source
# File lib/druiddb/zk.rb, line 36 def renew_service_instances(service) instances = client.children(service, watch: true) registry[service] = [] instances.each do |instance| data = JSON.parse(client.get("#{service}/#{instance}").first) host = data['address'] || data['host'] port = data['port'] registry[service] << { host: host, port: port } end end
subscribe_to_service(service)
click to toggle source
# File lib/druiddb/zk.rb, line 48 def subscribe_to_service(service) client.register(service) do |event| renew_service_instances(event.path) announce(event.path) end end