class Protobuf::Rpc::Zmq::Broker
Attributes
local_queue[R]
Public Class Methods
new(server)
click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 11 def initialize(server) @server = server init_zmq_context init_local_queue init_backend_socket init_frontend_socket init_poller rescue teardown raise end
Public Instance Methods
run()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 24 def run @idle_workers = [] @running = true loop do process_local_queue rc = @poller.poll(broker_polling_milliseconds) # The server was shutdown and no requests are pending break if rc == 0 && !running? && @server.workers.empty? # Something went wrong break if rc == -1 check_and_process_backend process_local_queue # Fair ordering so queued requests get in before new requests check_and_process_frontend end ensure teardown @running = false end
running?()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 46 def running? @running && @server.running? end
Private Instance Methods
backend_poll_weight()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 52 def backend_poll_weight @backend_poll_weight ||= [ENV["PB_ZMQ_SERVER_BACKEND_POLL_WEIGHT"].to_i, 1].max end
broker_polling_milliseconds()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 56 def broker_polling_milliseconds @broker_polling_milliseconds ||= [ENV["PB_ZMQ_BROKER_POLLING_MILLISECONDS"].to_i, 500].max end
check_and_process_backend()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 60 def check_and_process_backend readables_include_backend = @poller.readables.include?(@backend_socket) message_count_read_from_backend = 0 while readables_include_backend && message_count_read_from_backend < backend_poll_weight message_count_read_from_backend += 1 process_backend @poller.poll_nonblock readables_include_backend = @poller.readables.include?(@backend_socket) end end
check_and_process_frontend()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 72 def check_and_process_frontend readables_include_frontend = @poller.readables.include?(@frontend_socket) message_count_read_from_frontend = 0 while readables_include_frontend && message_count_read_from_frontend < frontend_poll_weight message_count_read_from_frontend += 1 process_frontend break unless local_queue_available? # no need to read frontend just to throw away messages, will prioritize backend when full @poller.poll_nonblock readables_include_frontend = @poller.readables.include?(@frontend_socket) end end
frontend_poll_weight()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 85 def frontend_poll_weight @frontend_poll_weight ||= [ENV["PB_ZMQ_SERVER_FRONTEND_POLL_WEIGHT"].to_i, 1].max end
init_backend_socket()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 89 def init_backend_socket @backend_socket = @zmq_context.socket(ZMQ::ROUTER) zmq_error_check(@backend_socket.bind(@server.backend_uri)) end
init_frontend_socket()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 94 def init_frontend_socket @frontend_socket = @zmq_context.socket(ZMQ::ROUTER) zmq_error_check(@frontend_socket.bind(@server.frontend_uri)) end
init_local_queue()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 99 def init_local_queue @local_queue = [] end
init_poller()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 103 def init_poller @poller = ZMQ::Poller.new @poller.register_readable(@frontend_socket) @poller.register_readable(@backend_socket) end
init_zmq_context()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 109 def init_zmq_context if inproc? @zmq_context = @server.zmq_context else @zmq_context = ZMQ::Context.new end end
inproc?()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 117 def inproc? !!@server.try(:inproc?) end
local_queue_available?()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 121 def local_queue_available? local_queue.size < local_queue_max_size && running? end
local_queue_max_size()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 125 def local_queue_max_size @local_queue_max_size ||= [ENV["PB_ZMQ_SERVER_QUEUE_MAX_SIZE"].to_i, 5].max end
process_backend()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 129 def process_backend worker, _ignore, *frames = read_from_backend @idle_workers << worker unless frames == [::Protobuf::Rpc::Zmq::WORKER_READY_MESSAGE] write_to_frontend(frames) end end
process_frontend()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 139 def process_frontend address, _, message, *frames = read_from_frontend if message == ::Protobuf::Rpc::Zmq::CHECK_AVAILABLE_MESSAGE if local_queue_available? write_to_frontend([address, ::Protobuf::Rpc::Zmq::EMPTY_STRING, ::Protobuf::Rpc::Zmq::WORKERS_AVAILABLE]) else write_to_frontend([address, ::Protobuf::Rpc::Zmq::EMPTY_STRING, ::Protobuf::Rpc::Zmq::NO_WORKERS_AVAILABLE]) end else if @idle_workers.empty? local_queue << [address, ::Protobuf::Rpc::Zmq::EMPTY_STRING, message].concat(frames) else write_to_backend([@idle_workers.shift, ::Protobuf::Rpc::Zmq::EMPTY_STRING].concat([address, ::Protobuf::Rpc::Zmq::EMPTY_STRING, message]).concat(frames)) end end end
process_local_queue()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 157 def process_local_queue return if local_queue.empty? return if @idle_workers.empty? write_to_backend([@idle_workers.shift, ::Protobuf::Rpc::Zmq::EMPTY_STRING].concat(local_queue.shift)) process_local_queue end
read_from_backend()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 165 def read_from_backend frames = [] zmq_error_check(@backend_socket.recv_strings(frames)) frames end
read_from_frontend()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 171 def read_from_frontend frames = [] zmq_error_check(@frontend_socket.recv_strings(frames)) frames end
teardown()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 177 def teardown @frontend_socket.try(:close) @backend_socket.try(:close) @zmq_context.try(:terminate) unless inproc? end
write_to_backend(frames)
click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 183 def write_to_backend(frames) zmq_error_check(@backend_socket.send_strings(frames)) end
write_to_frontend(frames)
click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 187 def write_to_frontend(frames) zmq_error_check(@frontend_socket.send_strings(frames)) end