class Kafka::SocketWithTimeout

Opens sockets in a non-blocking fashion, ensuring that we're not stalling for long periods of time.

It's possible to set timeouts for connecting to the server, for reading data, and for writing data. Whenever a timeout is exceeded, Errno::ETIMEDOUT is raised.

Public Class Methods

new(host, port, connect_timeout: nil, timeout: nil)

Opens a socket.

@param host [String]
@param port [Integer]
@param connect_timeout [Integer] the connection timeout, in seconds.
@param timeout [Integer] the read and write timeout, in seconds.
@raise [Errno::ETIMEDOUT] if the timeout is exceeded.

# File lib/kafka/socket_with_timeout.rb, line 23
def initialize(host, port, connect_timeout: nil, timeout: nil)
  addr = Socket.getaddrinfo(host, nil)
  sockaddr = Socket.pack_sockaddr_in(port, addr[0][3])

  @timeout = timeout

  @socket = Socket.new(Socket.const_get(addr[0][0]), Socket::SOCK_STREAM, 0)
  @socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)

  begin
    # Initiate the socket connection in the background. If it doesn't fail
    # immediately it will raise an IO::WaitWritable (Errno::EINPROGRESS)
    # indicating the connection is in progress.
    @socket.connect_nonblock(sockaddr)
  rescue IO::WaitWritable
    # IO.select will block until the socket is writable or the timeout
    # is exceeded, whichever comes first.
    unless IO.select(nil, [@socket], nil, connect_timeout)
      # IO.select returns nil when the socket is not ready before timeout
      # seconds have elapsed
      @socket.close
      raise Errno::ETIMEDOUT
    end

    begin
      # Verify there is now a good connection.
      @socket.connect_nonblock(sockaddr)
    rescue Errno::EISCONN
      # The socket is connected, we're good!
    end
  end
end
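
The connect sequence above can be tried in isolation against a local listener. This is a minimal sketch using only the standard library; `connect_with_timeout` is a hypothetical helper written for illustration, not part of the Kafka API, and the local `TCPServer` stands in for a broker.

```ruby
require "socket"

# Hypothetical helper that mirrors the constructor's connect logic.
def connect_with_timeout(host, port, connect_timeout)
  addr = Socket.getaddrinfo(host, nil)
  sockaddr = Socket.pack_sockaddr_in(port, addr[0][3])
  socket = Socket.new(Socket.const_get(addr[0][0]), Socket::SOCK_STREAM, 0)

  begin
    # Start connecting; raises IO::WaitWritable if still in progress.
    socket.connect_nonblock(sockaddr)
  rescue IO::WaitWritable
    # Wait for the socket to become writable, or give up on timeout.
    unless IO.select(nil, [socket], nil, connect_timeout)
      socket.close
      raise Errno::ETIMEDOUT
    end

    begin
      # Confirm the connection; EISCONN means it is already established.
      socket.connect_nonblock(sockaddr)
    rescue Errno::EISCONN
    end
  end

  socket
end

# A listener on an OS-assigned port stands in for the remote server.
server = TCPServer.new("127.0.0.1", 0)
port = server.addr[1]
client = connect_with_timeout("127.0.0.1", port, 1)
```

If the second `connect_nonblock` call raises anything other than `Errno::EISCONN` (for example `Errno::ECONNREFUSED`), the error propagates to the caller, which is how a failed connection is reported.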

Public Instance Methods

close()
# File lib/kafka/socket_with_timeout.rb, line 84
def close
  @socket.close
end
closed?()
# File lib/kafka/socket_with_timeout.rb, line 88
def closed?
  @socket.closed?
end
read(num_bytes)

Reads bytes from the socket, possibly with a timeout.

@param num_bytes [Integer] the number of bytes to read.
@raise [Errno::ETIMEDOUT] if the timeout is exceeded.
@return [String] the data that was read from the socket.

# File lib/kafka/socket_with_timeout.rb, line 61
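def read(num_bytes)
  # Wait until the socket is readable. IO.select returns nil if @timeout
  # seconds pass first; a nil @timeout waits indefinitely.
  unless IO.select([@socket], nil, nil, @timeout)
    raise Errno::ETIMEDOUT
  end

  @socket.read(num_bytes)
rescue IO::EAGAINWaitReadable
  # No data was available after all; wait for readability again.
  retry
end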
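
The timeout behaviour of the read path can be observed with a connected socket pair. This is a sketch under stated assumptions: `read_with_timeout` is a hypothetical stand-in for the method above with the socket and timeout passed as arguments, and `UNIXSocket.pair` (unavailable on Windows) replaces a real network connection.

```ruby
require "socket"

# Hypothetical helper with the same shape as #read.
def read_with_timeout(socket, num_bytes, timeout)
  # IO.select returns nil when nothing is readable within the timeout.
  unless IO.select([socket], nil, nil, timeout)
    raise Errno::ETIMEDOUT
  end

  socket.read(num_bytes)
end

reader, writer = UNIXSocket.pair

# Nothing has been written yet, so the first read times out.
timed_out = begin
  read_with_timeout(reader, 4, 0.1)
  false
rescue Errno::ETIMEDOUT
  true
end

# Once data is available, the same call returns it.
writer.write("ping")
data = read_with_timeout(reader, 4, 0.1)
```

Note that the timeout only bounds the wait for the socket to become readable. `IO#read` with a length then blocks until that many bytes have arrived or the peer closes the connection, so a peer that sends a partial message can still stall the call.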
set_encoding(encoding)
# File lib/kafka/socket_with_timeout.rb, line 92
def set_encoding(encoding)
  @socket.set_encoding(encoding)
end
write(bytes)

Writes bytes to the socket, possibly with a timeout.

@param bytes [String] the data that should be written to the socket.
@raise [Errno::ETIMEDOUT] if the timeout is exceeded.
@return [Integer] the number of bytes written.

# File lib/kafka/socket_with_timeout.rb, line 76
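def write(bytes)
  # Wait until the socket is writable. IO.select returns nil if @timeout
  # seconds pass first; a nil @timeout waits indefinitely.
  unless IO.select(nil, [@socket], nil, @timeout)
    raise Errno::ETIMEDOUT
  end

  @socket.write(bytes)
end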
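
The write path follows the same pattern with the socket in the writable set. The sketch below is illustrative only: `write_with_timeout` is a hypothetical helper, and `UNIXSocket.pair` (unavailable on Windows) stands in for a broker connection.

```ruby
require "socket"

# Hypothetical helper with the same shape as #write.
def write_with_timeout(socket, bytes, timeout)
  # IO.select returns nil when the socket is not writable within the timeout.
  unless IO.select(nil, [socket], nil, timeout)
    raise Errno::ETIMEDOUT
  end

  socket.write(bytes)
end

reader, writer = UNIXSocket.pair

# A fresh socket has buffer space, so the write proceeds immediately
# and returns the number of bytes written.
written = write_with_timeout(writer, "ping", 0.1)
received = reader.read(written)
```

A write timeout would only occur when the peer stops reading and the send buffer fills up, which is why the socket normally reports as writable straight away.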