class RethinkDB::Connection
Attributes
conn_id[R]
default_db[R]
host[R]
port[R]
Public Class Methods
last()
click to toggle source
# File lib/net.rb, line 794
# Returns the connection stored in `@@last`.
#
# Raises ReqlRuntimeError when `@@last` holds no connection.
def self.last
  unless @@last
    raise ReqlRuntimeError, "No last connection. Use RethinkDB::Connection.new."
  end
  @@last
end
new(opts={})
click to toggle source
# File lib/net.rb, line 468
# Builds a connection from `opts` and immediately calls `connect`.
#
# `opts` may be a Hash (its keys are converted to symbols) or a String, which
# is used as the host. Keys read here: :host, :port, :db, :user, :password,
# :auth_key, :timeout and :ssl.
#
# Raises ReqlDriverError when both :password and :auth_key are supplied.
def initialize(opts={})
  # Use IRB when the constant resolves, otherwise fall back to Faux_Abort.
  @abort_module = begin
    ::IRB
  rescue NameError
    Faux_Abort
  end

  case opts
  when Hash
    opts = Hash[opts.map { |key, value| [key.to_sym, value] }]
  when String
    opts = { :host => opts }
  end

  @host = opts[:host] || "localhost"
  @port = (opts[:port] || 28015).to_i
  @default_db = opts[:db]
  # "=" and "," are escaped in the user name before it is used.
  @user = (opts[:user] || "admin").gsub("=", "=3D").gsub(",", "=2C")

  if opts[:password] && opts[:auth_key]
    raise ReqlDriverError, "Cannot specify both a password and an auth key."
  end
  @password = opts[:password] || opts[:auth_key] || ""
  @nonce = SecureRandom.base64(18)

  # A missing, zero or negative timeout falls back to 20.
  requested_timeout = opts[:timeout].to_i
  @timeout = requested_timeout <= 0 ? 20 : requested_timeout

  @ssl_opts = opts[:ssl] || {}
  @@last = self
  @default_opts = @default_db ? { :db => RQL.new.db(@default_db) } : {}
  @conn_id = 0
  @token_cnt = 0
  @token_cnt_mutex = Mutex.new
  connect()
end
xor(str1, str2)
click to toggle source
# File lib/net.rb, line 935
# Returns a copy of `str1` in which every byte has been XOR-ed with the byte
# at the same index of `str2`. `str1` itself is not modified.
def self.xor(str1, str2)
  result = str1.dup
  str1.each_byte.with_index do |byte, idx|
    result.setbyte(idx, byte ^ str2.getbyte(idx))
  end
  result
end
Public Instance Methods
auto_reconnect(x=true)
click to toggle source
# File lib/net.rb, line 462
# Stores `x` in `@auto_reconnect` and returns the connection itself so the
# call can be chained.
def auto_reconnect(x=true)
  tap { @auto_reconnect = x }
end
base_socket()
click to toggle source
# File lib/net.rb, line 705
# Opens a TCP socket to `@host`:`@port`, sets TCP_NODELAY and SO_KEEPALIVE
# on it, and returns the socket.
def base_socket
  TCPSocket.open(@host, @port).tap do |sock|
    sock.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, 1)
  end
end
check_nonce(server_nonce, nonce)
click to toggle source
# File lib/net.rb, line 875
# Verifies that `server_nonce` begins with `nonce`.
#
# Returns nil on success; raises ReqlDriverError otherwise.
def check_nonce(server_nonce, nonce)
  return if server_nonce.start_with?(nonce)
  raise ReqlDriverError, "Invalid nonce #{server_nonce} received from server."
