class DruidDB::ZK

Attributes

client[R]
config[R]
listeners[R]
registry[RW]

Public Class Methods

new(config) click to toggle source
# File lib/druiddb/zk.rb, line 6
def initialize(config)
  @client = ::ZK.new(config.zookeeper)
  @config = config
  @listeners = []
  @registry = {}
  register
end

Public Instance Methods

register_listener(object, method) click to toggle source
# File lib/druiddb/zk.rb, line 14
def register_listener(object, method)
  listeners << ->(*args) { object.send(method, *args) }
end

Private Instance Methods

announce(service) click to toggle source
# File lib/druiddb/zk.rb, line 20
def announce(service)
  listeners.each { |listener| listener.call(service) }
end
register() click to toggle source
# File lib/druiddb/zk.rb, line 24
def register
  register_service("#{config.discovery_path}/druid:broker")
  register_service("#{config.discovery_path}/druid:coordinator")
  register_service("#{config.discovery_path}/druid:overlord")
  register_service(config.kafka_broker_path.to_s)
end
register_service(service) click to toggle source
# File lib/druiddb/zk.rb, line 31
def register_service(service)
  subscribe_to_service(service)
  renew_service_instances(service)
end
renew_service_instances(service) click to toggle source
# File lib/druiddb/zk.rb, line 36
def renew_service_instances(service)
  instances = client.children(service, watch: true)

  registry[service] = []
  instances.each do |instance|
    data = JSON.parse(client.get("#{service}/#{instance}").first)
    host = data['address'] || data['host']
    port = data['port']
    registry[service] << { host: host, port: port }
  end
end
subscribe_to_service(service) click to toggle source
# File lib/druiddb/zk.rb, line 48
def subscribe_to_service(service)
  client.register(service) do |event|
    renew_service_instances(event.path)
    announce(event.path)
  end
end