class Mongo::Cluster::CursorReaper

A manager that sends kill cursors operations at regular intervals to close cursors that have been garbage collected without being exhausted.

@api private

@since 2.3.0

Constants

FREQUENCY

The default time interval for the cursor reaper to send pending kill cursors operations.

@since 2.3.0

Attributes

cluster[R]

Public Class Methods

new(cluster) click to toggle source

Create a cursor reaper.

@param [ Cluster ] cluster The cluster.

@api private

# File lib/mongo/cluster/reapers/cursor_reaper.rb, line 41
# Build a cursor reaper bound to the given cluster.
#
# All bookkeeping starts out empty: no cursor ids are registered as active
# and no kill specs are scheduled or grouped by server address.
#
# @param [ Cluster ] cluster The cluster.
#
# @api private
def initialize(cluster)
  @cluster = cluster

  # Ids of cursors registered as active.
  @active_cursor_ids = Set.new
  # Kill specs waiting to be executed, keyed by server address.
  @to_kill = {}

  # Queue receiving newly scheduled kill specs.
  @kill_spec_queue = Queue.new
  # Guards @active_cursor_ids and @to_kill.
  @mutex = Mutex.new
end

Public Instance Methods

execute()
Alias for: kill_cursors
flush()
Alias for: kill_cursors
kill_cursors() click to toggle source

Execute all pending kill cursors operations.

@example Execute pending kill cursors operations.

cursor_reaper.kill_cursors

@api private

@since 2.3.0

# File lib/mongo/cluster/reapers/cursor_reaper.rb, line 132
# Execute all pending kill cursors operations.
#
# Keeps looping until no server address in @to_kill has a scheduled kill
# spec left. Each pass selects a single kill spec while holding the mutex
# and then sends the kill cursors operation after releasing it.
#
# @example Execute pending kill cursors operations.
#   cursor_reaper.kill_cursors
#
# @return [ nil ] Once there is nothing left to kill.
#
# @api private
#
# @since 2.3.0
def kill_cursors
  # TODO optimize this to batch kill cursor operations for the same
  # server/database/collection instead of killing each cursor
  # individually.
  loop do
    # Declared before the synchronized block so that the values assigned
    # inside of it are still visible once the mutex is released.
    address = nil
    pending = nil

    @mutex.synchronize do
      read_scheduled_kill_specs

      # Pick the first server address that still has scheduled specs.
      address, bucket = @to_kill.find { |_, queued| queued.any? }

      # Every bucket is empty (or there are none): nothing left to do.
      return if bucket.nil?

      # The spec is removed from the bucket before the operation is
      # attempted, so a failed kill cursors operation is not retried.
      candidate = bucket.first
      bucket.delete(candidate)

      # A cursor id that is no longer registered as active was already
      # closed (typically because it was iterated to completion), so the
      # spec is simply dropped and pending stays nil.
      pending = candidate if @active_cursor_ids.include?(candidate.cursor_id)
    end

    # The selected spec referred to an inactive cursor; try the next one.
    next unless pending

    # The operation takes a hash like all of the other operations do,
    # rather than the kill spec object itself.
    kill_args = {
      cursor_ids: [pending.cursor_id],
      coll_name: pending.coll_name,
      db_name: pending.db_name,
    }
    op = Operation::KillCursors.new(kill_args)

    target = cluster.servers.find do |known_server|
      known_server.address == address
    end

    # TODO We currently don't have a server for the address that the
    # cursor is associated with. We should leave the cursor in the
    # queue to be killed at a later time (when the server comes back).
    next unless target

    context_options = {
      server_api: target.options[:server_api],
      connection_global_id: pending.connection_global_id,
    }
    op.execute(target, context: Operation::Context.new(options: context_options))

    # Only implicit sessions are ended here.
    session = pending.session
    session.end_session if session && session.implicit?
  end
end
Also aliased as: execute, flush
read_scheduled_kill_specs() click to toggle source

Read and decode scheduled kill cursors operations.

This method mutates instance variables without locking, so it is not thread safe. Generally, it should not be called directly; it is a helper for the `kill_cursors` method.

@api private

# File lib/mongo/cluster/reapers/cursor_reaper.rb, line 113
# Read and decode scheduled kill cursors operations.
#
# Drains the kill spec queue without blocking and files each spec into
# @to_kill under its server address. Specs whose cursor id is not
# registered as active are discarded.
#
# This method performs no locking of its own while mutating instance
# variables.
#
# @return [ nil ]
#
# @api private
def read_scheduled_kill_specs
  # A non-blocking pop raises ThreadError once the queue is empty, which
  # is what terminates the loop.
  while (scheduled = @kill_spec_queue.pop(true))
    next unless @active_cursor_ids.include?(scheduled.cursor_id)

    address = scheduled.server_address
    (@to_kill[address] ||= Set.new) << scheduled
  end
rescue ThreadError
  # Empty queue, nothing to do.
end
register_cursor(id) click to toggle source

Register a cursor id as active.

@example Register a cursor as active.

cursor_reaper.register_cursor(id)

@param [ Integer ] id The id of the cursor to register as active.

@api private

@since 2.3.0

# File lib/mongo/cluster/reapers/cursor_reaper.rb, line 70
# Mark a cursor id as active.
#
# @example Register a cursor as active.
#   cursor_reaper.register_cursor(id)
#
# @param [ Integer ] id The id of the cursor to register as active.
#
# @raise [ ArgumentError ] If the id is nil or 0.
#
# @api private
#
# @since 2.3.0
def register_cursor(id)
  raise ArgumentError, 'register_cursor called with nil cursor_id' if id.nil?
  raise ArgumentError, 'register_cursor called with cursor_id=0' if id == 0

  # The set of active ids is only touched while holding the mutex.
  @mutex.synchronize { @active_cursor_ids.add(id) }
end
schedule_kill_cursor(kill_spec) click to toggle source

Schedule a kill cursors operation to be eventually executed.

@param [ Cursor::KillSpec ] kill_spec The kill specification.

@api private

# File lib/mongo/cluster/reapers/cursor_reaper.rb, line 56
# Enqueue a kill cursors operation for later execution.
#
# The spec is only appended to the kill spec queue here; nothing is sent
# to a server by this method.
#
# @param [ Cursor::KillSpec ] kill_spec The kill specification.
#
# @api private
def schedule_kill_cursor(kill_spec)
  @kill_spec_queue.push(kill_spec)
end
unregister_cursor(id) click to toggle source

Unregister a cursor id, indicating that it's no longer active.

@example Unregister a cursor.

cursor_reaper.unregister_cursor(id)

@param [ Integer ] id The id of the cursor to unregister.

@api private

@since 2.3.0

# File lib/mongo/cluster/reapers/cursor_reaper.rb, line 93
# Remove a cursor id from the set of active cursors.
#
# @example Unregister a cursor.
#   cursor_reaper.unregister_cursor(id)
#
# @param [ Integer ] id The id of the cursor to unregister.
#
# @raise [ ArgumentError ] If the id is nil or 0.
#
# @api private
#
# @since 2.3.0
def unregister_cursor(id)
  raise ArgumentError, 'unregister_cursor called with nil cursor_id' if id.nil?
  raise ArgumentError, 'unregister_cursor called with cursor_id=0' if id == 0

  # The set of active ids is only touched while holding the mutex.
  @mutex.synchronize { @active_cursor_ids.delete(id) }
end