class RethinkDB::Connection

Attributes

conn_id[R]
default_db[R]
host[R]
port[R]

Public Class Methods

last() click to toggle source
# File lib/net.rb, line 794
# Returns the connection stored in @@last (set by #initialize).
#
# Raises ReqlRuntimeError when @@last is nil or false.
def self.last
  @@last || raise(ReqlRuntimeError, "No last connection.  Use RethinkDB::Connection.new.")
end
new(opts={}) click to toggle source
# File lib/net.rb, line 468
# Builds a connection from +opts+ and connects immediately.
#
# +opts+ may be a Hash (keys are converted to symbols) or a String, which is
# treated as the host name. Keys read here: :host, :port, :db, :user,
# :password, :auth_key, :timeout, :ssl.
#
# Raises ReqlDriverError when both :password and :auth_key are supplied.
def initialize(opts={})
  # Use IRB when it is loaded; otherwise fall back to Faux_Abort.
  @abort_module = begin
    ::IRB
  rescue NameError
    Faux_Abort
  end

  case opts
  when Hash
    opts = opts.each_with_object({}) { |(key, val), acc| acc[key.to_sym] = val }
  when String
    opts = { :host => opts }
  end

  @host = opts[:host] || "localhost"
  @port = (opts[:port] || 28015).to_i
  @default_db = opts[:db]
  # "=" and "," are escaped because the user name is embedded in the
  # comma/equals-delimited handshake message.
  @user = (opts[:user] || "admin").gsub("=", "=3D").gsub(",", "=2C")
  if opts[:password] && opts[:auth_key]
    raise ReqlDriverError, "Cannot specify both a password and an auth key."
  end
  @password = opts[:password] || opts[:auth_key] || ""
  @nonce = SecureRandom.base64(18)
  # A missing, zero or negative timeout falls back to 20.
  requested_timeout = opts[:timeout].to_i
  @timeout = requested_timeout > 0 ? requested_timeout : 20
  @ssl_opts = opts[:ssl] || {}

  @@last = self
  @default_opts = @default_db ? { :db => RQL.new.db(@default_db) } : {}
  @conn_id = 0

  @token_cnt = 0
  @token_cnt_mutex = Mutex.new

  connect()
end
xor(str1, str2) click to toggle source
# File lib/net.rb, line 935
# Returns a copy of +str1+ in which every byte has been XORed with the byte
# at the same index in +str2+. +str1+ itself is not modified.
def self.xor(str1, str2)
  result = str1.dup
  0.upto(result.bytesize - 1) do |idx|
    result.setbyte(idx, result.getbyte(idx) ^ str2.getbyte(idx))
  end
  result
end

Public Instance Methods

auto_reconnect(x=true) click to toggle source
# File lib/net.rb, line 462
# Stores +x+ (default true) in @auto_reconnect and returns the connection so
# the call can be chained. #run consults this flag when the connection is
# not open.
def auto_reconnect(x=true)
  tap { @auto_reconnect = x }
end
base_socket() click to toggle source
# File lib/net.rb, line 705
# Opens a TCP socket to @host:@port, enables TCP_NODELAY and SO_KEEPALIVE on
# it, and returns the socket.
def base_socket
  TCPSocket.open(@host, @port).tap do |sock|
    sock.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, 1)
  end
end
check_nonce(server_nonce, nonce) click to toggle source
# File lib/net.rb, line 875
# Verifies that +server_nonce+ begins with the client +nonce+.
#
# Returns nil on success; raises ReqlDriverError otherwise.
def check_nonce(server_nonce, nonce)
  return if server_nonce.start_with?(nonce)
  raise ReqlDriverError, "Invalid nonce #{server_nonce} received from server."
end
check_server_signature_claim(server_signature_claim, server_signature) click to toggle source
# File lib/net.rb, line 952
# Compares the signature claimed by the server with the locally computed one
# using #const_eq.
#
# Returns nil when they match; raises ReqlAuthError (with both values
# Base64-encoded in the message) when they differ.
def check_server_signature_claim(server_signature_claim, server_signature)
  return if const_eq(server_signature_claim, server_signature)

  claimed = Base64.strict_encode64(server_signature_claim)
  expected = Base64.strict_encode64(server_signature)
  raise ReqlAuthError,
        "Server signature #{claimed} does not match expected signature #{expected}."
