class RethinkDB::RQL
Public Instance Methods
async_run(*args, &b)
click to toggle source
# File lib/nobrainer_streams/rethinkdb_monkeypatch.rb, line 224
#
# Runs this query through an AsyncHandler, delivering results to a callback.
#
# The arguments (and the optional block) are classified by `parse`. The
# callback may be a Proc, which is wrapped in a CallbackHandler, or a Handler.
# An AsyncHandler must also be supplied among the arguments.
#
# Returns whatever `async_handler.run` returns.
#
# Raises ArgumentError when no Handler callback or no AsyncHandler is given
# (in addition to anything raised by `parse`).
def async_run(*args, &b)
  # NOTE(review): presumably rejects running a bare, unbound RQL object --
  # confirm against the definition of unbound_if.
  unbound_if(@body == RQL)
  parsed = parse(*args, &b)

  callback = parsed[:block]
  callback = CallbackHandler.new(callback) if callback.is_a?(Proc)
  raise ArgumentError, "No handler specified." unless callback.is_a?(Handler)

  async_handler = parsed[:async_handler]
  raise ArgumentError, "No async handler specified." unless async_handler.is_a?(AsyncHandler)

  # A callback that responds to `on_state` is taken as a request for state
  # notifications, so `include_states` is switched on for the query.
  query_opts = parsed[:opts]
  query_opts = query_opts.merge(include_states: true) if callback.respond_to?(:on_state)

  async_handler.callback = callback
  async_handler.connection = parsed[:conn]
  async_handler.options = query_opts

  # The handler decides when the block runs; the block issues the query on
  # the handler's connection using the handler's own options and handler.
  async_handler.run do
    async_handler.connection.run(@body, async_handler.options, async_handler.handler)
  end
end
em_run(*args, &b)
click to toggle source
# File lib/nobrainer_streams/rethinkdb_monkeypatch.rb, line 220
#
# Convenience wrapper around `async_run` that appends the EMHandler class to
# the given arguments. `parse` instantiates Class arguments with `.new`, so
# the class itself is passed rather than an instance.
# NOTE(review): EMHandler is presumably the EventMachine-backed AsyncHandler --
# confirm against its definition.
#
# Accepts the same arguments and block as `async_run` and returns its result.
def em_run(*args, &b)
  forwarded = args + [EMHandler]
  async_run(*forwarded, &b)
end
parse(*args, &b)
click to toggle source
# File lib/nobrainer_streams/rethinkdb_monkeypatch.rb, line 164
#
# Sorts the positional arguments of a run-style call into a connection, an
# options hash, a callback and an async handler, based on each argument's
# class.
#
# Any argument that is a Class is first instantiated with `.new` (no
# arguments). Each slot may be filled at most once. The block `b` is used as
# the callback only when no Proc/Handler was passed positionally, and
# `@@default_conn` is used when no connection was passed.
#
# The :time_format, :group_format and :binary_format options, when present
# and truthy, are converted with `to_s` and must then be 'raw' or 'native'.
# Normalization is applied to a shallow copy of the options, so the caller's
# Hash is never modified and frozen hashes are accepted.
#
# Returns a Hash with the keys :conn, :opts, :block and :async_handler
# (:block and :async_handler are nil when not supplied).
#
# Raises ArgumentError on a duplicate or unrecognized argument, on an invalid
# format option, or when no connection is available.
def parse(*args, &b)
  conn = nil
  opts = nil
  block = nil
  async_handler = nil

  # Classes are accepted as shorthand for a default-constructed instance.
  args = args.map { |x| x.is_a?(Class) ? x.new : x }
  args.each do |arg|
    case arg
    when RethinkDB::Connection
      raise ArgumentError, "Unexpected second Connection #{arg.inspect}." if conn
      conn = arg
    when Hash
      raise ArgumentError, "Unexpected second Hash #{arg.inspect}." if opts
      opts = arg
    when Proc, Handler
      raise ArgumentError, "Unexpected second callback #{arg.inspect}." if block
      block = arg
    when AsyncHandler
      raise ArgumentError, "Unexpected second AsyncHandler #{arg.inspect}." if async_handler
      async_handler = arg
    else
      raise ArgumentError, "Unexpected argument #{arg.inspect} (got #{args.inspect})."
    end
  end

  conn ||= @@default_conn
  # Work on a copy: the normalized format strings are written back below and
  # must not leak into (or fail on) the Hash owned by the caller.
  opts = opts ? opts.dup : {}
  block ||= b

  %i[time_format group_format binary_format].each do |key|
    value = opts[key]
    next unless value

    value = value.to_s
    opts[key] = value
    unless value == 'raw' || value == 'native'
      raise ArgumentError, "`#{key}` must be 'raw' or 'native' (got `#{value}`)."
    end
  end

  unless conn
    raise ArgumentError, "No connection specified!\n" \
      "Use `query.run(conn)` or `conn.repl(); query.run`."
  end

  { conn: conn, opts: opts, block: block, async_handler: async_handler }
end