class RethinkDB::QueryHandle
Public Class Methods
new(handler, msg, all_opts, token, conn)
click to toggle source
# File lib/net.rb, line 137 def initialize(handler, msg, all_opts, token, conn) @handler = handler @msg = msg @all_opts = all_opts @token = token @conn = conn @opened = false @closed = false end
Public Instance Methods
callback(res)
click to toggle source
# File lib/net.rb, line 187 def callback(res) begin if @handler.stopped? || !EM.reactor_running? @closed = true @conn.stop(@token) return elsif res is_cfeed = (res['n'] & [Response::ResponseNote::SEQUENCE_FEED, Response::ResponseNote::ATOM_FEED, Response::ResponseNote::ORDER_BY_LIMIT_FEED, Response::ResponseNote::UNIONED_FEED]) != [] if (res['t'] == Response::ResponseType::SUCCESS_PARTIAL) || (res['t'] == Response::ResponseType::SUCCESS_SEQUENCE) safe_next_tick { handle_open if res['t'] == Response::ResponseType::SUCCESS_PARTIAL @conn.register_query(@token, @all_opts, self) if !@conn.closed? @conn.dispatch([Query::QueryType::CONTINUE], @token) if !@conn.closed? end Shim.response_to_native(res, @msg, @all_opts).each {|row| if is_cfeed if (row.has_key?('new_val') && row.has_key?('old_val') && @handler.respond_to?(:on_change)) handle(:on_change, row['old_val'], row['new_val']) elsif (row.has_key?('new_val') && !row.has_key?('old_val') && @handler.respond_to?(:on_initial_val)) handle(:on_initial_val, row['new_val']) elsif (row.has_key?('old_val') && !row.has_key?('new_val') && @handler.respond_to?(:on_uninitial_val)) handle(:on_uninitial_val, row['old_val']) elsif row.has_key?('error') && @handler.respond_to?(:on_change_error) handle(:on_change_error, row['error']) elsif row.has_key?('state') && @handler.respond_to?(:on_state) handle(:on_state, row['state']) else handle(:on_unhandled_change, row) end else handle(:on_stream_val, row) end } if (res['t'] == Response::ResponseType::SUCCESS_SEQUENCE || @conn.closed?) handle_close end } elsif res['t'] == Response::ResponseType::SUCCESS_ATOM safe_next_tick { return if @closed handle_open val = Shim.response_to_native(res, @msg, @all_opts) if val.is_a?(Array) handle(:on_array, val) else handle(:on_atom, val) end handle_close } elsif res['t'] == Response::ResponseType::WAIT_COMPLETE safe_next_tick { return if @closed handle_open handle(:on_wait_complete) handle_close } else exc = nil begin exc = Shim.response_to_native(res, @msg, @all_opts) rescue Exception => e exc = e end safe_next_tick { return if @closed handle_open handle(:on_error, e) handle_close } end else safe_next_tick { return if @closed handle_close } end rescue Exception => e safe_next_tick { return if @closed handle_open handle(:on_error, e) handle_close } end end
close()
click to toggle source
# File lib/net.rb, line 149 def close if !@closed handle_close return @conn.stop(@token) end return false end
closed?()
click to toggle source
# File lib/net.rb, line 146 def closed? @closed end
handle(m, *args)
click to toggle source
# File lib/net.rb, line 156 def handle(m, *args) begin @handler.handle(m, args, self) rescue Exception => e @handler.stop raise e end end
handle_close()
click to toggle source
# File lib/net.rb, line 170 def handle_close if !@closed @closed = true handle(:on_close) end end
handle_force_close()
click to toggle source
# File lib/net.rb, line 176 def handle_force_close if !@closed handle(:on_error, ReqlRuntimeError.new("Connection is closed.")) end handle_close end
handle_open()
click to toggle source
# File lib/net.rb, line 164 def handle_open if !@opened @opened = true handle(:on_open) end end
safe_next_tick(&b)
click to toggle source
# File lib/net.rb, line 182 def safe_next_tick(&b) EM.next_tick { b.call if !@closed } end