end
check_server_signature_claim(server_signature_claim, server_signature)
click to toggle source
# File lib/net.rb, line 952
# Compares the signature claimed by the server with the one computed locally
# (via `const_eq`).
#
# Returns nil when they match; raises ReqlAuthError, quoting both values
# Base64-encoded, when they differ.
def check_server_signature_claim(server_signature_claim, server_signature)
  return if const_eq(server_signature_claim, server_signature)
  claimed = Base64.strict_encode64(server_signature_claim)
  expected = Base64.strict_encode64(server_signature)
  raise ReqlAuthError,
        "Server signature #{claimed} does not match expected signature #{expected}."
end
check_version(server_info)
click to toggle source
# File lib/net.rb, line 866
# Checks that `@@protocol_version` lies inside the range advertised by the
# server in `server_info` ('min_protocol_version'..'max_protocol_version').
#
# Raises ReqlDriverError when it does not. The message reports the same two
# fields that are compared, so the accepted range is actually shown.
def check_version(server_info)
  min_version = server_info['min_protocol_version']
  max_version = server_info['max_protocol_version']
  if min_version > @@protocol_version || max_version < @@protocol_version
    raise ReqlDriverError,
          "Version mismatch: Driver uses #{@@protocol_version} " \
          "but server accepts [#{min_version}, #{max_version}]."
  end
end
client_address()
click to toggle source
# File lib/net.rb, line 505
# Returns element 3 of `@socket.addr` while the connection is open, nil
# otherwise.
def client_address
  return nil unless is_open()
  @socket.addr[3]
end
client_port()
click to toggle source
# File lib/net.rb, line 502
# Returns element 1 of `@socket.addr` while the connection is open, nil
# otherwise.
def client_port
  return nil unless is_open()
  @socket.addr[1]
end
close(opts={})
click to toggle source
# File lib/net.rb, line 739
# Closes the connection and returns self.
#
# opts - Hash; the only accepted key is :noreply_wait. When the key is absent
#        it is treated as true. The caller's hash is never modified.
#
# Steps: unregister from EM_Guard, validate `opts`, release every pending
# waiter under `@mon`, optionally run `noreply_wait` (only while still open),
# then terminate the listener thread and close the socket.
#
# Raises ArgumentError when `opts` is not a Hash or has unknown keys.
def close(opts={})
  EM_Guard.unregister(self)
  raise ArgumentError, "Argument to close must be a hash." if opts.class != Hash
  unknown_keys = opts.keys - [:noreply_wait]
  if !unknown_keys.empty?
    raise ArgumentError,
      "close does not understand these options: " + unknown_keys.to_s
  end
  # Read the flag into a local instead of writing the default back into
  # `opts`, so frozen or shared hashes passed by the caller are left alone.
  wait_for_noreply = opts.fetch(:noreply_wait, true)

  @mon.synchronize {
    @opts.clear
    @data.clear
    @waiters.each {|k,v|
      case v
      when QueryHandle
        v.handle_force_close
      when MonitorMixin::ConditionVariable
        # Blank the entry before signalling the waiting thread.
        @waiters[k] = nil
        v.signal
      end
    }
    @waiters.clear
  }

  noreply_wait() if wait_for_noreply && is_open()
  if @listener
    @listener.terminate
    @listener.join
  end
  @socket.close if @socket
  @listener = nil
  @socket = nil
  self
end
closed?()
click to toggle source
# File lib/net.rb, line 735
# Returns true when `is_open` is falsy, false otherwise.
def closed?
  is_open ? false : true
end
connect()
click to toggle source
# File lib/net.rb, line 680
# Opens the socket, resets the per-connection bookkeeping (`@mon`,
# `@waiters`, `@opts`, `@data`), increments `@conn_id`, starts the listener
# and returns self.
#
# Raises RuntimeError when `@socket` is still set.
def connect()
  if @socket
    raise RuntimeError, "Connection must be closed before calling connect."
  end
  init_socket
  @mon, @waiters, @opts, @data = Monitor.new, {}, {}, {}
  @conn_id += 1
  start_listener
  self
