class RethinkDB::QueryHandle
Public Instance Methods
callback(res)
click to toggle source
# File lib/nobrainer_streams/rethinkdb_monkeypatch.rb, line 48 def callback(res) begin if @handler.stopped? @closed = true @conn.stop(@token) return elsif res is_cfeed = (res['n'] & [Response::ResponseNote::SEQUENCE_FEED, Response::ResponseNote::ATOM_FEED, Response::ResponseNote::ORDER_BY_LIMIT_FEED, Response::ResponseNote::UNIONED_FEED]) != [] case res['t'] when Response::ResponseType::SUCCESS_PARTIAL, Response::ResponseType::SUCCESS_SEQUENCE guarded_async_run do handle_open if res['t'] == Response::ResponseType::SUCCESS_PARTIAL @conn.register_query(@token, @all_opts, self) if !@conn.closed? @conn.dispatch([Query::QueryType::CONTINUE], @token) if !@conn.closed? end Shim.response_to_native(res, @msg, @all_opts).each do |row| if is_cfeed if (row.has_key?('new_val') && row.has_key?('old_val') && @handler.respond_to?(:on_change)) handle(:on_change, row['old_val'], row['new_val']) elsif (row.has_key?('new_val') && !row.has_key?('old_val') && @handler.respond_to?(:on_initial_val)) handle(:on_initial_val, row['new_val']) elsif (row.has_key?('old_val') && !row.has_key?('new_val') && @handler.respond_to?(:on_uninitial_val)) handle(:on_uninitial_val, row['old_val']) elsif row.has_key?('error') && @handler.respond_to?(:on_change_error) handle(:on_change_error, row['error']) elsif row.has_key?('state') && @handler.respond_to?(:on_state) handle(:on_state, row['state']) else handle(:on_unhandled_change, row) end else handle(:on_stream_val, row) end end if res['t'] == Response::ResponseType::SUCCESS_SEQUENCE || @conn.closed? handle_close end end when Response::ResponseType::SUCCESS_ATOM guarded_async_run do return if @closed handle_open val = Shim.response_to_native(res, @msg, @all_opts) if val.is_a?(Array) handle(:on_array, val) else handle(:on_atom, val) end handle_close end when Response::ResponseType::WAIT_COMPLETE guarded_async_run do return if @closed handle_open handle(:on_wait_complete) handle_close end else exc = nil begin exc = Shim.response_to_native(res, @msg, @all_opts) rescue Exception => e exc = e end guarded_async_run do return if @closed handle_open handle(:on_error, e) handle_close end end else guarded_async_run { return if @closed handle_close } end rescue Exception => e guarded_async_run do return if @closed handle_open handle(:on_error, e) handle_close end end end
guarded_async_run(&b)
click to toggle source
Override this method with an async dispatch, making sure that when the block is run @closed == false
# File lib/nobrainer_streams/rethinkdb_monkeypatch.rb, line 44 def guarded_async_run(&b) raise "Must override QueryHandle#guarded_async_run" end