class Protobuf::Rpc::Zmq::Worker
Public Class Methods
new(server, broker)
click to toggle source
Constructor
# File lib/protobuf/rpc/servers/zmq/worker.rb, line 15 def initialize(server, broker) @server = server @broker = broker init_zmq_context init_backend_socket rescue teardown raise end
Public Instance Methods
process_request()
click to toggle source
Instance Methods
# File lib/protobuf/rpc/servers/zmq/worker.rb, line 29 def process_request client_address, _, data = read_from_backend return unless data gc_pause do encoded_response = handle_request(data) write_to_backend([client_address, ::Protobuf::Rpc::Zmq::EMPTY_STRING, encoded_response]) end end
run()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/worker.rb, line 39 def run poller = ::ZMQ::Poller.new poller.register_readable(@backend_socket) poller.register_readable(@shutdown_socket) # Send request to broker telling it we are ready write_to_backend([::Protobuf::Rpc::Zmq::WORKER_READY_MESSAGE]) loop do rc = poller.poll(500) if rc == 0 && !running? break # The server was shutdown and no requests are pending elsif rc == -1 break # Something went wrong elsif rc > 0 ::Thread.current[:busy] = true process_request ::Thread.current[:busy] = false end end ensure teardown end
running?()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/worker.rb, line 64 def running? @broker.running? && @server.running? end
Private Instance Methods
init_backend_socket()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/worker.rb, line 78 def init_backend_socket @backend_socket = @zmq_context.socket(ZMQ::REQ) zmq_error_check(@backend_socket.connect(@server.backend_uri)) end
init_zmq_context()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/worker.rb, line 70 def init_zmq_context if inproc? @zmq_context = @server.zmq_context else @zmq_context = ZMQ::Context.new end end
inproc?()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/worker.rb, line 83 def inproc? !!@server.try(:inproc?) end
read_from_backend()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/worker.rb, line 87 def read_from_backend frames = [] zmq_error_check(@backend_socket.recv_strings(frames)) frames end
teardown()
click to toggle source
# File lib/protobuf/rpc/servers/zmq/worker.rb, line 93 def teardown @backend_socket.try(:close) @zmq_context.try(:terminate) unless inproc? end
write_to_backend(frames)
click to toggle source
# File lib/protobuf/rpc/servers/zmq/worker.rb, line 98 def write_to_backend(frames) zmq_error_check(@backend_socket.send_strings(frames)) end