end
check_version(server_info) click to toggle source
# File lib/net.rb, line 866
# Checks that the driver's protocol version (@@protocol_version) lies inside
# the range advertised by the server.
#
# server_info - Hash parsed from the server's first handshake reply; the
#               'min_protocol_version' and 'max_protocol_version' keys are
#               read.
#
# Raises ReqlDriverError when the driver version is outside that range. The
# message reports the same two fields that were compared (the previous
# message read 'min_version'/'max_version', keys the check never looks at).
def check_version(server_info)
  min_version = server_info['min_protocol_version']
  max_version = server_info['max_protocol_version']
  if min_version > @@protocol_version || max_version < @@protocol_version
    raise ReqlDriverError, "Version mismatch: Driver uses #{@@protocol_version} "+
      "but server accepts [#{min_version}, #{max_version}]."
  end
end
client_address() click to toggle source
# File lib/net.rb, line 505
# Returns element 3 of the socket's +addr+ array while the connection is
# open, and nil otherwise.
def client_address
  return nil unless is_open()
  @socket.addr[3]
end
client_port() click to toggle source
# File lib/net.rb, line 502
# Returns element 1 of the socket's +addr+ array while the connection is
# open, and nil otherwise.
def client_port
  return nil unless is_open()
  @socket.addr[1]
end
close(opts={}) click to toggle source
# File lib/net.rb, line 739
# Closes the connection and releases everything waiting on it.
#
# opts - Hash; the only recognised key is :noreply_wait (defaults to true).
#        When it is truthy and the connection is still open, #noreply_wait
#        is called before the socket is torn down. The caller's hash is
#        never modified.
#
# Returns self.
# Raises ArgumentError if +opts+ is not a Hash or contains unknown keys.
def close(opts={})
  EM_Guard.unregister(self)
  raise ArgumentError, "Argument to close must be a hash." if opts.class != Hash
  unknown_keys = opts.keys - [:noreply_wait]
  if !unknown_keys.empty?
    raise ArgumentError, "close does not understand these options: " +
      unknown_keys.to_s
  end
  # Read the default instead of writing it into the caller's hash.
  wait_for_noreply = opts.fetch(:noreply_wait, true)

  @mon.synchronize {
    @opts.clear
    @data.clear
    @waiters.each {|k,v|
      case v
      when QueryHandle
        v.handle_force_close
      when MonitorMixin::ConditionVariable
        # Blank the slot before waking the waiter, then signal it.
        @waiters[k] = nil
        v.signal
      end
    }
    @waiters.clear
  }

  noreply_wait() if wait_for_noreply && is_open()
  if @listener
    @listener.terminate
    @listener.join
  end
  @socket.close if @socket
  @listener = nil
  @socket = nil
  self
end
closed?() click to toggle source
# File lib/net.rb, line 735
# True when #is_open reports a falsy value, false otherwise.
def closed?
  is_open ? false : true
end
connect() click to toggle source
# File lib/net.rb, line 680
# Opens the socket, resets the per-connection bookkeeping (@mon, @waiters,
# @opts, @data), bumps @conn_id and starts the listener.
#
# Returns self.
# Raises RuntimeError when a socket is already present.
def connect()
  if @socket
    raise RuntimeError, "Connection must be closed before calling connect."
  end

  init_socket
  @mon = Monitor.new
  @waiters, @opts, @data = {}, {}, {}
  @conn_id += 1
  start_listener
  self
end
const_eq(a, b) click to toggle source
# File lib/net.rb, line 943
# Compares two strings byte-by-byte without stopping at the first
# difference: every byte pair is XORed and the results are OR-ed together.
#
# The length guard uses +bytesize+ because the loop works on bytes. With the
# previous character-count guard, multibyte strings of equal character count
# but different byte length reached the XOR with a nil operand and raised
# TypeError.
#
# Returns true when both strings contain exactly the same bytes.
def const_eq(a, b)
  return false if a.bytesize != b.bytesize
  residue = 0
  a.bytes.zip(b.bytes).each { |x, y| residue |= (x ^ y) }
  residue == 0
end
create_context(options) click to toggle source
# File lib/net.rb, line 712
# Builds an OpenSSL::SSL::SSLContext with ssl_version :TLSv1_2 that verifies
# the peer against the CA file named by options[:ca_certs].
#
# Raises RuntimeError when :ca_certs is missing.
def create_context(options)
  ctx = OpenSSL::SSL::SSLContext.new
  ctx.ssl_version = :TLSv1_2

  ca_path = options[:ca_certs]
  raise 'ssl options provided but missing required "ca_certs" option' unless ca_path

  ctx.ca_file = ca_path
  ctx.verify_mode = OpenSSL::SSL::VERIFY_PEER
  ctx