end
const_eq(a, b)
click to toggle source
# File lib/net.rb, line 943
# Compares two strings byte by byte, without stopping at the first
# difference.
#
# The length guard uses the byte length because the comparison itself is
# byte-wise. A character-length guard lets strings with equal character
# counts but different byte counts through, after which a byte-prefix can
# compare equal or `x ^ nil` raises.
#
# Returns true when both strings hold exactly the same bytes.
def const_eq(a, b)
  return false if a.bytesize != b.bytesize
  residue = 0
  a.bytes.zip(b.bytes).each { |x, y| residue |= (x ^ y) }
  residue == 0
end
create_context(options)
click to toggle source
# File lib/net.rb, line 712
# Builds an OpenSSL::SSL::SSLContext with `ssl_version` set to :TLSv1_2,
# `ca_file` set to options[:ca_certs], and peer verification enabled.
#
# Raises RuntimeError when options[:ca_certs] is missing.
def create_context(options)
  context = OpenSSL::SSL::SSLContext.new
  context.ssl_version = :TLSv1_2
  ca_path = options[:ca_certs]
  raise 'ssl options provided but missing required "ca_certs" option' unless ca_path
  context.ca_file = ca_path
  context.verify_mode = OpenSSL::SSL::VERIFY_PEER
  context
end
debug_socket()
click to toggle source
# File lib/net.rb, line 669
# Returns the raw `@socket` object.
def debug_socket
  @socket
end
dispatch(msg, token)
click to toggle source
# File lib/net.rb, line 605
# Serializes `msg` with Shim.dump_json, prepends a header packed as 'Q<L<'
# (the token followed by the payload's byte length), sends the result, and
# returns `token`.
def dispatch(msg, token)
  body = Shim.dump_json(msg).force_encoding('BINARY')
  header = [token, body.bytesize].pack('Q<L<')
  send(header + body)
  token
end
do_handshake()
click to toggle source
# File lib/net.rb, line 961 def do_handshake begin send([@@magic_number].pack('L<')) client_first_message_bare = "n=#{@user},r=#{@nonce}" send_json({ protocol_version: @@protocol_version, authentication_method: "SCRAM-SHA-256", authentication: "n,,#{client_first_message_bare}" }) server_version_msg = rcv_json check_version(server_version_msg) auth_resp = rcv_json server_first_message = auth_resp['authentication'] fields = Hash[server_first_message.split(',').map{|x| x.split('=', 2)}] server_nonce = fields['r'] client_final_message_without_proof = "c=biws,r=#{server_nonce}" auth_message = "#{client_first_message_bare},#{server_first_message},"+ client_final_message_without_proof check_nonce(server_nonce, @nonce) salt = Base64.decode64(fields['s']) iter = fields['i'].to_i salted_password = pbkdf2_hmac_sha256(@password, salt, iter) client_key = hmac(salted_password, "Client Key") stored_key = sha256(client_key) client_signature = hmac(stored_key, auth_message) client_proof = Connection::xor(client_key, client_signature) cproof_64 = Base64.strict_encode64(client_proof) msg = "#{client_final_message_without_proof},p=#{cproof_64}" send_json({authentication: msg}) sig_resp = rcv_json fields = Hash[sig_resp['authentication'].split(',').map{|x| x.split('=', 2)}] server_signature_claim = Base64::decode64(fields['v']) server_key = hmac(salted_password, "Server Key") server_signature = hmac(server_key, auth_message) check_server_signature_claim(server_signature_claim, server_signature) rescue ReqlError => e raise e rescue Exception => e raise ReqlDriverError, "Error during handshake: #{e.inspect} #{e.backtrace}" end end
hmac(*args)
click to toggle source
# File lib/net.rb, line 927
# Returns the raw HMAC-SHA256 digest of the given arguments, which are
# forwarded to OpenSSL::HMAC.digest after the digest object.
def hmac(*args)
  digest = OpenSSL::Digest::SHA256.new
  OpenSSL::HMAC.digest(digest, *args)
