class RethinkDB::Connection

Public Instance Methods

run(msg, opts, b) click to toggle source
# File lib/nobrainer_streams/rethinkdb_monkeypatch.rb, line 255
def run(msg, opts, b)
  query_handle_class = opts.delete(:query_handle_class) || QueryHandle
  reconnect(:noreply_wait => false) if @auto_reconnect && !is_open()
  raise ReqlRuntimeError, "Connection is closed." if !is_open()

  global_optargs = {}
  all_opts = @default_opts.merge(opts)
  if all_opts.keys.include?(:noreply)
    all_opts[:noreply] = !!all_opts[:noreply]
  end

  token = new_token
  q = [Query::QueryType::START,
       msg,
       Hash[all_opts.map {|k,v|
              [k.to_s, (v.is_a?(RQL) ? v.to_pb : RQL.new.expr(v).to_pb)]
            }]]

  if b.is_a? Handler
    callback = query_handle_class.new(b, msg, all_opts, token, self)
    register_query(token, all_opts, callback)
    dispatch(q, token)
    return callback
  else
    res = run_internal(q, all_opts, token)
    return res if !res
    if res['t'] == Response::ResponseType::SUCCESS_PARTIAL
      value = Cursor.new(Shim.response_to_native(res, msg, opts),
                         msg, self, opts, token, true)
    elsif res['t'] == Response::ResponseType::SUCCESS_SEQUENCE
      value = Cursor.new(Shim.response_to_native(res, msg, opts),
                         msg, self, opts, token, false)
    else
      value = Shim.response_to_native(res, msg, opts)
    end

    if res['p']
      real_val = {
        "profile" => res['p'],
        "value" => value
      }
    else
      real_val = value
    end

    if b
      begin
        b.call(real_val)
      ensure
        value.close if value.is_a?(Cursor)
      end
    else
      real_val
    end
  end
end