end
debug_socket() click to toggle source
# File lib/net.rb, line 669
# Exposes the raw @socket object.
def debug_socket
  @socket
end
dispatch(msg, token) click to toggle source
# File lib/net.rb, line 605
# Serialises +msg+ with Shim.dump_json and writes it to the socket, framed as
# <token: 64-bit little-endian><payload byte length: 32-bit little-endian><payload>.
#
# Returns +token+.
def dispatch(msg, token)
  body = Shim.dump_json(msg).force_encoding('BINARY')
  header = [token, body.bytesize].pack('Q<L<')
  send(header + body)
  token
end
do_handshake() click to toggle source
# File lib/net.rb, line 961
# Performs the connection handshake: sends the magic number, then runs a
# SCRAM-SHA-256 exchange with the server over null-terminated JSON messages
# (see #send_json / #rcv_json).
#
# Raises ReqlError subclasses unchanged; any other exception is wrapped in a
# ReqlDriverError whose message includes the original error and backtrace.
def do_handshake
  begin
    send([@@magic_number].pack('L<'))
    # First client message: user name and client nonce.
    client_first_message_bare = "n=#{@user},r=#{@nonce}"
    send_json({ protocol_version: @@protocol_version,
                authentication_method: "SCRAM-SHA-256",
                authentication: "n,,#{client_first_message_bare}" })

    # The first reply carries the protocol versions the server accepts.
    server_version_msg = rcv_json
    check_version(server_version_msg)

    auth_resp = rcv_json
    server_first_message = auth_resp['authentication']

    # Parse "k=v,k=v" into a Hash; split('=', 2) keeps '=' inside values.
    fields = Hash[server_first_message.split(',').map{|x| x.split('=', 2)}]
    server_nonce = fields['r']

    # "biws" is the Base64 encoding of "n,,", the prefix sent above.
    client_final_message_without_proof = "c=biws,r=#{server_nonce}"
    auth_message = "#{client_first_message_bare},#{server_first_message},"+
      client_final_message_without_proof
    check_nonce(server_nonce, @nonce)
    # 's' is the Base64 salt and 'i' the iteration count for PBKDF2.
    salt = Base64.decode64(fields['s'])
    iter = fields['i'].to_i

    salted_password = pbkdf2_hmac_sha256(@password, salt, iter)

    # Client proof = ClientKey XOR HMAC(SHA256(ClientKey), auth_message).
    client_key = hmac(salted_password, "Client Key")
    stored_key = sha256(client_key)
    client_signature = hmac(stored_key, auth_message)
    client_proof = Connection::xor(client_key, client_signature)
    cproof_64 = Base64.strict_encode64(client_proof)

    msg = "#{client_final_message_without_proof},p=#{cproof_64}"
    send_json({authentication: msg})

    # Verify the server's signature ('v') against the locally computed one.
    sig_resp = rcv_json
    fields = Hash[sig_resp['authentication'].split(',').map{|x| x.split('=', 2)}]
    server_signature_claim = Base64::decode64(fields['v'])
    server_key = hmac(salted_password, "Server Key")
    server_signature = hmac(server_key, auth_message)
    check_server_signature_claim(server_signature_claim, server_signature)
  rescue ReqlError => e
    # Driver/auth errors already carry a useful message; pass them through.
    raise e
  rescue Exception => e
    raise ReqlDriverError, "Error during handshake: #{e.inspect} #{e.backtrace}"
  end
end
hmac(*args) click to toggle source
# File lib/net.rb, line 927
# HMAC-SHA256. +args+ are forwarded to OpenSSL::HMAC.digest after the digest
# object (callers in this file pass key, then data). Returns the raw digest
# bytes.
def hmac(*args)
  sha256_digest = OpenSSL::Digest.new("SHA256")
  OpenSSL::HMAC.digest(sha256_digest, *args)
