class Protobuf::Rpc::ServiceDirectory

Constants

DEFAULT_ADDRESS
DEFAULT_PORT
DEFAULT_TIMEOUT

Attributes

address[W]
port[W]

Public Class Methods

address() click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 67
def self.address
  @address ||= DEFAULT_ADDRESS
end
new() click to toggle source

Instance Methods

# File lib/protobuf/rpc/service_directory.rb, line 87
def initialize
  reset
end
port() click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 71
def self.port
  @port ||= DEFAULT_PORT
end
start() { |self| ... } click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 75
def self.start
  yield(self) if block_given?
  instance.start
end
stop() click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 80
def self.stop
  instance.stop
end

Public Instance Methods

all_listings_for(service) click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 91
def all_listings_for(service)
  if running? && @listings_by_service.key?(service.to_s)
    start_listener_thread if listener_dead?
    @listings_by_service[service.to_s].entries.shuffle
  else
    []
  end
end
each_listing(&block) click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 100
def each_listing(&block)
  start_listener_thread if listener_dead?
  @listings_by_uuid.each_value(&block)
end
listener_dead?() click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 112
def listener_dead?
  @thread.nil? || !@thread.alive?
end
lookup(service) click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 105
def lookup(service)
  return unless running?
  start_listener_thread if listener_dead?
  return unless @listings_by_service.key?(service.to_s)
  @listings_by_service[service.to_s].entries.sample
end
restart() click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 116
def restart
  stop
  start
end
running?() click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 121
def running?
  !!@running
end
start() click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 125
def start
  unless running?
    init_socket
    logger.info { sign_message("listening to udp://#{self.class.address}:#{self.class.port}") }
    @running = true
  end

  start_listener_thread if listener_dead?
  self
end
start_listener_thread() click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 136
def start_listener_thread
  return if @thread.try(:alive?)
  @thread = Thread.new { send(:run) }
end
stop() click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 141
def stop
  logger.info { sign_message("Stopping directory") }

  @running = false
  @thread.try(:kill).try(:join)
  @socket.try(:close)

  reset
end

Private Instance Methods

add_or_update_listing(uuid, server) click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 153
def add_or_update_listing(uuid, server)
  listing = @listings_by_uuid[uuid]

  if listing
    action = :updated
    listing.update(server)
  else
    action = :added
    listing = Listing.new(server)
    @listings_by_uuid[uuid] = listing
  end

  listing.services.each do |service|
    @listings_by_service[service] << listing
  end

  trigger(action, listing)
  logger.debug { sign_message("#{action} server: #{server.inspect}") }
end
init_socket() click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 173
def init_socket
  @socket = UDPSocket.new
  @socket.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_REUSEADDR, true)

  if defined?(::Socket::SO_REUSEPORT)
    @socket.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_REUSEPORT, true)
  end

  @socket.bind(self.class.address, self.class.port.to_i)
end
process_beacon(beacon) click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 184
def process_beacon(beacon)
  server = beacon.server
  uuid = server.try(:uuid)

  if server && uuid
    case beacon.beacon_type
    when ::Protobuf::Rpc::DynamicDiscovery::BeaconType::HEARTBEAT
      add_or_update_listing(uuid, server)
    when ::Protobuf::Rpc::DynamicDiscovery::BeaconType::FLATLINE
      remove_listing(uuid)
    end
  else
    logger.info { sign_message("Ignoring incomplete beacon: #{beacon.inspect}") }
  end
end
read_beacon() click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 200
def read_beacon
  data, addr = @socket.recvfrom(2048)

  beacon = ::Protobuf::Rpc::DynamicDiscovery::Beacon.decode(data)

  # Favor the address captured by the socket
  beacon.try(:server).try(:address=, addr[3])

  beacon
end
remove_expired_listings() click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 211
def remove_expired_listings
  logger.debug { sign_message("Removing expired listings") }
  @listings_by_uuid.each do |uuid, listing|
    remove_listing(uuid) if listing.expired?
  end
end
remove_listing(uuid) click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 218
def remove_listing(uuid)
  listing = @listings_by_uuid[uuid] || return

  logger.debug { sign_message("Removing listing: #{listing.inspect}") }

  @listings_by_service.each_value do |listings|
    listings.delete(listing)
  end

  trigger(:removed, @listings_by_uuid.delete(uuid))
end
reset() click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 230
def reset
  @thread = nil
  @socket = nil
  @listings_by_uuid = {}
  @listings_by_service = Hash.new { |h, k| h[k] = Set.new }
end
run() click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 237
def run
  sweep_interval = 5 # sweep expired listings every 5 seconds
  next_sweep = Time.now.to_i + sweep_interval

  loop do
    timeout = [next_sweep - Time.now.to_i, 0.1].max
    readable = IO.select([@socket], nil, nil, timeout)
    process_beacon(read_beacon) if readable

    if Time.now.to_i >= next_sweep
      remove_expired_listings
      next_sweep = Time.now.to_i + sweep_interval
    end
  end
rescue => e
  logger.debug { sign_message("ERROR: (#{e.class}) #{e.message}\n#{e.backtrace.join("\n")}") }
  retry
end
trigger(action, listing) click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 256
def trigger(action, listing)
  ::ActiveSupport::Notifications.instrument("directory.listing.#{action}", :listing => listing)
end