class Protobuf::Rpc::Zmq::Server
Constants
- DEFAULT_OPTIONS
Attributes
options[RW]
workers[RW]
zmq_context[R]
Public Class Methods
new(options)
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 24
# Builds a server from +options+ layered over DEFAULT_OPTIONS, then creates
# the ZMQ context, the beacon socket (only when beacons are broadcast) and
# the shutdown pipe. If any step raises, #teardown runs and the error is
# re-raised to the caller.
def initialize(options)
  @options = DEFAULT_OPTIONS.merge(options)
  @workers = []

  init_zmq_context
  init_beacon_socket if broadcast_beacons?
  init_shutdown_pipe
rescue
  # Release whatever was already opened before propagating the failure.
  teardown
  raise
end
Public Instance Methods
add_worker()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 36
# Raises the target worker count by one. The worker itself is not started
# here; only @total_workers changes.
def add_worker
  current_target = total_workers
  @total_workers = current_target + 1
end
all_workers_busy?()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 40
# True when no worker thread has a falsy :busy thread-local
# (which is also the case when there are no workers at all).
def all_workers_busy?
  workers.none? { |worker| !worker[:busy] }
end
backend_port()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 44
# Port for the backend (worker) side: the :worker_port option when set,
# otherwise the frontend port plus one.
def backend_port
  configured_port = options[:worker_port]
  return configured_port if configured_port

  frontend_port + 1
end
backend_uri()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 48
# URI of the backend endpoint. Uses the inproc:// scheme when #inproc? is
# true and tcp:// otherwise; host and port are the same in both cases.
def backend_uri
  scheme = inproc? ? "inproc" : "tcp"
  "#{scheme}://#{backend_ip}:#{backend_port}"
end
beacon_interval()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 56
# The :beacon_interval option coerced with #to_i and clamped to at least 1.
def beacon_interval
  configured = options[:beacon_interval].to_i
  configured < 1 ? 1 : configured
end
beacon_ip()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 60
# Destination address for beacons: always the fixed string below.
def beacon_ip
  "255.255.255.255"
end
beacon_port()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 64
# Memoized beacon port: the :beacon_port option, defaulting to
# ServiceDirectory.port, converted with #to_i.
def beacon_port
  return @beacon_port if @beacon_port

  # The default is passed positionally, so it is evaluated even when the
  # option is present.
  configured = options.fetch(:beacon_port, ::Protobuf::Rpc::ServiceDirectory.port)
  @beacon_port = configured.to_i
end
beacon_uri()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 71
# udp:// URI built from #beacon_ip and #beacon_port.
def beacon_uri
  host = beacon_ip
  port = beacon_port
  "udp://#{host}:#{port}"
end
broadcast_beacons?()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 75
# False when the server is brokerless; otherwise the raw value of the
# :broadcast_beacons option (which may be nil).
def broadcast_beacons?
  brokerless? ? false : options[:broadcast_beacons]
end
broadcast_busy?()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 79
# The :broadcast_busy option, but only when beacons are being broadcast;
# otherwise the falsy result of #broadcast_beacons? is returned as-is.
def broadcast_busy?
  beacons_enabled = broadcast_beacons?
  return beacons_enabled unless beacons_enabled

  options[:broadcast_busy]
end
broadcast_flatline()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 83
# Builds a FLATLINE beacon describing this server (#to_proto) and sends its
# encoded form over @beacon_socket.
def broadcast_flatline
  beacon = ::Protobuf::Rpc::DynamicDiscovery::Beacon.new(
    :beacon_type => ::Protobuf::Rpc::DynamicDiscovery::BeaconType::FLATLINE,
    :server => to_proto,
  )
  payload = beacon.encode

  @beacon_socket.send(payload, 0)
end
broadcast_heartbeat()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 92
# Records the current time as @last_beacon, sends an encoded HEARTBEAT
# beacon describing this server over @beacon_socket, then logs at debug
# level.
def broadcast_heartbeat
  # Stamp first: #next_beacon is derived from this value.
  @last_beacon = Time.now.to_i

  beacon = ::Protobuf::Rpc::DynamicDiscovery::Beacon.new(
    :beacon_type => ::Protobuf::Rpc::DynamicDiscovery::BeaconType::HEARTBEAT,
    :server => to_proto,
  )
  payload = beacon.encode
  @beacon_socket.send(payload, 0)

  logger.debug { sign_message("sent heartbeat to #{beacon_uri}") }
end
broadcast_heartbeat?()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 105
# Whether a heartbeat is due: the current time has reached #next_beacon and
# beacons are being broadcast.
def broadcast_heartbeat?
  return false if Time.now.to_i < next_beacon

  broadcast_beacons?
end
brokerless?()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 109
# Strict boolean form of the :workers_only option.
def brokerless?
  options[:workers_only] ? true : false
end
busy_worker_count()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 113
# Number of worker threads whose :busy thread-local is truthy.
def busy_worker_count
  busy_workers = workers.select { |worker| worker[:busy] }
  busy_workers.size