end
init_socket() click to toggle source
# File lib/net.rb, line 692
# Sets @socket. Without SSL options this is the plain socket from
# #base_socket; with SSL options the TCP socket is wrapped in an
# OpenSSL::SSL::SSLSocket, connected, and its certificate checked with
# #verify_cert!.
def init_socket
  if @ssl_opts.empty?
    @socket = base_socket
  else
    @tcp_socket = base_socket
    context = create_context(@ssl_opts)
    @socket = OpenSSL::SSL::SSLSocket.new(@tcp_socket, context)
    # Closing the SSL socket also closes the underlying TCP socket.
    @socket.sync_close = true
    @socket.connect
    verify_cert!(@socket, context)
  end
end
inspect() click to toggle source
# File lib/net.rb, line 657
def inspect
  db = @default_opts[:db] || RQL.new.db('test')
  properties = "(#{@host}:#{@port}) (Default DB: #{db.inspect})"
  state = is_open() ? "(open)" : "(closed)"
  "#<RethinkDB::Connection:#{object_id} #{properties} #{state}>"
end
is_open() click to toggle source
# File lib/net.rb, line 732
# Truthy only when both a socket and a listener exist. Returns @listener in
# that case and the falsy @socket value otherwise.
def is_open
  return @socket unless @socket
  @listener
end
new_token() click to toggle source
# File lib/net.rb, line 509
# Increments @token_cnt under @token_cnt_mutex and returns the new value.
def new_token
  @token_cnt_mutex.synchronize do
    @token_cnt += 1
  end
end
noreply_wait() click to toggle source
# File lib/net.rb, line 774
# Sends a NOREPLY_WAIT query and blocks until the server answers.
#
# Returns nil.
# Raises ReqlRuntimeError when the connection is closed or the reply type is
# not WAIT_COMPLETE.
def noreply_wait
  raise ReqlRuntimeError, "Connection is closed." unless is_open()

  reply = run_internal([Query::QueryType::NOREPLY_WAIT], {noreply: false}, new_token)
  unless reply['t'] == Response::ResponseType::WAIT_COMPLETE
    raise ReqlRuntimeError, "Unexpected response to noreply_wait: " + PP.pp(reply, "")
  end
  nil
end
note_data(token, data) click to toggle source
# File lib/net.rb, line 810
# Delivers +data+ to whoever is waiting on +token+ and forgets the token.
# Callers must hold @mon (synchronize around this!).
#
# - ConditionVariable waiter: the data is parked in @data and the waiter is
#   signalled.
# - QueryHandle waiter: its callback receives the data directly.
# - No waiter: nothing happens.
#
# Raises ReqlDriverError for any other kind of waiter.
def note_data(token, data) # Synchronize around this!
  @opts.delete(token)
  waiter = @waiters.delete(token)
  if waiter.is_a?(MonitorMixin::ConditionVariable)
    @data[token] = data
    waiter.signal
  elsif waiter.is_a?(QueryHandle)
    waiter.callback(data)
  elsif !waiter.nil?
    raise ReqlDriverError, "Unrecognized value #{waiter.inspect} in `@waiters`."
  end
end
note_error(token, e) click to toggle source
# File lib/net.rb, line 826
# Reports exception +e+ to the waiter for +token+ as a synthetic
# CLIENT_ERROR response (message in 'r', empty backtrace in 'b').
# Callers must hold @mon (synchronize around this!).
def note_error(token, e) # Synchronize around this!
  error_response = {}
  error_response['t'] = Response::ResponseType::CLIENT_ERROR
  error_response['r'] = [e.message]
  error_response['b'] = []
  note_data(token, error_response)
end
pbkdf2_hmac_sha256(*args) click to toggle source
# File lib/net.rb, line 898
# Derives the salted password, memoising results in the class-wide
# @@auth_cache keyed by the full argument list.
#
# The derivation itself is delegated to @@auth_func. If that raises
# NotImplementedError, @@slow_auth_func is installed as the new class-wide
# @@auth_func and the call is retried once with it.
#
# Returns the derived key (from cache when available).
def pbkdf2_hmac_sha256(*args)
  auth_func = res = nil
  @@auth_cache_mutex.synchronize {
    res = @@auth_cache[args]
    # Cache hit: return straight out of the method.
    return res if res
  }

  # Snapshot the current implementation under its own lock.
  @@auth_func_mutex.synchronize {
    auth_func = @@auth_func
  }
  begin
    res = auth_func.call(*args)
  rescue NotImplementedError => e
    if auth_func != @@slow_auth_func
      # Fast path unavailable: switch everyone to the slow implementation.
      @@auth_func_mutex.synchronize {
        auth_func = @@auth_func = @@slow_auth_func
      }
      res = auth_func.call(*args)
    else
      # Already on the slow implementation; nothing left to fall back to.
      raise e
    end
  end

  @@auth_cache_mutex.synchronize {
    @@auth_cache[args] = res
  }
  return res