end
init_socket()
click to toggle source
# File lib/net.rb, line 692
# Populates `@socket`.
#
# With empty `@ssl_opts` this is the plain socket from `base_socket`.
# Otherwise the plain socket is kept in `@tcp_socket` and wrapped in an
# OpenSSL::SSL::SSLSocket (sync_close enabled), which is connected and then
# checked with `verify_cert!`.
def init_socket
  if @ssl_opts.empty?
    @socket = base_socket
  else
    @tcp_socket = base_socket
    ssl_context = create_context(@ssl_opts)
    @socket = OpenSSL::SSL::SSLSocket.new(@tcp_socket, ssl_context)
    @socket.sync_close = true
    @socket.connect
    verify_cert!(@socket, ssl_context)
  end
end
inspect()
click to toggle source
# File lib/net.rb, line 657
# Returns a one-line description containing the object id, host, port,
# default database (the 'test' database term when none is configured) and
# whether the connection is open.
def inspect
  db = @default_opts[:db] || RQL.new.db('test')
  db_text = db.inspect
  state = is_open() ? "(open)" : "(closed)"
  "#<RethinkDB::Connection:#{object_id} (#{@host}:#{@port}) (Default DB: #{db_text}) #{state}>"
end
is_open()
click to toggle source
# File lib/net.rb, line 732
# Truthy only when both `@socket` and `@listener` are set. Returns the
# falsy `@socket` value when there is no socket, otherwise `@listener`.
def is_open
  return @socket unless @socket
  @listener
end
new_token()
click to toggle source
# File lib/net.rb, line 509
# Increments `@token_cnt` while holding `@token_cnt_mutex` and returns the
# new value.
def new_token
  @token_cnt_mutex.synchronize do
    @token_cnt += 1
  end
end
noreply_wait()
click to toggle source
# File lib/net.rb, line 774
# Sends a NOREPLY_WAIT query and blocks for its response. Returns nil.
#
# Raises ReqlRuntimeError when the connection is closed or when the response
# type is not WAIT_COMPLETE.
def noreply_wait
  raise ReqlRuntimeError, "Connection is closed." unless is_open()
  query = [Query::QueryType::NOREPLY_WAIT]
  response = run_internal(query, {noreply: false}, new_token)
  unless response['t'] == Response::ResponseType::WAIT_COMPLETE
    raise ReqlRuntimeError,
          "Unexpected response to noreply_wait: " + PP.pp(response, "")
  end
  nil
end
note_data(token, data)
click to toggle source
# File lib/net.rb, line 810
# Hands `data` to whoever is registered for `token`, removing the token from
# `@opts` and `@waiters` first.
#
# - condition variable: store the data in `@data` and signal the waiter
# - QueryHandle: pass the data to its `callback`
# - no waiter: do nothing
#
# Raises ReqlDriverError for any other kind of waiter.
def note_data(token, data)
  # Synchronize around this!
  @opts.delete(token)
  waiter = @waiters.delete(token)
  case waiter
  when nil
    # nothing
  when MonitorMixin::ConditionVariable
    @data[token] = data
    waiter.signal
  when QueryHandle
    waiter.callback(data)
  else
    raise ReqlDriverError, "Unrecognized value #{waiter.inspect} in `@waiters`."
  end
end
note_error(token, e)
click to toggle source
# File lib/net.rb, line 826
# Wraps exception `e` in a CLIENT_ERROR response hash (message in 'r', empty
# 'b') and delivers it to the waiter for `token` via `note_data`.
def note_error(token, e)
  # Synchronize around this!
  error_response = {
    't' => Response::ResponseType::CLIENT_ERROR,
    'r' => [e.message],
    'b' => []
  }
  note_data(token, error_response)
