module RxIO::IOBase

I/O Base Module

Constants

CHUNK_SIZE

Chunk Size

RETRY_EXCEPTIONS

Retry Exceptions

Public Instance Methods

drop_endpoint(endpoint) click to toggle source

Drop Endpoint: Notifies the Service Handler and Parent Implementation (in that order) of a dropped endpoint. @param [Hash] endpoint

# File lib/rxio/io_base.rb, line 103
def drop_endpoint endpoint

        # Notify Service Handler
        @service_handler.on_drop endpoint if @service_handler.respond_to? :on_drop

        # Drop Endpoint
        on_drop endpoint
end
peer_error(p, _e) click to toggle source

Peer Error: Handles an Error from a Peer. @param [Hash] p Endpoint Hash @param [Exception] e Exception

# File lib/rxio/io_base.rb, line 116
def peer_error p, _e
        drop_endpoint p
end
process_input(endpoint, chunk) click to toggle source

Process Input: Processes Input for an Endpoint, in the form of a data chunk. @param [Hash] endpoint @param [String] chunk A chunk of data, as received by the socket

# File lib/rxio/io_base.rb, line 26
def process_input endpoint, chunk

        # Begin
        begin

                # Pass through Service Handler Module
                @service_handler.filter_input endpoint, chunk

                # Process Messages
                @service_handler.handle_msg endpoint, endpoint[:msgs].shift until endpoint[:msgs].empty?

                # Sub-Process Input
                @service_handler.subprocess_input endpoint if @service_handler.respond_to? :subprocess_input

                # Rescue
        rescue Exception => e

                # Peer Error
                peer_error endpoint, e
        end
end
read_sock(s) click to toggle source

Read Socket: Attempts to read as many bytes as possible (up to CHUNK_SIZE) from a given socket s, passing the data chunks to process_input. @param [TCPSocket] s

# File lib/rxio/io_base.rb, line 51
def read_sock s

        # Acquire Endpoint for Socket
        e = get_endpoint_for_sock s

        # Check Endpoint
        return unless e

        # OpenSSL doesn't play nice with 'select' because of intermediate buffering.
        # Therefore we loop until we either have an explicit error or a signal to wait.
        done = false
        chunk = ''
        rdstr = ''
        until done || !rdstr
                begin
                        rdstr = s.read_nonblock(CHUNK_SIZE)
                        chunk << rdstr if rdstr
                rescue Exception => ex
                        if RETRY_EXCEPTIONS.include? ex.class; then done = true; else rdstr = nil; end
                end
        end

        # Drop Endpoint & Abort on Error
        return drop_endpoint e unless rdstr

        # Process Input
        process_input e, chunk
end
write_sock(s) click to toggle source

Write Socket: Attempts to write as many bytes as possible to a given socket s from the associated client's output buffer. @param [TCPSocket] s

# File lib/rxio/io_base.rb, line 83
def write_sock s

        # Acquire Endpoint
        e = get_endpoint_for_sock s

        # Check Endpoint
        return unless e

        # Synchronize Endpoint
        e[:lock].synchronize do

                # Write as much as possible
                size = (s.write_nonblock e[:obuf] rescue nil) || 0
                e[:obuf].slice!(0, size) if size > 0
        end
end