class RethinkDB::Connection
Public Instance Methods
run(msg, opts, b)
click to toggle source
# File lib/nobrainer_streams/rethinkdb_monkeypatch.rb, line 255 def run(msg, opts, b) query_handle_class = opts.delete(:query_handle_class) || QueryHandle reconnect(:noreply_wait => false) if @auto_reconnect && !is_open() raise ReqlRuntimeError, "Connection is closed." if !is_open() global_optargs = {} all_opts = @default_opts.merge(opts) if all_opts.keys.include?(:noreply) all_opts[:noreply] = !!all_opts[:noreply] end token = new_token q = [Query::QueryType::START, msg, Hash[all_opts.map {|k,v| [k.to_s, (v.is_a?(RQL) ? v.to_pb : RQL.new.expr(v).to_pb)] }]] if b.is_a? Handler callback = query_handle_class.new(b, msg, all_opts, token, self) register_query(token, all_opts, callback) dispatch(q, token) return callback else res = run_internal(q, all_opts, token) return res if !res if res['t'] == Response::ResponseType::SUCCESS_PARTIAL value = Cursor.new(Shim.response_to_native(res, msg, opts), msg, self, opts, token, true) elsif res['t'] == Response::ResponseType::SUCCESS_SEQUENCE value = Cursor.new(Shim.response_to_native(res, msg, opts), msg, self, opts, token, false) else value = Shim.response_to_native(res, msg, opts) end if res['p'] real_val = { "profile" => res['p'], "value" => value } else real_val = value end if b begin b.call(real_val) ensure value.close if value.is_a?(Cursor) end else real_val end end end