module RxIO::IOBase
I/O Base Module
Constants
- CHUNK_SIZE
Chunk Size
- RETRY_EXCEPTIONS
Retry Exceptions
Public Instance Methods
drop_endpoint(endpoint)
click to toggle source
Drop Endpoint: Notifies the Service
Handler and Parent Implementation (in that order) of a dropped endpoint. @param [Hash] endpoint
# File lib/rxio/io_base.rb, line 103 def drop_endpoint endpoint # Notify Service Handler @service_handler.on_drop endpoint if @service_handler.respond_to? :on_drop # Drop Endpoint on_drop endpoint end
peer_error(p, _e)
click to toggle source
Peer Error: Handles an Error from a Peer. @param [Hash] p Endpoint Hash @param [Exception] e Exception
# File lib/rxio/io_base.rb, line 116 def peer_error p, _e drop_endpoint p end
process_input(endpoint, chunk)
click to toggle source
Process Input: Processes Input for an Endpoint, in the form of a data chunk. @param [Hash] endpoint @param [String] chunk A chunk of data, as received by the socket
# File lib/rxio/io_base.rb, line 26 def process_input endpoint, chunk # Begin begin # Pass through Service Handler Module @service_handler.filter_input endpoint, chunk # Process Messages @service_handler.handle_msg endpoint, endpoint[:msgs].shift until endpoint[:msgs].empty? # Sub-Process Input @service_handler.subprocess_input endpoint if @service_handler.respond_to? :subprocess_input # Rescue rescue Exception => e # Peer Error peer_error endpoint, e end end
read_sock(s)
click to toggle source
Read Socket: Attempts to read as many bytes as possible (up to CHUNK_SIZE
) from a given socket s, passing the data chunks to process_input. @param [TCPSocket] s
# File lib/rxio/io_base.rb, line 51 def read_sock s # Acquire Endpoint for Socket e = get_endpoint_for_sock s # Check Endpoint return unless e # OpenSSL doesn't play nice with 'select' because of intermediate buffering. # Therefore we loop until we either have an explicit error or a signal to wait. done = false chunk = '' rdstr = '' until done || !rdstr begin rdstr = s.read_nonblock(CHUNK_SIZE) chunk << rdstr if rdstr rescue Exception => ex if RETRY_EXCEPTIONS.include? ex.class; then done = true; else rdstr = nil; end end end # Drop Endpoint & Abort on Error return drop_endpoint e unless rdstr # Process Input process_input e, chunk end
write_sock(s)
click to toggle source
Write Socket: Attempts to write as many bytes as possible to a given socket s from the associated client's output buffer. @param [TCPSocket] s
# File lib/rxio/io_base.rb, line 83 def write_sock s # Acquire Endpoint e = get_endpoint_for_sock s # Check Endpoint return unless e # Synchronize Endpoint e[:lock].synchronize do # Write as much as possible size = (s.write_nonblock e[:obuf] rescue nil) || 0 e[:obuf].slice!(0, size) if size > 0 end end