end
pbkdf2_hmac_sha256(*args)
click to toggle source
# File lib/net.rb, line 898
# Returns the result of the key-derivation function for `args`, memoized in
# `@@auth_cache` (keyed by the argument list).
#
# The function in `@@auth_func` is tried first. If it raises
# NotImplementedError and is not already `@@slow_auth_func`, the class
# switches to `@@slow_auth_func` and retries; otherwise the error propagates.
def pbkdf2_hmac_sha256(*args)
  cached = @@auth_cache_mutex.synchronize { @@auth_cache[args] }
  return cached if cached

  derive = @@auth_func_mutex.synchronize { @@auth_func }
  begin
    derived = derive.call(*args)
  rescue NotImplementedError
    raise if derive == @@slow_auth_func
    derive = @@auth_func_mutex.synchronize { @@auth_func = @@slow_auth_func }
    derived = derive.call(*args)
  end

  @@auth_cache_mutex.synchronize { @@auth_cache[args] = derived }
  derived
end
rcv_json()
click to toggle source
# File lib/net.rb, line 835
# Reads one NUL-terminated JSON message from the socket (one byte at a time,
# each read bounded by `@timeout`) and returns the parsed value.
#
# When the message's 'success' field is falsy:
# - 'error_code' between 10 and 20 raises ReqlAuthError, using the 'error'
#   text when it is non-empty;
# - anything else raises ReqlDriverError.
# Any other failure is reported as ReqlDriverError "Connection closed by
# server (...)".
def rcv_json
  response = ""
  response += @socket.read_exn(1, @timeout) until response.end_with?("\0")

  begin
    res = JSON.parse(response[0...-1])
    unless res['success']
      msg = "Handshake error (#{res})."
      ecode = res['error_code']
      if ecode && ecode >= 10 && ecode <= 20
        server_text = res['error']
        msg = server_text if server_text.to_s != ""
        raise ReqlAuthError, msg
      else
        raise ReqlDriverError, msg
      end
    end
  rescue Exception => e
    # Driver errors raised above pass through untouched.
    raise e if e.is_a?(ReqlDriverError)
    raise ReqlDriverError, "Connection closed by server (#{e})."
  end
  res
end
reconnect(opts={})
click to toggle source
Reconnect to the server. This will interrupt all queries on the server (if :noreply_wait => false) and invalidate all outstanding enumerables on the client.
# File lib/net.rb, line 674
# Closes the connection with `opts` (forwarded to `close`) and connects
# again. Returns whatever `connect` returns.
#
# Raises ArgumentError when `opts` is not a Hash.
def reconnect(opts={})
  unless opts.class == Hash
    raise ArgumentError, "Argument to reconnect must be a hash."
  end
  close(opts)
  connect()
end
register_query(token, opts, callback=nil)
click to toggle source
# File lib/net.rb, line 513
# Registers a waiter for `token` unless opts[:noreply] is set.
#
# The waiter is `callback` when given, otherwise a fresh condition variable
# from `@mon`. The query's `opts` are remembered in `@opts` under the token.
#
# Raises ReqlDriverError when the token is already registered.
def register_query(token, opts, callback=nil)
  return if opts[:noreply]
  @mon.synchronize do
    if @waiters.has_key?(token)
      raise ReqlDriverError, "Internal driver error, token already in use."
    end
    @waiters[token] = callback || @mon.new_cond
    @opts[token] = opts
  end
end
remove_em_waiters()
click to toggle source
# File lib/net.rb, line 799
# Under `@mon`, force-closes every QueryHandle waiter and removes it from
# `@waiters`. Waiters of any other kind are left in place.
def remove_em_waiters
  @mon.synchronize do
    @waiters.delete_if do |_token, waiter|
      next false unless waiter.is_a?(QueryHandle)
      waiter.handle_force_close
      true
    end
  end
end
repl()
click to toggle source
# File lib/net.rb, line 466
# Passes this connection to RQL.set_default_conn and returns that call's
# result.
def repl
  RQL.set_default_conn(self)
