class NATS::IO::Socket

Implementation adapted from github.com/redis/redis-rb

Attributes

socket[RW]

Public Class Methods

new(options={}) click to toggle source
# File lib/nats/io/client.rb, line 1594
# Stores the connection settings taken from +options+ and starts out
# without an underlying socket.
#
# Recognized option keys: :uri, :connect_timeout, :write_timeout and
# :read_timeout. Keys that are absent leave the matching setting as nil.
def initialize(options={})
  @uri, @connect_timeout, @write_timeout, @read_timeout =
    options.values_at(:uri, :connect_timeout, :write_timeout, :read_timeout)

  # No socket exists until #connect assigns one.
  @socket = nil
end

Public Instance Methods

close() click to toggle source
# File lib/nats/io/client.rb, line 1683
# Closes the underlying socket.
#
# Does nothing when no socket has been assigned yet (for example when
# called before #connect, or after #connect failed), so teardown code can
# call it unconditionally instead of hitting NoMethodError on nil.
def close
  @socket.close if @socket
end
closed?() click to toggle source
# File lib/nats/io/client.rb, line 1687
# Reports whether the underlying socket is closed.
#
# A socket that was never assigned (no #connect yet) is reported as
# closed rather than raising NoMethodError on nil.
def closed?
  @socket.nil? || @socket.closed?
end
connect() click to toggle source
# File lib/nats/io/client.rb, line 1602
# Resolves @uri.host and tries the returned addresses in order until one
# of them connects, then turns on TCP_NODELAY for the resulting socket.
#
# A SystemCallError raised for a non-final address is swallowed so that
# the next address can be tried; the error for the final address is
# re-raised to the caller.
def connect
  candidates = ::Socket.getaddrinfo(@uri.host, nil, ::Socket::AF_UNSPEC, ::Socket::SOCK_STREAM)
  final_index = candidates.length - 1

  candidates.each_with_index do |candidate, index|
    begin
      @socket = connect_addrinfo(candidate, @uri.port, @connect_timeout)
      break
    rescue SystemCallError
      # Nothing left to fall back to: surface the failure.
      raise if index == final_index
    end
  end

  # TCP_NODELAY is enabled by default on every connection.
  @socket.setsockopt(::Socket::IPPROTO_TCP, ::Socket::TCP_NODELAY, 1)
end
read(max_bytes, deadline=nil) click to toggle source
# File lib/nats/io/client.rb, line 1626
# Reads up to +max_bytes+ from the socket without blocking.
#
# When the socket is not ready, waits via IO.select with +deadline+ as
# the select timeout (nil waits indefinitely) and then tries again.
# Raises SocketTimeoutError if the select returns without the socket
# becoming ready. An EOFError from the socket is reported to the caller
# as Errno::ECONNRESET.
def read(max_bytes, deadline=nil)
  begin
    return @socket.read_nonblock(max_bytes)
  rescue ::IO::WaitReadable
    raise SocketTimeoutError unless ::IO.select([@socket], nil, nil, deadline)
    retry
  rescue ::IO::WaitWritable
    raise SocketTimeoutError unless ::IO.select(nil, [@socket], nil, deadline)
    retry
  end
rescue EOFError => eof
  # FIXME: On JRuby an <EOFError: No message available> can show up even
  # though it appears to be temporary and the socket eventually becomes
  # readable again, so report "nothing read" instead of a reset.
  return nil if RUBY_ENGINE == 'jruby' && eof.message == 'No message available'

  raise Errno::ECONNRESET
end
read_line(deadline=nil) click to toggle source
# File lib/nats/io/client.rb, line 1618
# Waits until the socket is readable and returns the next line from it
# via #gets.
#
# +deadline+ is handed to IO.select as its timeout (nil waits
# indefinitely); SocketTimeoutError is raised when the select returns
# without the socket becoming readable.
def read_line(deadline=nil)
  # FIXME: Should accumulate and read in a non blocking way instead
  ready = ::IO.select([@socket], nil, nil, deadline)
  raise SocketTimeoutError unless ready

  @socket.gets
end
write(data, deadline=nil) click to toggle source
# File lib/nats/io/client.rb, line 1653
# Writes all of +data+ to the socket using non-blocking writes and
# returns the total number of bytes written.
#
# After a partial write the unsent tail is retried. When the socket is
# not ready, waits via IO.select with +deadline+ as the select timeout
# (nil waits indefinitely) and raises SocketTimeoutError if the select
# returns without the socket becoming ready. An EOFError is reported to
# the caller as Errno::ECONNRESET.
def write(data, deadline=nil)
  expected = data.bytesize
  sent = 0

  loop do
    begin
      count = @socket.write_nonblock(data)
      sent += count
      break sent if sent >= expected

      # Keep only the bytes that have not gone out yet.
      data = data.byteslice(count..-1)
    rescue ::IO::WaitWritable
      raise SocketTimeoutError unless ::IO.select(nil, [@socket], nil, deadline)
      retry
    rescue ::IO::WaitReadable
      raise SocketTimeoutError unless ::IO.select([@socket], nil, nil, deadline)
      retry
    end
  end
rescue EOFError
  raise Errno::ECONNRESET
end

Private Instance Methods

connect_addrinfo(ai, port, timeout) click to toggle source
# File lib/nats/io/client.rb, line 1693
# Opens a stream socket to a single resolved address and returns it once
# the connection has been established.
#
# ai      - one entry as returned by ::Socket.getaddrinfo; ai[0] names the
#           address family constant and ai[3] is the address to connect to.
# port    - port to connect to.
# timeout - passed to IO.select as its timeout while waiting for the
#           non-blocking connect to complete (nil waits indefinitely).
#
# Raises SocketTimeoutError when the select returns without the socket
# becoming writable; errors from the connect itself propagate unchanged.
# Whenever this method does not return the socket, the socket is closed
# first so that failed attempts do not leak file descriptors.
def connect_addrinfo(ai, port, timeout)
  connected = false
  sock = ::Socket.new(::Socket.const_get(ai[0]), ::Socket::SOCK_STREAM, 0)
  sockaddr = ::Socket.pack_sockaddr_in(port, ai[3])

  begin
    sock.connect_nonblock(sockaddr)
  rescue Errno::EINPROGRESS, Errno::EALREADY, ::IO::WaitWritable
    # Honor the timeout the caller passed in rather than reading
    # @connect_timeout behind its back.
    unless ::IO.select(nil, [sock], nil, timeout)
      raise SocketTimeoutError
    end

    # Confirm that connection was established
    begin
      sock.connect_nonblock(sockaddr)
    rescue Errno::EISCONN
      # Connection was established without issues.
    end
  end

  connected = true
  sock
ensure
  # sock is nil if ::Socket.new itself raised.
  sock.close if sock && !connected
end