end
rcv_json() click to toggle source
# File lib/net.rb, line 835
# Reads one null-terminated JSON message from the socket (byte by byte, each
# read bounded by @timeout) and returns the parsed Hash.
#
# A message whose 'success' field is falsy raises ReqlAuthError when its
# 'error_code' is between 10 and 20, and ReqlDriverError otherwise. Any other
# failure while parsing is reported as ReqlDriverError
# "Connection closed by server (...)".
def rcv_json
  raw = ""
  raw += @socket.read_exn(1, @timeout) until raw[-1..-1] == "\0"

  begin
    parsed = JSON.parse(raw[0...-1])
    unless parsed['success']
      msg = "Handshake error (#{parsed})."
      ecode = parsed['error_code']
      unless ecode && ecode >= 10 && ecode <= 20
        raise ReqlDriverError, msg
      end
      # Prefer the server's own error text for authentication failures.
      msg = parsed['error'] if parsed['error'].to_s != ""
      raise ReqlAuthError, msg
    end
  rescue Exception => err
    raise err if err.is_a?(ReqlDriverError)
    raise ReqlDriverError, "Connection closed by server (#{err})."
  end
  parsed
end
reconnect(opts={}) click to toggle source

Reconnect to the server. This will interrupt all queries on the server (if :noreply_wait => false) and invalidate all outstanding enumerables on the client.

# File lib/net.rb, line 674
# Closes the connection with +opts+ (forwarded to #close) and connects again.
#
# Returns whatever #connect returns.
# Raises ArgumentError when +opts+ is not exactly a Hash.
def reconnect(opts={})
  unless opts.instance_of?(Hash)
    raise ArgumentError, "Argument to reconnect must be a hash."
  end
  close(opts)
  connect()
end
register_query(token, opts, callback=nil) click to toggle source
# File lib/net.rb, line 513
# Records a waiter for +token+ unless the query is :noreply.
#
# The waiter is +callback+ when given, otherwise a fresh condition variable
# from @mon. The query's +opts+ are remembered in @opts under the same token.
#
# Raises ReqlDriverError if the token already has a waiter.
def register_query(token, opts, callback=nil)
  return if opts[:noreply]

  @mon.synchronize do
    if @waiters.has_key?(token)
      raise ReqlDriverError, "Internal driver error, token already in use."
    end
    @waiters[token] = callback || @mon.new_cond
    @opts[token] = opts
  end
end
remove_em_waiters() click to toggle source
# File lib/net.rb, line 799
# Force-closes every QueryHandle waiter and removes it from @waiters, all
# under @mon. Other waiters are left in place.
def remove_em_waiters
  @mon.synchronize do
    @waiters.delete_if do |_token, waiter|
      next false unless waiter.is_a?(QueryHandle)
      waiter.handle_force_close
      true
    end
  end
end
repl() click to toggle source
# File lib/net.rb, line 466
# Registers this connection through RQL.set_default_conn and returns that
# call's result.
def repl
  RQL.set_default_conn(self)