end
run(msg, opts, b)
click to toggle source
# File lib/net.rb, line 535
# Sends a START query for `msg` and returns its result.
#
# msg  - the query term
# opts - per-query options, merged over `@default_opts`; a present :noreply
#        value is coerced to true/false
# b    - a Handler (the query is registered with a QueryHandle, which is
#        returned immediately), a callable (invoked with the result), or nil
#
# For the blocking path the response is converted with
# Shim.response_to_native; partial and complete sequences are wrapped in a
# Cursor. When the response carries a profile ('p'), the result is a hash
# with "profile" and "value". A Cursor handed to a callable `b` is closed
# once the call finishes.
#
# Raises ReqlRuntimeError when the connection is closed (after an optional
# automatic reconnect).
def run(msg, opts, b)
  reconnect(:noreply_wait => false) if @auto_reconnect && !is_open()
  raise ReqlRuntimeError, "Connection is closed." if !is_open()

  all_opts = @default_opts.merge(opts)
  all_opts[:noreply] = !!all_opts[:noreply] if all_opts.key?(:noreply)

  token = new_token
  # Option values that are not already RQL terms are wrapped with `expr`.
  optargs = Hash[all_opts.map { |k, v|
    [k.to_s, (v.is_a?(RQL) ? v.to_pb : RQL.new.expr(v).to_pb)]
  }]
  q = [Query::QueryType::START, msg, optargs]

  if b.is_a? Handler
    callback = QueryHandle.new(b, msg, all_opts, token, self)
    register_query(token, all_opts, callback)
    dispatch(q, token)
    return callback
  end

  res = run_internal(q, all_opts, token)
  # Falsy for noreply queries: there is nothing to convert.
  return res if !res

  if res['t'] == Response::ResponseType::SUCCESS_PARTIAL
    value = Cursor.new(Shim.response_to_native(res, msg, opts),
                       msg, self, opts, token, true)
  elsif res['t'] == Response::ResponseType::SUCCESS_SEQUENCE
    value = Cursor.new(Shim.response_to_native(res, msg, opts),
                       msg, self, opts, token, false)
  else
    value = Shim.response_to_native(res, msg, opts)
  end

  real_val = res['p'] ? { "profile" => res['p'], "value" => value } : value

  return real_val unless b
  begin
    b.call(real_val)
  ensure
    value.close if value.is_a?(Cursor)
  end
end
run_internal(q, opts, token)
click to toggle source
# File lib/net.rb, line 524
# Registers and dispatches query `q` under `token`.
#
# Returns nil for noreply queries; otherwise blocks in `wait` (without a
# timeout) and returns the response.
def run_internal(q, opts, token)
  register_query(token, opts)
  dispatch(q, token)
  return nil if opts[:noreply]
  wait(token, nil)
end
send(packet)
click to toggle source
# File lib/net.rb, line 591
# Writes all of `packet` to the socket while holding `@mon`, looping until
# every byte has been accepted.
#
# `syswrite` reports how many *bytes* it wrote, so progress is tracked with
# `bytesize`/`byteslice`. Character-based `length`/`slice` would stop early
# or resend from the wrong offset for packets containing multibyte
# characters.
def send packet
  @mon.synchronize {
    written = 0
    total = packet.bytesize
    while written < total
      # We use IO::syswrite here rather than IO::write because of
      # incompatibilities in JRuby regarding filling up the TCP send
      # buffer.  Reference:
      # https://github.com/rethinkdb/rethinkdb/issues/3795
      written += @socket.syswrite(packet.byteslice(written, total - written))
    end
  }
end
send_json(x)
click to toggle source
# File lib/net.rb, line 862
# Sends `x` serialized as JSON, followed by a NUL terminator.
def send_json(x)
  send("#{x.to_json}\0")
