class RxIO::Service
Service
Class
Constants
- SELECT_TIMEOUT
Select Timeout (seconds)
Public Class Methods
Construct: Builds a Service around a given service_handler module, set to listen for incoming connections @ addr on port. @param [String] addr Address on which the service should listen @param [Fixnum] port Port on which the service should listen @param [Module] service_handler Module implementing the service logic @param [Hash] ssl_params Optional SSL Parameters ({ cert: '/path/to/cert', pkey: '/path/to/privatekey' }) - will create a secure server if not nil
# File lib/rxio/service.rb, line 35 def initialize addr, port, service_handler, ssl_params = nil # Set SSL Params if ssl_params @ssl_ctx = OpenSSL::SSL::SSLContext.new @ssl_ctx.cert = OpenSSL::X509::Certificate.new File.read(ssl_params[:cert]) @ssl_ctx.key = OpenSSL::PKey::RSA.new File.read(ssl_params[:pkey]) end # Set Address & Port @addr = addr @port = port # Set Service Handler Module @service_handler = service_handler # Create Sockets @socks = [] # Create Clients @clients = [] # Create Client Map @cmap = {} end
Public Instance Methods
Run: Executes the main service loop, taking care of I/O scheduling, client management and message handling. This method blocks until the service loop terminates.
# File lib/rxio/service.rb, line 64 def run # Update Loop begin # Create TCP Socket Server @serv = TCPServer.new @addr, @port @serv = OpenSSL::SSL::SSLServer.new @serv, @ssl_ctx if defined?(@ssl_ctx) && @ssl_ctx @socks << @serv # Update Service update until @stop rescue Exception => e puts "[!] ERROR - RxIO Service Update failed - #{e.inspect}" e.backtrace.each { |b| puts " - #{b}" } end # Drop all Clients @service_handler.on_drop @clients.shift until @clients.empty? if @service_handler.respond_to? :on_drop @cmap = {} # Close all Sockets @socks.each { |s| s.close } @socks = [] # Drop Server @serv = nil end
Send Message: Proxy method - Sends message through the defined handler. @param [Hash] endpoint @param [String] msg
# File lib/rxio/service.rb, line 104 def send_msg endpoint, msg @service_handler.send_msg endpoint, msg end
Stop: Requests the service loop to stop executing. The run method should return shortly after calling stop.
# File lib/rxio/service.rb, line 96 def stop @stop = true end
Private Instance Methods
Accept Socket: Tries to accept any queued connection request in a non-blocking manner. Registers a new Client
through add_client around the newly-accepted socket if present. @param [TCPServer] s The service's listening socket
# File lib/rxio/service.rb, line 176 def acpt_sock s # Accept ns = s.accept_nonblock rescue nil # Register Client add_client ns if ns end
Register Client: Creates a new Client
around a given socket s and registers it. Also, notifies the Handler Module if the on_join method is available. @param [TCPSocket] s The new client's network socket
# File lib/rxio/service.rb, line 134 def add_client s # Register Socket @socks << s # Acquire Peer Address peer = s.peeraddr # Compute ID id = peer.join '_' # Create Client c = { id: id, sock: s, peer: { port: peer[1], name: peer[2], addr: peer[3] }, ibuf: '', obuf: '', lock: Mutex.new, serv: self, local: self, msgs: [] } # Register Client @clients << c # Map Client @cmap[s] = c # Notify Service Handler @service_handler.on_join c if @service_handler.respond_to? :on_join end
Update: Serves as the service loop main method, performing I/O scheduling, client management and message handling.
# File lib/rxio/service.rb, line 113 def update # Collect Sockets with Output ws = @clients.reject { |c| c[:lock].synchronize { c[:obuf].empty? } }.collect { |c| c[:sock] } # Select Sockets rd, wr, _er = IO.select @socks, ws, [], SELECT_TIMEOUT # Handle I/O rd.each { |s| s == @serv ? acpt_sock(s) : read_sock(s) } if rd wr.each { |s| write_sock s } if wr # Drop Clients drop = @clients.select { |c| c[:lock].synchronize { c[:obuf].empty? && c[:drop] } } drop_endpoint drop.shift until drop.empty? end