class NATS::IO::Socket
Implementation adapted from github.com/redis/redis-rb
Attributes
socket[RW]
Public Class Methods
new(options={})
click to toggle source
# File lib/nats/io/client.rb, line 1594
# Stores the connection settings. The socket itself starts out as nil.
#
# options - Hash read for the :uri, :connect_timeout, :write_timeout and
#           :read_timeout keys; a missing key leaves that setting nil.
def initialize(options={})
  @uri, @connect_timeout, @write_timeout, @read_timeout =
    options.values_at(:uri, :connect_timeout, :write_timeout, :read_timeout)
  @socket = nil
end
Public Instance Methods
close()
click to toggle source
# File lib/nats/io/client.rb, line 1683
# Closes the underlying socket.
#
# Does nothing (and returns nil) while @socket is still nil, so calling
# this on an instance that holds no socket does not raise NoMethodError.
def close
  @socket.close if @socket
end
closed?()
click to toggle source
# File lib/nats/io/client.rb, line 1687
# Reports whether the underlying socket is closed.
#
# Returns true when @socket is nil (there is nothing open), otherwise
# whatever the socket's own #closed? returns.
def closed?
  @socket.nil? || @socket.closed?
end
connect()
click to toggle source
# File lib/nats/io/client.rb, line 1602
# Resolves the host of @uri and connects to the first address that accepts
# the connection, storing the result in @socket.
#
# Addresses are tried in the order getaddrinfo returns them. A
# SystemCallError on one address moves on to the next; the error from the
# last address is re-raised. Errors that are not SystemCallError propagate
# immediately.
#
# Raises SystemCallError when every resolved address fails.
def connect
  # URI#hostname (unlike URI#host) drops the square brackets around IPv6
  # literals, which getaddrinfo cannot resolve.
  # NOTE(review): assumes @uri is a URI object - confirm against callers.
  addrinfo = ::Socket.getaddrinfo(@uri.hostname, nil, ::Socket::AF_UNSPEC, ::Socket::SOCK_STREAM)
  addrinfo.each_with_index do |ai, i|
    begin
      @socket = connect_addrinfo(ai, @uri.port, @connect_timeout)
      break
    rescue SystemCallError
      # Give up if no more available
      raise if addrinfo.length == i + 1
    end
  end

  # Set TCP no delay by default
  @socket.setsockopt(::Socket::IPPROTO_TCP, ::Socket::TCP_NODELAY, 1)
end
read(max_bytes, deadline=nil)
click to toggle source
# File lib/nats/io/client.rb, line 1626
# Reads up to max_bytes from the socket without blocking.
#
# When the socket is not ready, waits with IO.select (passing deadline as
# the select timeout for each wait) and then retries the read.
#
# Returns the data read, or nil for the JRuby special case below.
# Raises SocketTimeoutError when a wait elapses without the socket
# becoming ready, and Errno::ECONNRESET on end of file.
def read(max_bytes, deadline=nil)
  begin
    @socket.read_nonblock(max_bytes)
  rescue ::IO::WaitReadable
    raise SocketTimeoutError unless ::IO.select([@socket], nil, nil, deadline)
    retry
  rescue ::IO::WaitWritable
    raise SocketTimeoutError unless ::IO.select(nil, [@socket], nil, deadline)
    retry
  end
rescue EOFError => eof
  # FIXME: <EOFError: No message available> can happen in jruby
  # even though seems it is temporary and eventually possible
  # to read from socket.
  return nil if RUBY_ENGINE == 'jruby' && eof.message == 'No message available'

  raise Errno::ECONNRESET
end
read_line(deadline=nil)
click to toggle source
# File lib/nats/io/client.rb, line 1618
# Waits until the socket is readable and returns the next line from it.
#
# deadline is passed to IO.select as the timeout (nil waits indefinitely).
# Raises SocketTimeoutError when the socket does not become readable in time.
def read_line(deadline=nil)
  # FIXME: Should accumulate and read in a non blocking way instead
  readable = ::IO.select([@socket], nil, nil, deadline)
  raise SocketTimeoutError unless readable

  @socket.gets
end
write(data, deadline=nil)
click to toggle source
# File lib/nats/io/client.rb, line 1653
# Writes all of data to the socket using non-blocking writes.
#
# After a partial write the unsent tail is written on the next pass. When
# the socket is not ready, waits with IO.select (passing deadline as the
# select timeout for each wait) and retries.
#
# Returns the total number of bytes written.
# Raises SocketTimeoutError when a wait elapses without the socket
# becoming ready, and Errno::ECONNRESET when EOFError is raised.
def write(data, deadline=nil)
  expected = data.bytesize
  sent = 0
  pending = data

  loop do
    begin
      count = @socket.write_nonblock(pending)
      sent += count
      break sent if sent >= expected

      # Keep only the bytes that have not been written yet.
      pending = pending.byteslice(count..-1)
    rescue ::IO::WaitWritable
      raise SocketTimeoutError unless ::IO.select(nil, [@socket], nil, deadline)
      retry
    rescue ::IO::WaitReadable
      raise SocketTimeoutError unless ::IO.select([@socket], nil, nil, deadline)
      retry
    end
  end
rescue EOFError
  raise Errno::ECONNRESET
end
Private Instance Methods
connect_addrinfo(ai, port, timeout)
click to toggle source
# File lib/nats/io/client.rb, line 1693
# Opens a stream socket to a single resolved address using a non-blocking
# connect.
#
# ai      - One entry from Socket.getaddrinfo; ai[0] names the address
#           family constant and ai[3] is the address passed to
#           pack_sockaddr_in.
# port    - Port to connect to.
# timeout - Timeout handed to IO.select while waiting for the connection
#           to complete (nil waits indefinitely).
#
# Returns the connected socket.
# Raises SocketTimeoutError when the socket does not become writable within
# timeout, or whatever error the connect itself raises. On any failure the
# socket created here is closed before the error propagates, so no file
# descriptor is leaked.
def connect_addrinfo(ai, port, timeout)
  sock = ::Socket.new(::Socket.const_get(ai[0]), ::Socket::SOCK_STREAM, 0)
  sockaddr = ::Socket.pack_sockaddr_in(port, ai[3])

  begin
    sock.connect_nonblock(sockaddr)
  rescue Errno::EINPROGRESS, Errno::EALREADY, ::IO::WaitWritable
    raise SocketTimeoutError unless ::IO.select(nil, [sock], nil, timeout)

    # Confirm that connection was established
    begin
      sock.connect_nonblock(sockaddr)
    rescue Errno::EISCONN
      # Connection was established without issues.
    end
  end

  sock
rescue StandardError
  # sock is still nil if Socket.new itself failed.
  sock.close if sock
  raise
end