end
run(msg, opts, b) click to toggle source
# File lib/net.rb, line 535
# Sends a START query and returns its result.
#
# msg  - the serialised query term.
# opts - per-query options; merged over the connection's @default_opts.
#        Values that are not RQL terms are wrapped with RQL.new.expr.
#        A :noreply option, when present, is coerced to true/false.
# b    - nil, a Handler (asynchronous mode), or a callable that receives the
#        result.
#
# Returns:
# - a QueryHandle when +b+ is a Handler (the query is dispatched without
#   waiting);
# - nil/false straight from #run_internal when there is no response to
#   process (e.g. :noreply);
# - otherwise the native value, or a Cursor for partial/sequence responses.
#   When the response carries profiling data ('p') the value is wrapped as
#   {"profile" => ..., "value" => ...}. If +b+ is callable, its return value
#   is returned instead and any Cursor is closed afterwards.
#
# Raises ReqlRuntimeError when the connection is closed (after an optional
# automatic reconnect).
def run(msg, opts, b)
  reconnect(:noreply_wait => false) if @auto_reconnect && !is_open()
  raise ReqlRuntimeError, "Connection is closed." if !is_open()

  all_opts = @default_opts.merge(opts)
  all_opts[:noreply] = !!all_opts[:noreply] if all_opts.key?(:noreply)

  token = new_token
  q = [Query::QueryType::START,
       msg,
       Hash[all_opts.map {|k,v|
              [k.to_s, (v.is_a?(RQL) ? v.to_pb : RQL.new.expr(v).to_pb)]
            }]]

  if b.is_a? Handler
    callback = QueryHandle.new(b, msg, all_opts, token, self)
    register_query(token, all_opts, callback)
    dispatch(q, token)
    return callback
  end

  res = run_internal(q, all_opts, token)
  return res if !res

  # Every response type needs the native conversion; only streaming
  # responses are additionally wrapped in a Cursor (last argument tells the
  # cursor whether more batches follow).
  native = Shim.response_to_native(res, msg, opts)
  value =
    if res['t'] == Response::ResponseType::SUCCESS_PARTIAL
      Cursor.new(native, msg, self, opts, token, true)
    elsif res['t'] == Response::ResponseType::SUCCESS_SEQUENCE
      Cursor.new(native, msg, self, opts, token, false)
    else
      native
    end

  real_val = res['p'] ? {"profile" => res['p'], "value" => value} : value
  return real_val unless b

  begin
    b.call(real_val)
  ensure
    # Do not leave a server-side cursor open once the block is done.
    value.close if value.is_a?(Cursor)
  end
end
run_internal(q, opts, token) click to toggle source
# File lib/net.rb, line 524
# Registers +token+, sends query +q+, and waits for the response without a
# timeout unless opts[:noreply] is set, in which case nil is returned right
# after dispatching.
def run_internal(q, opts, token)
  register_query(token, opts)
  dispatch(q, token)
  return nil if opts[:noreply]
  wait(token, nil)
end
send(packet) click to toggle source
# File lib/net.rb, line 591
# Writes all of +packet+ to the socket while holding @mon, looping until
# every byte has been accepted.
#
# Progress is tracked in bytes (bytesize/byteslice) because syswrite reports
# the number of bytes written; counting characters would resend or skip data
# after a partial write of a multibyte string.
def send packet
  @mon.synchronize {
    total = packet.bytesize
    written = 0
    while written < total
      # We use IO::syswrite here rather than IO::write because of
      # incompatibilities in JRuby regarding filling up the TCP send
      # buffer.  Reference:
      # https://github.com/rethinkdb/rethinkdb/issues/3795
      written += @socket.syswrite(packet.byteslice(written, total - written))
    end
  }
end
send_json(x) click to toggle source
# File lib/net.rb, line 862
# Sends +x+ as JSON followed by a single NUL terminator byte.
def send_json(x)
  encoded = x.to_json
  send(encoded + "\0")
end
server() click to toggle source
# File lib/net.rb, line 784
# Sends a SERVER_INFO query and returns the first element of the reply's
# 'r' array.
#
# Raises ReqlRuntimeError when the connection is closed or the reply type is
# not SERVER_INFO.
def server
  raise ReqlRuntimeError, "Connection is closed." unless is_open()

  reply = run_internal([Query::QueryType::SERVER_INFO], {noreply: false}, new_token)
  unless reply['t'] == Response::ResponseType::SERVER_INFO
    raise ReqlRuntimeError, "Unexpected response to server_info: " + PP.pp(reply, "")
  end
  reply['r'][0]
end
sha256(str) click to toggle source
# File lib/net.rb, line 931
# Returns the raw (binary) SHA-256 digest of +str+.
def sha256(str)
  hasher = OpenSSL::Digest.new("SHA256")
  hasher.digest(str)