end
server()
click to toggle source
# File lib/net.rb, line 784
# Sends a SERVER_INFO query and returns the first element of the response's
# 'r' field.
#
# Raises ReqlRuntimeError when the connection is closed or when the response
# type is not SERVER_INFO.
def server
  raise ReqlRuntimeError, "Connection is closed." unless is_open()
  query = [Query::QueryType::SERVER_INFO]
  response = run_internal(query, {noreply: false}, new_token)
  unless response['t'] == Response::ResponseType::SERVER_INFO
    raise ReqlRuntimeError,
          "Unexpected response to server_info: " + PP.pp(response, "")
  end
  response['r'][0]
end
sha256(str)
click to toggle source
# File lib/net.rb, line 931
# Returns the raw (binary) SHA-256 digest of `str`.
def sha256(str)
  OpenSSL::Digest.new("SHA256").digest(str)
end
start_listener()
click to toggle source
# File lib/net.rb, line 1009 def start_listener class << @socket def maybe_timeout(sec=nil, &b) sec ? Timeout::timeout(sec, &b) : b.call end def read_exn(len, timeout_sec=nil) maybe_timeout(timeout_sec) { buf = read len if !buf || buf.length != len raise ReqlRuntimeError, "Connection closed by server." end return buf } end end do_handshake @listener = Thread.new { while true begin token = nil token = @socket.read_exn(8).unpack('q<')[0] response_length = @socket.read_exn(4).unpack('L<')[0] response = @socket.read_exn(response_length) begin data = Shim.load_json(response, @opts[token]) rescue Exception => e raise ReqlRuntimeError, "Bad response, server is buggy.\n" + "#{e.inspect}\n" + response end @mon.synchronize{note_data(token, data)} rescue Exception => e @mon.synchronize { @waiters.keys.each{ |k| note_error(k, e) } @listener = nil Thread.current.terminate abort("unreachable") } end end } end
stop(token)
click to toggle source
# File lib/net.rb, line 529
# Dispatches a STOP query for `token` and removes its waiter.
#
# Returns true when a waiter was registered for the token, false otherwise.
def stop(token)
  dispatch([Query::QueryType::STOP], token)
  removed = @mon.synchronize { @waiters.delete(token) }
  !!removed
end
use(new_default_db)
click to toggle source
Change the default database of a connection.
# File lib/net.rb, line 652
# Changes the default database of the connection: records the name in
# `@default_db` and stores the matching database term under :db in
# `@default_opts`. Returns that term.
def use(new_default_db)
  @default_db = new_default_db
  @default_opts.store(:db, RQL.new.db(new_default_db))
end
verify_cert!(socket, context)
click to toggle source
# File lib/net.rb, line 724
# When `context` verifies peers, checks that the peer certificate of
# `socket` matches `host`.
#
# Returns nil when no check is needed or the check passes; raises
# RuntimeError on a hostname mismatch.
def verify_cert!(socket, context)
  return unless context.verify_mode == OpenSSL::SSL::VERIFY_PEER
  return if OpenSSL::SSL.verify_certificate_identity(socket.peer_cert, host)
  raise 'SSL handshake failed due to a hostname mismatch.'
end
wait(token, timeout)
click to toggle source
# File lib/net.rb, line 612 def wait(token, timeout) begin @mon.synchronize { end_time = timeout ? Time.now.to_f + timeout : nil loop { res = @data.delete(token) return res if res # Theoretically we only need to check the second property, # but this is safer in case someone makes changes to # `close` in the future. if !is_open() || !@waiters.has_key?(token) raise ReqlRuntimeError, "Connection is closed." end if end_time cur_time = Time.now.to_f if cur_time >= end_time raise Timeout::Error, "Timed out waiting for cursor response." else # We can't use `wait_while` because it doesn't take a # timeout, and we can't use an external `timeout { # ... }` block because in Ruby 1.9.1 it seems to confuse # the synchronization in `@mon` to be timed out while # waiting in a synchronize block. @waiters[token].wait(end_time - cur_time) end else @waiters[token].wait end } } rescue @abort_module::Abort => e print "\nAborting query and reconnecting...\n" reconnect(:noreply_wait => false) raise e end end