class Protobuf::Rpc::Zmq::Server

Constants

DEFAULT_OPTIONS

Attributes

options[RW]
workers[RW]
zmq_context[R]

Public Class Methods

new(options) click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 24
def initialize(options)
  @options = DEFAULT_OPTIONS.merge(options)
  @workers = []

  init_zmq_context
  init_beacon_socket if broadcast_beacons?
  init_shutdown_pipe
rescue
  teardown
  raise
end

Public Instance Methods

add_worker() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 36
def add_worker
  @total_workers = total_workers + 1
end
all_workers_busy?() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 40
def all_workers_busy?
  workers.all? { |thread| !!thread[:busy] }
end
backend_ip()
Alias for: frontend_ip
backend_port() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 44
def backend_port
  options[:worker_port] || frontend_port + 1
end
backend_uri() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 48
def backend_uri
  if inproc?
    "inproc://#{backend_ip}:#{backend_port}"
  else
    "tcp://#{backend_ip}:#{backend_port}"
  end
end
beacon_interval() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 56
def beacon_interval
  [options[:beacon_interval].to_i, 1].max
end
beacon_ip() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 60
def beacon_ip
  "255.255.255.255"
end
beacon_port() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 64
def beacon_port
  @beacon_port ||= options.fetch(
    :beacon_port,
    ::Protobuf::Rpc::ServiceDirectory.port
  ).to_i
end
beacon_uri() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 71
def beacon_uri
  "udp://#{beacon_ip}:#{beacon_port}"
end
broadcast_beacons?() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 75
def broadcast_beacons?
  !brokerless? && options[:broadcast_beacons]
end
broadcast_busy?() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 79
def broadcast_busy?
  broadcast_beacons? && options[:broadcast_busy]
end
broadcast_flatline() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 83
def broadcast_flatline
  flatline = ::Protobuf::Rpc::DynamicDiscovery::Beacon.new(
    :beacon_type => ::Protobuf::Rpc::DynamicDiscovery::BeaconType::FLATLINE,
    :server => to_proto
  )

  @beacon_socket.send(flatline.encode, 0)
end
broadcast_heartbeat() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 92
def broadcast_heartbeat
  @last_beacon = Time.now.to_i

  heartbeat = ::Protobuf::Rpc::DynamicDiscovery::Beacon.new(
    :beacon_type => ::Protobuf::Rpc::DynamicDiscovery::BeaconType::HEARTBEAT,
    :server => to_proto
  )

  @beacon_socket.send(heartbeat.encode, 0)

  logger.debug { sign_message("sent heartbeat to #{beacon_uri}") }
end
broadcast_heartbeat?() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 105
def broadcast_heartbeat?
  Time.now.to_i >= next_beacon && broadcast_beacons?
end
brokerless?() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 109
def brokerless?
  !!options[:workers_only]
end
busy_worker_count() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 113
def busy_worker_count
  workers.count { |thread| !!thread[:busy] }
end
frontend_ip() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 117
def frontend_ip
  @frontend_ip ||= resolve_ip(options[:host])
end
Also aliased as: backend_ip
frontend_port() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 122
def frontend_port
  options[:port]
end
frontend_uri() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 126
def frontend_uri
  "tcp://#{frontend_ip}:#{frontend_port}"
end
inproc?() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 130
def inproc?
  !!options[:zmq_inproc]
end
maintenance_timeout() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 134
def maintenance_timeout
  next_maintenance - Time.now.to_i
end
minimum_timeout() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 145
def minimum_timeout
  0.1
end
next_beacon() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 149
def next_beacon
  if @last_beacon.nil?
    0
  else
    @last_beacon + beacon_interval
  end
end
next_maintenance() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 138
def next_maintenance
  cycles = [next_reaping]
  cycles << next_beacon if broadcast_beacons?

  cycles.min
end
next_reaping() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 157
def next_reaping
  if @last_reaping.nil?
    0
  else
    @last_reaping + reaping_interval
  end
end
reap_dead_workers() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 165
def reap_dead_workers
  @last_reaping = Time.now.to_i

  @workers.keep_if do |worker|
    worker.alive? || worker.join && false
  end
