class RxIO::Service

Service Class

Constants

SELECT_TIMEOUT

Select Timeout (seconds)

Public Class Methods

new(addr, port, service_handler, ssl_params = nil)

Construct: Builds a Service around a given service_handler module, set to listen for incoming connections at addr on port.
@param [String] addr Address on which the service should listen
@param [Fixnum] port Port on which the service should listen
@param [Module] service_handler Module implementing the service logic
@param [Hash] ssl_params Optional SSL parameters ({ cert: '/path/to/cert', pkey: '/path/to/privatekey' }); a secure server is created if not nil

# File lib/rxio/service.rb, line 35
def initialize addr, port, service_handler, ssl_params = nil

        # Set SSL Params
        if ssl_params
                @ssl_ctx = OpenSSL::SSL::SSLContext.new
                @ssl_ctx.cert = OpenSSL::X509::Certificate.new File.read(ssl_params[:cert])
                @ssl_ctx.key = OpenSSL::PKey::RSA.new File.read(ssl_params[:pkey])
        end

        # Set Address & Port
        @addr = addr
        @port = port

        # Set Service Handler Module
        @service_handler = service_handler

        # Create Sockets
        @socks = []

        # Create Clients
        @clients = []

        # Create Client Map
        @cmap = {}
end
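As the source on this page shows, the service calls on_join, on_drop and send_msg directly on the handler module, and it checks respond_to? before the first two. A handler can therefore be sketched as a module with module-level methods. The following is a minimal sketch, not RxIO code: the module name EchoHandler is hypothetical, and having send_msg append to the endpoint's :obuf under its :lock is an assumption about how output gets queued.

```ruby
# Hypothetical handler module; the name EchoHandler and the method bodies
# are illustrative and not part of RxIO.
module EchoHandler
  # Optional callback: the service calls this when a client connects.
  def self.on_join(client)
    "joined #{client[:peer][:addr]}:#{client[:peer][:port]}"
  end

  # Optional callback: the service calls this when a client is dropped.
  def self.on_drop(client)
    "dropped #{client[:id]}"
  end

  # Reached through Service#send_msg. Assumption: queue the message on the
  # endpoint's output buffer while holding its lock.
  def self.send_msg(endpoint, msg)
    endpoint[:lock].synchronize { endpoint[:obuf] << msg }
  end
end

# Stand-in for the client hash built by add_client (no real socket here).
client = {
  id: 'AF_INET_50123_localhost_127.0.0.1',
  peer: { port: 50123, name: 'localhost', addr: '127.0.0.1' },
  obuf: +'',
  lock: Mutex.new
}

EchoHandler.send_msg(client, "hello\n")
joined = EchoHandler.on_join(client)
```

With the library loaded, such a module would be passed as the third constructor argument, for example RxIO::Service.new('127.0.0.1', 4444, EchoHandler).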

Public Instance Methods

run()

Run: Executes the main service loop, taking care of I/O scheduling, client management and message handling. This method blocks until the service loop terminates.

# File lib/rxio/service.rb, line 64
def run

        # Update Loop
        begin

                # Create TCP Socket Server
                @serv = TCPServer.new @addr, @port
                @serv = OpenSSL::SSL::SSLServer.new @serv, @ssl_ctx if defined?(@ssl_ctx) && @ssl_ctx
                @socks << @serv

                # Update Service
                update until @stop
        rescue Exception => e
                puts "[!] ERROR - RxIO Service Update failed - #{e.inspect}"
                e.backtrace.each { |b| puts "    - #{b}" }
        end

        # Drop all Clients
        @service_handler.on_drop @clients.shift until @clients.empty? if @service_handler.respond_to? :on_drop
        @cmap = {}

        # Close all Sockets
        @socks.each { |s| s.close }
        @socks = []

        # Drop Server
        @serv = nil
end

send_msg(endpoint, msg)

Send Message: Proxy method. Sends a message through the defined handler.
@param [Hash] endpoint
@param [String] msg

# File lib/rxio/service.rb, line 104
def send_msg endpoint, msg
        @service_handler.send_msg endpoint, msg
end

stop()

Stop: Requests the service loop to stop executing. The run method should return shortly after calling stop.

# File lib/rxio/service.rb, line 96
def stop
        @stop = true
end
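Because run blocks and stop only sets a flag, the two are typically called from different threads. The sketch below reproduces that pattern with a stand-in class so it can run without the library; TinyLoop and its 0.1 second timeout are hypothetical and not taken from RxIO.

```ruby
# Stand-in for the run/stop pattern; TinyLoop is hypothetical, not RxIO code.
class TinyLoop
  SELECT_TIMEOUT = 0.1 # seconds; assumed value for this sketch

  def initialize
    @stop = false
    # A pipe that is never written to, so select always times out.
    @rd, @wr = IO.pipe
  end

  # Blocks until stop is called from another thread.
  def run
    IO.select([@rd], nil, nil, SELECT_TIMEOUT) until @stop
  ensure
    @rd.close
    @wr.close
  end

  # Only sets the flag; the loop notices it on its next pass.
  def stop
    @stop = true
  end
end

loop_obj = TinyLoop.new
worker = Thread.new { loop_obj.run }
sleep 0.3
loop_obj.stop
finished = worker.join(2) # returns the thread once run has returned, nil on timeout
```

The flag is checked once per pass, so run returns no later than roughly one select timeout after stop is called.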

Private Instance Methods

acpt_sock(s)

Accept Socket: Tries to accept any queued connection request in a non-blocking manner. If a socket is accepted, registers a new Client around it through add_client.
@param [TCPServer] s The service's listening socket

# File lib/rxio/service.rb, line 176
def acpt_sock s

        # Accept
        ns = s.accept_nonblock rescue nil

        # Register Client
        add_client ns if ns
end
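The non-blocking accept used here can be tried in isolation with the standard socket library. This is a small sketch on the loopback interface; port 0 and the variable names are choices made for the example only.

```ruby
require 'socket'

server = TCPServer.new('127.0.0.1', 0) # port 0: let the OS pick a free port
port = server.addr[1]

# No pending connection: accept_nonblock raises IO::WaitReadable,
# which the rescue modifier turns into nil.
none = server.accept_nonblock rescue nil

# Queue one connection, wait until the listener is readable, then accept.
client = TCPSocket.new('127.0.0.1', port)
IO.select([server], nil, nil, 1)
accepted = server.accept_nonblock rescue nil

[client, accepted, server].compact.each(&:close)
```

Note that the rescue modifier swallows every StandardError, not only the would-block case, so other accept failures are silently ignored as well.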

add_client(s)

Register Client: Creates a new Client around a given socket s and registers it. Also notifies the Handler Module if it implements on_join.
@param [TCPSocket] s The new client's network socket

# File lib/rxio/service.rb, line 134
def add_client s

        # Register Socket
        @socks << s

        # Acquire Peer Address
        peer = s.peeraddr

        # Compute ID
        id = peer.join '_'

        # Create Client
        c = {
                id: id,
                sock: s,
                peer: {
                        port: peer[1],
                        name: peer[2],
                        addr: peer[3]
                },
                ibuf: '',
                obuf: '',
                lock: Mutex.new,
                serv: self,
                local: self,
                msgs: []
        }

        # Register Client
        @clients << c

        # Map Client
        @cmap[s] = c

        # Notify Service Handler
        @service_handler.on_join c if @service_handler.respond_to? :on_join
end
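For an IP socket, peeraddr returns an array of address family, port, host name and numeric address, which is why indices 1 to 3 are used above. The sketch below applies the same ID and peer computation to a made-up array; the port and host values are illustrative only.

```ruby
# Example peeraddr value; the port and names are made up for illustration.
peer = ['AF_INET', 50123, 'localhost', '127.0.0.1']

# Same computation as in add_client.
id = peer.join('_')
info = { port: peer[1], name: peer[2], addr: peer[3] }
```

Here id becomes the four fields joined with underscores, so two connections from the same host differ by their source port.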

get_endpoint_for_sock(s)

Get Endpoint for Socket: Callback for IOBase. Finds the Client associated with a given Socket.
@param [TCPSocket] s Any socket
@return [Hash] The Endpoint associated with Socket s, or nil

# File lib/rxio/service.rb, line 189
def get_endpoint_for_sock s
        @cmap[s]
end

on_drop(c)

On Drop: Callback for IOBase. Unregisters a Client and closes the associated socket.
@param [Hash] c Client Hash

# File lib/rxio/service.rb, line 196
def on_drop c

        # Drop Client
        @cmap.delete c[:sock]
        @socks.delete c[:sock]
        @clients.delete c

        # Kill Socket
        c[:sock].close rescue nil
end

update()

Update: Serves as the service loop main method, performing I/O scheduling, client management and message handling.

# File lib/rxio/service.rb, line 113
def update

        # Collect Sockets with Output
        ws = @clients.reject { |c| c[:lock].synchronize { c[:obuf].empty? } }.collect { |c| c[:sock] }

        # Select Sockets
        rd, wr, _er = IO.select @socks, ws, [], SELECT_TIMEOUT

        # Handle I/O
        rd.each { |s| s == @serv ? acpt_sock(s) : read_sock(s) } if rd
        wr.each { |s| write_sock s } if wr

        # Drop Clients
        drop = @clients.select { |c| c[:lock].synchronize { c[:obuf].empty? && c[:drop] } }
        drop_endpoint drop.shift until drop.empty?
end