class Protobuf::Rpc::Socket::Server
Constants
- AUTO_COLLECT_TIMEOUT
Attributes
backlog[RW]
host[RW]
port[RW]
running[RW]
running?[RW]
threshold[RW]
Public Class Methods
new(options)
click to toggle source
# File lib/protobuf/rpc/servers/socket/server.rb, line 23
# Builds a stopped server from an options hash.
#
# @param options [Hash] must contain :host and :port; :backlog and
#   :threshold are optional and both default to 100.
# @raise [KeyError] when :host or :port is absent (raised by Hash#fetch).
def initialize(options)
  # The server starts out stopped; the flag is set to true inside #run.
  self.running = false

  # Required settings.
  self.host = options.fetch(:host)
  self.port = options.fetch(:port)

  # Optional settings with their defaults.
  self.backlog = options.fetch(:backlog) { 100 }
  self.threshold = options.fetch(:threshold) { 100 }
end
Public Instance Methods
cleanup?()
click to toggle source
# File lib/protobuf/rpc/servers/socket/server.rb, line 39
# Tells whether the count-based thread cleanup is due.
#
# Cleanup is due every `threshold` tracked connections, i.e. when the number
# of tracked threads is non-zero and an exact multiple of `threshold`.
#
# A `threshold` that is nil, zero or negative disables count-based cleanup
# instead of raising (the modulo below would otherwise raise
# ZeroDivisionError for 0 and NoMethodError for nil).
#
# @return [Boolean]
def cleanup?
  limit = threshold
  return false unless limit && limit > 0

  tracked = threads.size
  tracked > 0 && (tracked % limit).zero?
end
cleanup_threads()
click to toggle source
# File lib/protobuf/rpc/servers/socket/server.rb, line 44
# Removes every finished worker from `threads` and releases its socket from
# the `working` collection. Entries whose thread is still alive are kept.
# A debug line with the current thread count is logged before and after.
#
# NOTE(review): Thread#join re-raises an exception that terminated the
# joined thread, so a worker that died with an unhandled error would
# propagate it from here - confirm workers rescue their own errors.
def cleanup_threads
  logger.debug { sign_message("Thread cleanup - #{threads.size} - start") }

  threads.delete_if do |entry|
    worker = entry.fetch(:thread)
    next false if worker.alive?

    worker.join
    # The result of this call decides whether the entry is removed.
    working.delete(entry.fetch(:socket))
  end

  logger.debug { sign_message("Thread cleanup - #{threads.size} - complete") }
end
log_signature()
click to toggle source
# File lib/protobuf/rpc/servers/socket/server.rb, line 57
# Prefix identifying this server in log output, e.g. "[server-<ClassName>]".
# The string is built once and cached on the instance.
#
# @return [String]
def log_signature
  return @_log_signature if @_log_signature

  @_log_signature = "[server-#{self.class.name}]"
end
new_worker(socket)
click to toggle source
# File lib/protobuf/rpc/servers/socket/server.rb, line 61
# Starts a thread that builds a ::Protobuf::Rpc::Socket::Worker for the given
# socket. The worker is handed a block that closes the socket it is given.
#
# @param socket [Object] the client connection passed to the worker
# @return [Thread] the thread running the worker
def new_worker(socket)
  Thread.new(socket) do |connection|
    ::Protobuf::Rpc::Socket::Worker.new(connection) do |finished|
      finished.close
    end
  end
end
run()
click to toggle source
# File lib/protobuf/rpc/servers/socket/server.rb, line 69
# Opens a TCP server on host/port and loops until `running?` turns false.
#
# Each pass waits (up to AUTO_COLLECT_TIMEOUT) for a readable descriptor:
# * the listening server being readable means a new connection, which is
#   accepted and added to the watched descriptors;
# * a readable client that is not already being worked on is moved from the
#   watched descriptors to `working` and handed to a new worker thread;
# * a timeout (or an IOError from select) triggers a thread cleanup when more
#   than one thread is tracked.
#
# The listening socket is always closed on the way out.
#
# @raise [RuntimeError] if the server socket is closed right after creation
def run
  logger.debug { sign_message("Run") }

  server = ::TCPServer.new(host, port)
  raise "The server was unable to start properly." if server.closed?

  begin
    server.listen(backlog)
    listen_fds = [server]
    self.running = true

    while running?
      logger.debug { sign_message("Waiting for connections") }

      ready_cnxns =
        begin
          IO.select(listen_fds, [], [], AUTO_COLLECT_TIMEOUT)
        rescue IOError
          nil
        end

      unless ready_cnxns
        # Run a cleanup if select times out while waiting
        cleanup_threads if threads.size > 1
        next
      end

      ready_cnxns.first.each do |client|
        # Once stopped, ignore whatever else was reported readable.
        next unless running?

        if client == server
          logger.debug { sign_message("Accepted new connection") }
          client, _sockaddr = server.accept
          listen_fds << client
        elsif !working.include?(client)
          # Stop watching the socket ourselves; the worker owns it from here.
          working << listen_fds.delete(client)
          logger.debug { sign_message("Working") }
          threads << { :thread => new_worker(client), :socket => client }
          cleanup_threads if cleanup?
        end
      end
    end
  ensure
    server.close
  end
end
stop()
click to toggle source
# File lib/protobuf/rpc/servers/socket/server.rb, line 117
# Clears the running flag; the loop in #run checks `running?` on each pass
# and exits once it is false.
#
# @return [false]
def stop
  self.running = false
end
threads()
click to toggle source
# File lib/protobuf/rpc/servers/socket/server.rb, line 31
# Lazily created list of worker bookkeeping entries. #run appends hashes of
# the form { :thread => Thread, :socket => socket } to it.
#
# @return [Array]
def threads
  @threads = [] unless @threads
  @threads
end
working()
click to toggle source
# File lib/protobuf/rpc/servers/socket/server.rb, line 35
# Lazily created set of sockets currently handed over to a worker.
#
# @return [Set]
def working
  @working = Set.new unless @working
  @working
end