class Synapse::ZookeeperWatcher
Constants
- NUMBERS_RE
Public Instance Methods
ping?()
click to toggle source
# File lib/synapse/service_watcher/zookeeper.rb, line 24 def ping? @zk && @zk.connected? end
start()
click to toggle source
# File lib/synapse/service_watcher/zookeeper.rb, line 9 def start @zk_hosts = @discovery['hosts'].shuffle.join(',') @watcher = nil @zk = nil log.info "synapse: starting ZK watcher #{@name} @ hosts: #{@zk_hosts}, path: #{@discovery['path']}" zk_connect end
stop()
click to toggle source
# File lib/synapse/service_watcher/zookeeper.rb, line 19 def stop log.warn "synapse: zookeeper watcher exiting" zk_cleanup end
Private Instance Methods
create(path)
click to toggle source
helper method that ensures that the discovery path exists
# File lib/synapse/service_watcher/zookeeper.rb, line 40 def create(path) log.debug "synapse: creating ZK path: #{path}" # recurse if the parent node does not exist create File.dirname(path) unless @zk.exists? File.dirname(path) @zk.create(path, ignore: :node_exists) end
deserialize_service_instance(data)
click to toggle source
decode the data at a zookeeper endpoint
# File lib/synapse/service_watcher/zookeeper.rb, line 142 def deserialize_service_instance(data) log.debug "synapse: deserializing process data" decoded = JSON.parse(data) host = decoded['host'] || (raise ValueError, 'instance json data does not have host key') port = decoded['port'] || (raise ValueError, 'instance json data does not have port key') name = decoded['name'] || nil return host, port, name end
discover()
click to toggle source
find the current backends at the discovery path; sets @backends
# File lib/synapse/service_watcher/zookeeper.rb, line 49 def discover log.info "synapse: discovering backends for service #{@name}" new_backends = [] @zk.children(@discovery['path'], :watch => true).each do |id| node = @zk.get("#{@discovery['path']}/#{id}") begin host, port, name = deserialize_service_instance(node.first) rescue StandardError => e log.error "synapse: invalid data in ZK node #{id} at #{@discovery['path']}: #{e}" else server_port = @server_port_override ? @server_port_override : port # find the numberic id in the node name; used for leader elections if enabled numeric_id = id.split('_').last numeric_id = NUMBERS_RE =~ numeric_id ? numeric_id.to_i : nil log.debug "synapse: discovered backend #{name} at #{host}:#{server_port} for service #{@name}" new_backends << { 'name' => name, 'host' => host, 'port' => server_port, 'id' => numeric_id} end end if new_backends.empty? if @default_servers.empty? log.warn "synapse: no backends and no default servers for service #{@name}; using previous backends: #{@backends.inspect}" else log.warn "synapse: no backends for service #{@name}; using default servers: #{@default_servers.inspect}" @backends = @default_servers end else log.info "synapse: discovered #{new_backends.length} backends for service #{@name}" set_backends(new_backends) end end
validate_discovery_opts()
click to toggle source
# File lib/synapse/service_watcher/zookeeper.rb, line 30 def validate_discovery_opts raise ArgumentError, "invalid discovery method #{@discovery['method']}" \ unless @discovery['method'] == 'zookeeper' raise ArgumentError, "missing or invalid zookeeper host for service #{@name}" \ unless @discovery['hosts'] raise ArgumentError, "invalid zookeeper path for service #{@name}" \ unless @discovery['path'] end
watch()
click to toggle source
sets up zookeeper callbacks if the data at the discovery path changes
# File lib/synapse/service_watcher/zookeeper.rb, line 86 def watch return if @zk.nil? @watcher.unsubscribe unless @watcher.nil? @watcher = @zk.register(@discovery['path'], &watcher_callback) # Verify that we actually set up the watcher. unless @zk.exists?(@discovery['path'], :watch => true) log.error "synapse: zookeeper watcher path #{@discovery['path']} does not exist!" raise RuntimeError.new('could not set a ZK watch on a node that should exist') end end
watcher_callback()
click to toggle source
handles the event that a watched path has changed in zookeeper
# File lib/synapse/service_watcher/zookeeper.rb, line 100 def watcher_callback @callback ||= Proc.new do |event| # Set new watcher watch # Rediscover discover # send a message to calling class to reconfigure reconfigure! end end
zk_cleanup()
click to toggle source
# File lib/synapse/service_watcher/zookeeper.rb, line 111 def zk_cleanup log.info "synapse: zookeeper watcher cleaning up" @watcher.unsubscribe unless @watcher.nil? @watcher = nil @zk.close! unless @zk.nil? @zk = nil log.info "synapse: zookeeper watcher cleaned up successfully" end
zk_connect()
click to toggle source
# File lib/synapse/service_watcher/zookeeper.rb, line 123 def zk_connect log.info "synapse: zookeeper watcher connecting to ZK at #{@zk_hosts}" @zk = ZK.new(@zk_hosts) # handle session expiry -- by cleaning up zk, this will make `ping?` # fail and so synapse will exit @zk.on_expired_session do log.warn "synapse: zookeeper watcher ZK session expired!" zk_cleanup end # the path must exist, otherwise watch callbacks will not work create(@discovery['path']) # call the callback to bootstrap the process watcher_callback.call end