class RethinkDB::QueryHandle

Public Instance Methods

callback(res) click to toggle source
# File lib/nobrainer_streams/rethinkdb_monkeypatch.rb, line 48
def callback(res)
  begin
    if @handler.stopped?
      @closed = true
      @conn.stop(@token)
      return
    elsif res
      is_cfeed = (res['n'] & [Response::ResponseNote::SEQUENCE_FEED,
                              Response::ResponseNote::ATOM_FEED,
                              Response::ResponseNote::ORDER_BY_LIMIT_FEED,
                              Response::ResponseNote::UNIONED_FEED]) != []

      case res['t']
      when Response::ResponseType::SUCCESS_PARTIAL,
           Response::ResponseType::SUCCESS_SEQUENCE
        guarded_async_run do
          handle_open
          if res['t'] == Response::ResponseType::SUCCESS_PARTIAL
            @conn.register_query(@token, @all_opts, self) if !@conn.closed?
            @conn.dispatch([Query::QueryType::CONTINUE], @token) if !@conn.closed?
          end
          Shim.response_to_native(res, @msg, @all_opts).each do |row|
            if is_cfeed
              if (row.has_key?('new_val') && row.has_key?('old_val') &&
                  @handler.respond_to?(:on_change))
                handle(:on_change, row['old_val'], row['new_val'])
              elsif (row.has_key?('new_val') && !row.has_key?('old_val') &&
                     @handler.respond_to?(:on_initial_val))
                handle(:on_initial_val, row['new_val'])
              elsif (row.has_key?('old_val') && !row.has_key?('new_val') &&
                     @handler.respond_to?(:on_uninitial_val))
                handle(:on_uninitial_val, row['old_val'])
              elsif row.has_key?('error') && @handler.respond_to?(:on_change_error)
                handle(:on_change_error, row['error'])
              elsif row.has_key?('state') && @handler.respond_to?(:on_state)
                handle(:on_state, row['state'])
              else
                handle(:on_unhandled_change, row)
              end
            else
              handle(:on_stream_val, row)
            end
          end
          if res['t'] == Response::ResponseType::SUCCESS_SEQUENCE ||
              @conn.closed?
            handle_close
          end
        end
      when Response::ResponseType::SUCCESS_ATOM
        guarded_async_run do
          return if @closed
          handle_open
          val = Shim.response_to_native(res, @msg, @all_opts)
          if val.is_a?(Array)
            handle(:on_array, val)
          else
            handle(:on_atom, val)
          end
          handle_close
        end
      when Response::ResponseType::WAIT_COMPLETE
        guarded_async_run do
          return if @closed
          handle_open
          handle(:on_wait_complete)
          handle_close
        end
      else
        exc = nil
        begin
          exc = Shim.response_to_native(res, @msg, @all_opts)
        rescue Exception => e
          exc = e
        end
        guarded_async_run do
          return if @closed
          handle_open
          handle(:on_error, e)
          handle_close
        end
      end
    else
      guarded_async_run {
        return if @closed
        handle_close
      }
    end
  rescue Exception => e
    guarded_async_run do
      return if @closed
      handle_open
      handle(:on_error, e)
      handle_close
    end
  end
end
guarded_async_run(&b) click to toggle source

Override this method with an async dispatch, making sure that when the block is run @closed == false

# File lib/nobrainer_streams/rethinkdb_monkeypatch.rb, line 44
def guarded_async_run(&b)
  raise "Must override QueryHandle#guarded_async_run"
end