end
start_listener() click to toggle source
# File lib/net.rb, line 1009
# Prepares the socket for framed reads, performs the handshake, and starts
# the background thread that receives responses.
#
# Two helper methods are added to this particular @socket object:
# +maybe_timeout+ and +read_exn+ (read exactly +len+ bytes or raise).
#
# The listener thread loops forever reading frames of the form
# <8-byte token><4-byte length><payload>, decodes each payload with
# Shim.load_json and hands it to #note_data. On any exception every
# registered waiter is notified through #note_error, @listener is cleared
# and the thread terminates itself.
def start_listener
  class << @socket
    # Runs the block under Timeout::timeout when +sec+ is given.
    def maybe_timeout(sec=nil, &b)
      sec ? Timeout::timeout(sec, &b) : b.call
    end
    # Reads exactly +len+ bytes; a short or nil read raises
    # ReqlRuntimeError.
    def read_exn(len, timeout_sec=nil)
      maybe_timeout(timeout_sec) {
        buf = read len
        if !buf || buf.length != len
          raise ReqlRuntimeError, "Connection closed by server."
        end
        return buf
      }
    end
  end
  do_handshake
  @listener = Thread.new {
    while true
      begin
        token = nil
        # Frame header: little-endian 64-bit token, then 32-bit length.
        token = @socket.read_exn(8).unpack('q<')[0]
        response_length = @socket.read_exn(4).unpack('L<')[0]
        response = @socket.read_exn(response_length)
        begin
          # The options registered for this token are passed to the decoder.
          data = Shim.load_json(response, @opts[token])
        rescue Exception => e
          raise ReqlRuntimeError, "Bad response, server is buggy.\n" +
            "#{e.inspect}\n" + response
        end
        @mon.synchronize{note_data(token, data)}
      rescue Exception => e
        @mon.synchronize {
          # Fail every outstanding query with this error, then stop listening.
          @waiters.keys.each{ |k| note_error(k, e) }
          @listener = nil
          Thread.current.terminate
          # Only runs if terminate somehow returns.
          abort("unreachable")
        }
      end
    end
  }
end
stop(token) click to toggle source
# File lib/net.rb, line 529
# Sends a STOP query for +token+ and drops its waiter.
#
# Returns true when a waiter was registered for the token, false otherwise.
def stop(token)
  dispatch([Query::QueryType::STOP], token)
  removed = @mon.synchronize { @waiters.delete(token) }
  removed ? true : false
end
use(new_default_db) click to toggle source

Change the default database of a connection.

# File lib/net.rb, line 652
# Changes the default database: stores the name in @default_db and the
# corresponding db term in @default_opts[:db]. Returns that term.
def use(new_default_db)
  @default_db = new_default_db
  @default_opts.store(:db, RQL.new.db(new_default_db))
end
verify_cert!(socket, context) click to toggle source
# File lib/net.rb, line 724
# When +context+ verifies peers, checks that the certificate presented on
# +socket+ matches this connection's host.
#
# Returns nil when no check is required or the check passes.
# Raises RuntimeError on a hostname mismatch.
def verify_cert!(socket, context)
  return unless context.verify_mode == OpenSSL::SSL::VERIFY_PEER
  return if OpenSSL::SSL.verify_certificate_identity(socket.peer_cert, host)
  raise 'SSL handshake failed due to a hostname mismatch.'
end
wait(token, timeout) click to toggle source
# File lib/net.rb, line 612
# Blocks until a response for +token+ is available in @data and returns it.
#
# token   - token of a query previously registered with #register_query.
# timeout - number of seconds to wait, or nil to wait indefinitely.
#
# Raises ReqlRuntimeError if the connection closes (or the waiter disappears)
# while waiting, and Timeout::Error once +timeout+ seconds have elapsed.
# If @abort_module::Abort is raised while waiting, the connection is
# reconnected (without noreply_wait) and the abort is re-raised.
def wait(token, timeout)
  begin
    @mon.synchronize {
      end_time = timeout ? Time.now.to_f + timeout : nil
      loop {
        # A delivered response is removed from @data and returned.
        res = @data.delete(token)
        return res if res

        # Theoretically we only need to check the second property,
        # but this is safer in case someone makes changes to
        # `close` in the future.
        if !is_open() || !@waiters.has_key?(token)
          raise ReqlRuntimeError, "Connection is closed."
        end

        if end_time
          cur_time = Time.now.to_f
          if cur_time >= end_time
            raise Timeout::Error, "Timed out waiting for cursor response."
          else
            # We can't use `wait_while` because it doesn't take a
            # timeout, and we can't use an external `timeout {
            # ... }` block because in Ruby 1.9.1 it seems to confuse
            # the synchronization in `@mon` to be timed out while
            # waiting in a synchronize block.
            @waiters[token].wait(end_time - cur_time)
          end
        else
          # No deadline: sleep on the condition variable until signalled.
          @waiters[token].wait
        end
      }
    }
  rescue @abort_module::Abort => e
    print "\nAborting query and reconnecting...\n"
    reconnect(:noreply_wait => false)
    raise e
  end
end