end
frontend_ip()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 117
# Memoized result of resolve_ip applied to the :host option.
def frontend_ip
  return @frontend_ip if @frontend_ip

  @frontend_ip = resolve_ip(options[:host])
end
Also aliased as: backend_ip
frontend_port()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 122
# The :port option, returned unmodified.
def frontend_port
  port = options[:port]
  port
end
frontend_uri()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 126
# tcp:// URI built from #frontend_ip and #frontend_port.
def frontend_uri
  host = frontend_ip
  port = frontend_port
  "tcp://#{host}:#{port}"
end
inproc?()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 130
# Strict boolean form of the :zmq_inproc option.
def inproc?
  options[:zmq_inproc] ? true : false
end
maintenance_timeout()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 134
# Seconds from now until #next_maintenance; negative when it is overdue.
def maintenance_timeout
  due_at = next_maintenance
  now = Time.now.to_i
  due_at - now
end
minimum_timeout()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 145
# Lower bound applied by #timeout.
def minimum_timeout
  0.1
end
next_beacon()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 149
# Epoch second at which the next beacon is due: 0 when no beacon has been
# recorded yet, otherwise the last beacon time plus #beacon_interval.
def next_beacon
  return 0 if @last_beacon.nil?

  @last_beacon + beacon_interval
end
next_maintenance()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 138
# Earliest upcoming maintenance time: the next reaping, or the next beacon
# if beacons are broadcast and one is due sooner.
def next_maintenance
  soonest = next_reaping
  return soonest unless broadcast_beacons?

  [soonest, next_beacon].min
end
next_reaping()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 157
# Epoch second at which dead workers should next be reaped: 0 when no
# reaping has been recorded yet, otherwise the last reaping time plus
# #reaping_interval.
def next_reaping
  return 0 if @last_reaping.nil?

  @last_reaping + reaping_interval
end
reap_dead_workers()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 165
# Records the reaping time, then joins and removes every worker thread that
# is no longer alive. Mutates @workers in place and returns it.
def reap_dead_workers
  @last_reaping = Time.now.to_i

  @workers.delete_if do |worker|
    next false if worker.alive?

    # Join the finished thread before dropping it from the pool.
    worker.join
    true
  end
end
reap_dead_workers?()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 173
# True once the current time has reached #next_reaping.
def reap_dead_workers?
  now = Time.now.to_i
  now >= next_reaping
end
reaping_interval()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 177
# Fixed gap added to the last reaping time by #next_reaping
# (compared against Time.now.to_i, i.e. seconds).
def reaping_interval
  5
end
run() { || ... }
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 181
# Runs the server until a shutdown signal arrives.
#
# Marks the server as running, yields once to the optional startup block,
# then blocks in #wait_for_shutdown_signal. On shutdown it broadcasts a
# flatline (when beacons are enabled), waits for all workers to be reaped
# and joins the broker thread. @running is cleared and #teardown is called
# in all cases, including when an error is raised.
def run
  @running = true
  yield if block_given? # runs on startup
  wait_for_shutdown_signal
  broadcast_flatline if broadcast_beacons?
  Thread.pass until reap_dead_workers.empty?
  # The shutdown signal can arrive before the broker was ever started, in
  # which case there is no thread to join.
  @broker_thread.join unless brokerless? || @broker_thread.nil?
ensure
  @running = false
  teardown
end
running?()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 193
# Strict boolean form of @running.
def running?
  @running ? true : false
end
start_missing_workers()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 197
# Starts as many workers as needed to bring @workers up to #total_workers
# and logs at debug level. Does nothing when the pool is already full.
def start_missing_workers
  shortfall = total_workers - @workers.size
  return if shortfall <= 0

  shortfall.times { start_worker }
  logger.debug { sign_message("#{total_workers} workers started") }
end
stop()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 206
# Requests shutdown: clears @running and writes one byte to the shutdown
# pipe, which wakes the IO.select in #wait_for_shutdown_signal.
#
# Safe to call repeatedly, including after #teardown has closed the pipe;
# in that case only the flag is cleared and no IOError is raised.
def stop
  @running = false
  @shutdown_w.write('.') if @shutdown_w && !@shutdown_w.closed?
end
teardown()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 211
# Releases server resources: closes both ends of the shutdown pipe and the
# beacon socket, terminates the ZMQ context, and clears the timing state.
# Each resource is reached through #try, so unset ones are skipped.
def teardown
  [@shutdown_r, @shutdown_w, @beacon_socket].each do |closable|
    closable.try(:close)
  end
  @zmq_context.try(:terminate)

  @timeout = nil
  @last_beacon = nil
  @last_reaping = nil
end
timeout()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 219
# Timeout for the next wait. The first call (while @timeout is nil) yields
# 0; later calls yield the time until the next maintenance, never below
# #minimum_timeout. The value is also stored in @timeout.
def timeout
  @timeout = @timeout.nil? ? 0 : [minimum_timeout, maintenance_timeout].max
