class RethinkDB::RQL

Public Instance Methods

async_run(*args, &b) click to toggle source
# File lib/nobrainer_streams/rethinkdb_monkeypatch.rb, line 224
# Runs this query through an AsyncHandler, delivering results to a callback.
#
# The arguments are sorted out by `parse`; see that method for what may be
# passed. A Proc callback is wrapped in a CallbackHandler before use.
#
# @param args [Array] forwarded to `parse` (connection, options, callback,
#   async handler — in any order)
# @param b [Proc] optional block, forwarded to `parse`
# @return whatever the async handler's `run` returns
# @raise [ArgumentError] when no Handler callback or no AsyncHandler results
#   from parsing the arguments
def async_run(*args, &b)
  unbound_if(@body == RQL)
  parsed = parse(*args, &b)

  callback = parsed[:block]
  callback = CallbackHandler.new(callback) if callback.is_a?(Proc)
  raise ArgumentError, "No handler specified." unless callback.is_a?(Handler)

  runner = parsed[:async_handler]
  raise ArgumentError, "No async handler specified." unless runner.is_a?(AsyncHandler)

  # A callback that implements `on_state` is taken as a request for state
  # notifications, so the option is switched on for it.
  options = parsed[:opts]
  options = options.merge(include_states: true) if callback.respond_to?(:on_state)

  runner.callback = callback
  runner.connection = parsed[:conn]
  runner.options = options

  # The query itself is issued from inside the handler's `run` block.
  runner.run do
    runner.connection.run(@body, runner.options, runner.handler)
  end
end
em_run(*args, &b) click to toggle source
# File lib/nobrainer_streams/rethinkdb_monkeypatch.rb, line 220
# Convenience wrapper around `async_run` that appends EMHandler as the last
# positional argument (presumably the EventMachine async handler — confirm
# against its definition).
#
# @param args [Array] forwarded unchanged, ahead of EMHandler
# @param b [Proc] optional block, forwarded as the callback
# @return whatever `async_run` returns
def em_run(*args, &b)
  forwarded = args + [EMHandler]
  async_run(*forwarded, &b)
end
parse(*args, &b) click to toggle source
# File lib/nobrainer_streams/rethinkdb_monkeypatch.rb, line 164
# Sorts the arguments of a run-style call into a connection, an options
# hash, a callback and an async handler, then validates the options.
#
# Arguments may come in any order and are told apart by type. Any argument
# that is a Class is first instantiated with a no-argument `new`, so handler
# classes can be passed directly.
#
# The options Hash supplied by the caller is never modified: normalisation
# happens on a copy, so frozen or shared option hashes are safe to pass.
#
# @param args [Array] at most one each of: RethinkDB::Connection, Hash of
#   options, callback (Proc or Handler), AsyncHandler
# @param b [Proc] block used as the callback when none was given positionally
# @return [Hash] `{conn:, opts:, block:, async_handler:}`; `opts` has
#   `:time_format`, `:group_format` and `:binary_format` converted to Strings
# @raise [ArgumentError] on a duplicate or unrecognised argument, on a format
#   option other than 'raw'/'native', or when no connection is available
def parse(*args, &b)
  conn = nil
  opts = nil
  block = nil
  async_handler = nil
  args = args.map { |x| x.is_a?(Class) ? x.new : x }
  args.each do |arg|
    case arg
    when RethinkDB::Connection
      raise ArgumentError, "Unexpected second Connection #{arg.inspect}." if conn
      conn = arg
    when Hash
      raise ArgumentError, "Unexpected second Hash #{arg.inspect}." if opts
      opts = arg
    when Proc
      raise ArgumentError, "Unexpected second callback #{arg.inspect}." if block
      block = arg
    when Handler
      raise ArgumentError, "Unexpected second callback #{arg.inspect}." if block
      block = arg
    when AsyncHandler
      raise ArgumentError, "Unexpected second AsyncHandler #{arg.inspect}." if async_handler
      async_handler = arg
    else
      raise ArgumentError, "Unexpected argument #{arg.inspect} " +
        "(got #{args.inspect})."
    end
  end

  conn ||= @@default_conn
  # Work on a copy so the normalisation below does not leak into (or blow up
  # on a frozen) Hash owned by the caller.
  opts = opts ? opts.dup : {}
  block ||= b

  # Each format option, when present, is stringified and must be one of the
  # two values accepted here.
  [:time_format, :group_format, :binary_format].each do |key|
    value = opts[key]
    next unless value
    value = value.to_s
    opts[key] = value
    if value != 'raw' && value != 'native'
      raise ArgumentError, "`#{key}` must be 'raw' or 'native' (got `#{value}`)."
    end
  end

  if !conn
    raise ArgumentError, "No connection specified!\n" \
    "Use `query.run(conn)` or `conn.repl(); query.run`."
  end
  {conn: conn, opts: opts, block: block, async_handler: async_handler}
end