class RethinkDB::QueryHandle

Public Class Methods

new(handler, msg, all_opts, token, conn) click to toggle source
# File lib/net.rb, line 137
def initialize(handler, msg, all_opts, token, conn)
  @handler = handler
  @msg = msg
  @all_opts = all_opts
  @token = token
  @conn = conn
  @opened = false
  @closed = false
end

Public Instance Methods

callback(res) click to toggle source
# File lib/net.rb, line 187
def callback(res)
  begin
    if @handler.stopped? || !EM.reactor_running?
      @closed = true
      @conn.stop(@token)
      return
    elsif res
      is_cfeed = (res['n'] & [Response::ResponseNote::SEQUENCE_FEED,
                              Response::ResponseNote::ATOM_FEED,
                              Response::ResponseNote::ORDER_BY_LIMIT_FEED,
                              Response::ResponseNote::UNIONED_FEED]) != []
      if (res['t'] == Response::ResponseType::SUCCESS_PARTIAL) ||
          (res['t'] == Response::ResponseType::SUCCESS_SEQUENCE)
        safe_next_tick {
          handle_open
          if res['t'] == Response::ResponseType::SUCCESS_PARTIAL
            @conn.register_query(@token, @all_opts, self) if !@conn.closed?
            @conn.dispatch([Query::QueryType::CONTINUE], @token) if !@conn.closed?
          end
          Shim.response_to_native(res, @msg, @all_opts).each {|row|
            if is_cfeed
              if (row.has_key?('new_val') && row.has_key?('old_val') &&
                  @handler.respond_to?(:on_change))
                handle(:on_change, row['old_val'], row['new_val'])
              elsif (row.has_key?('new_val') && !row.has_key?('old_val') &&
                     @handler.respond_to?(:on_initial_val))
                handle(:on_initial_val, row['new_val'])
              elsif (row.has_key?('old_val') && !row.has_key?('new_val') &&
                     @handler.respond_to?(:on_uninitial_val))
                handle(:on_uninitial_val, row['old_val'])
              elsif row.has_key?('error') && @handler.respond_to?(:on_change_error)
                handle(:on_change_error, row['error'])
              elsif row.has_key?('state') && @handler.respond_to?(:on_state)
                handle(:on_state, row['state'])
              else
                handle(:on_unhandled_change, row)
              end
            else
              handle(:on_stream_val, row)
            end
          }
          if (res['t'] == Response::ResponseType::SUCCESS_SEQUENCE ||
              @conn.closed?)
            handle_close
          end
        }
      elsif res['t'] == Response::ResponseType::SUCCESS_ATOM
        safe_next_tick {
          return if @closed
          handle_open
          val = Shim.response_to_native(res, @msg, @all_opts)
          if val.is_a?(Array)
            handle(:on_array, val)
          else
            handle(:on_atom, val)
          end
          handle_close
        }
      elsif res['t'] == Response::ResponseType::WAIT_COMPLETE
        safe_next_tick {
          return if @closed
          handle_open
          handle(:on_wait_complete)
          handle_close
        }
      else
        exc = nil
        begin
          exc = Shim.response_to_native(res, @msg, @all_opts)
        rescue Exception => e
          exc = e
        end
        safe_next_tick {
          return if @closed
          handle_open
          handle(:on_error, e)
          handle_close
        }
      end
    else
      safe_next_tick {
        return if @closed
        handle_close
      }
    end
  rescue Exception => e
    safe_next_tick {
      return if @closed
      handle_open
      handle(:on_error, e)
      handle_close
    }
  end
end
close() click to toggle source
# File lib/net.rb, line 149
def close
  if !@closed
    handle_close
    return @conn.stop(@token)
  end
  return false
end
closed?() click to toggle source
# File lib/net.rb, line 146
def closed?
  @closed
end
handle(m, *args) click to toggle source
# File lib/net.rb, line 156
def handle(m, *args)
  begin
    @handler.handle(m, args, self)
  rescue Exception => e
    @handler.stop
    raise e
  end
end
handle_close() click to toggle source
# File lib/net.rb, line 170
def handle_close
  if !@closed
    @closed = true
    handle(:on_close)
  end
end
handle_force_close() click to toggle source
# File lib/net.rb, line 176
def handle_force_close
  if !@closed
    handle(:on_error, ReqlRuntimeError.new("Connection is closed."))
  end
  handle_close
end
handle_open() click to toggle source
# File lib/net.rb, line 164
def handle_open
  if !@opened
    @opened = true
    handle(:on_open)
  end
end
safe_next_tick(&b) click to toggle source
# File lib/net.rb, line 182
def safe_next_tick(&b)
  EM.next_tick {
    b.call if !@closed
  }
end