end
to_proto()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 231
# Memoized DynamicDiscovery::Server message describing this server: uuid,
# frontend address and port (as a string), a ttl of 1.5 beacon intervals
# rounded up, and the implemented services.
def to_proto
  return @proto if @proto

  @proto = ::Protobuf::Rpc::DynamicDiscovery::Server.new(
    :uuid => uuid,
    :address => frontend_ip,
    :port => frontend_port.to_s,
    :ttl => (beacon_interval * 1.5).ceil,
    :services => ::Protobuf::Rpc::Service.implemented_services,
  )
end
total_workers()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 227
# Memoized target worker count: the :threads option coerced with #to_i and
# clamped to at least 1.
def total_workers
  return @total_workers if @total_workers

  configured = @options[:threads].to_i
  @total_workers = configured < 1 ? 1 : configured
end
uuid()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 241
# Memoized random UUID for this server instance.
def uuid
  return @uuid if @uuid

  @uuid = SecureRandom.uuid
end
wait_for_shutdown_signal()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 245
# Main maintenance loop. Each pass waits up to #timeout for the shutdown
# pipe to become readable and exits the loop when it does. Otherwise it
# ensures the broker is started (unless brokerless), reaps dead workers
# when due, tops up the worker pool, and, when a heartbeat is due, sends
# either a flatline (busy broadcasting enabled and every worker busy) or a
# heartbeat.
def wait_for_shutdown_signal
  loop do
    shutdown_requested = IO.select([@shutdown_r], nil, nil, timeout)
    break if shutdown_requested

    start_broker unless brokerless?
    reap_dead_workers if reap_dead_workers?
    start_missing_workers

    if broadcast_heartbeat?
      if broadcast_busy? && all_workers_busy?
        broadcast_flatline
      else
        broadcast_heartbeat
      end
    end
  end
end
Private Instance Methods
init_beacon_socket()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 265
# Creates the UDP socket used for beacons: enables SO_BROADCAST and
# SO_REUSEADDR (plus SO_REUSEPORT where the constant is defined), binds it
# to the frontend IP on the beacon port and connects it to the beacon
# destination.
def init_beacon_socket
  # Assigned before configuring so the socket is already stored in
  # @beacon_socket if a later step raises.
  @beacon_socket = UDPSocket.new

  socket_flags = [::Socket::SO_BROADCAST, ::Socket::SO_REUSEADDR]
  socket_flags << ::Socket::SO_REUSEPORT if defined?(::Socket::SO_REUSEPORT)
  socket_flags.each do |flag|
    @beacon_socket.setsockopt(::Socket::SOL_SOCKET, flag, true)
  end

  @beacon_socket.bind(frontend_ip, beacon_port)
  @beacon_socket.connect(beacon_ip, beacon_port)
end
init_shutdown_pipe()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 278
# Opens the pipe used to signal shutdown, storing the read end in
# @shutdown_r and the write end in @shutdown_w. Returns both ends.
def init_shutdown_pipe
  pipe_ends = IO.pipe
  @shutdown_r = pipe_ends[0]
  @shutdown_w = pipe_ends[1]
  pipe_ends
end
init_zmq_context()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 282
# Creates a fresh ZMQ context and stores it in @zmq_context, replacing any
# previous one.
def init_zmq_context
  fresh_context = ZMQ::Context.new
  @zmq_context = fresh_context
end
start_broker()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 286
# Ensures a broker is running in its own thread.
#
# Returns immediately when the current broker reports running and its
# thread is not stopped. When a previous broker exists but is no longer
# running, a flatline is broadcast (if busy broadcasting is enabled), the
# old thread is joined and a new ZMQ context is created. A new Broker is
# then created and run in a new thread; a failure inside that thread is
# written to $stderr and the logger rather than propagated.
def start_broker
  if @broker
    return if @broker.running? && !@broker_thread.stop?

    unless @broker.running?
      broadcast_flatline if broadcast_busy?
      @broker_thread.join if @broker_thread
      init_zmq_context # need a new context to restart the broker
    end
  end

  @broker = ::Protobuf::Rpc::Zmq::Broker.new(self)
  @broker_thread = Thread.new(@broker) do |new_broker|
    begin
      new_broker.run
    rescue => error
      message = "Broker failed: #{error.inspect}\n #{error.backtrace.join($INPUT_RECORD_SEPARATOR)}"
      $stderr.puts(message)
      logger.error { message }
    end
  end
end
start_worker()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/server.rb, line 306
# Spawns one worker thread and appends it to @workers. The thread builds a
# Worker for this server and the current broker and runs it; a failure is
# written to $stderr and the logger rather than propagated.
def start_worker
  worker_thread = Thread.new(self, @broker) do |owner, current_broker|
    begin
      ::Protobuf::Rpc::Zmq::Worker.new(owner, current_broker).run
    rescue => error
      message = "Worker failed: #{error.inspect}\n #{error.backtrace.join($INPUT_RECORD_SEPARATOR)}"
      $stderr.puts(message)
      logger.error { message }
    end
  end

  @workers << worker_thread
end