end
reap_dead_workers?() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 173
def reap_dead_workers?
  Time.now.to_i >= next_reaping
end
reaping_interval() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 177
def reaping_interval
  5
end
run() { || ... } click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 181
def run
  @running = true
  yield if block_given? # runs on startup
  wait_for_shutdown_signal
  broadcast_flatline if broadcast_beacons?
  Thread.pass until reap_dead_workers.empty?
  @broker_thread.join unless brokerless?
ensure
  @running = false
  teardown
end
running?() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 193
def running?
  !!@running
end
start_missing_workers() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 197
def start_missing_workers
  missing_workers = total_workers - @workers.size

  if missing_workers > 0
    missing_workers.times { start_worker }
    logger.debug { sign_message("#{total_workers} workers started") }
  end
end
stop() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 206
def stop
  @running = false
  @shutdown_w.write('.')
end
teardown() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 211
def teardown
  @shutdown_r.try(:close)
  @shutdown_w.try(:close)
  @beacon_socket.try(:close)
  @zmq_context.try(:terminate)
  @last_reaping = @last_beacon = @timeout = nil
end
timeout() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 219
def timeout
  @timeout =
    if @timeout.nil?
      0
    else
      [minimum_timeout, maintenance_timeout].max
    end
end
to_proto() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 232
def to_proto
  @proto ||= ::Protobuf::Rpc::DynamicDiscovery::Server.new(
    :uuid => uuid,
    :address => frontend_ip,
    :port => frontend_port.to_s,
    :ttl => (beacon_interval * 1.5).ceil,
    :services => ::Protobuf::Rpc::Service.implemented_services
  )
end
total_workers() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 228
def total_workers
  @total_workers ||= [@options[:threads].to_i, 1].max
end
uuid() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 242
def uuid
  @uuid ||= SecureRandom.uuid
end
wait_for_shutdown_signal() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 246
def wait_for_shutdown_signal
  loop do
    break if IO.select([@shutdown_r], nil, nil, timeout)

    start_broker unless brokerless?
    reap_dead_workers if reap_dead_workers?
    start_missing_workers

    next unless broadcast_heartbeat?

    if broadcast_busy? && all_workers_busy?
      broadcast_flatline
    else
      broadcast_heartbeat
    end
  end
end

Private Instance Methods

init_beacon_socket() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 266
def init_beacon_socket
  @beacon_socket = UDPSocket.new
  @beacon_socket.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_BROADCAST, true)
  @beacon_socket.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_REUSEADDR, true)

  if defined?(::Socket::SO_REUSEPORT)
    @beacon_socket.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_REUSEPORT, true)
  end

  @beacon_socket.bind(frontend_ip, beacon_port)
  @beacon_socket.connect(beacon_ip, beacon_port)
end
init_shutdown_pipe() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 279
def init_shutdown_pipe
  @shutdown_r, @shutdown_w = IO.pipe
end
init_zmq_context() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 283
def init_zmq_context
  @zmq_context = ZMQ::Context.new
end
start_broker() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 287
def start_broker
  return if @broker && @broker.running? && @broker_thread.alive?
  if @broker && !@broker.running?
    broadcast_flatline if broadcast_busy?
    @broker_thread.join if @broker_thread
    init_zmq_context # need a new context to restart the broker
  end

  @broker = ::Protobuf::Rpc::Zmq::Broker.new(self)
  @broker_thread = Thread.new(@broker) do |broker|
    begin
      broker.run
    rescue => e
      message = "Broker failed: #{e.inspect}\n #{e.backtrace.join($INPUT_RECORD_SEPARATOR)}"
      $stderr.puts(message)
      logger.error { message }
    end
  end
end
start_worker() click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 307
def start_worker
  @workers << Thread.new(self, @broker) do |server, broker|
    begin
      ::Protobuf::Rpc::Zmq::Worker.new(server, broker).run
    rescue => e
      message = "Worker failed: #{e.inspect}\n #{e.backtrace.join($INPUT_RECORD_SEPARATOR)}"
      $stderr.puts(message)
      logger.error { message }
    end
  end
end