class Mongo::Cluster::CursorReaper
A manager that sends kill cursors operations at regular intervals to close cursors that have been garbage collected without being exhausted.
@api private
@since 2.3.0
Constants
- FREQUENCY
The default time interval for the cursor reaper to send pending kill cursors operations.
@since 2.3.0
Attributes
Public Class Methods
Create a cursor reaper.
@param [ Cluster
] cluster The cluster.
@api private
# File lib/mongo/cluster/reapers/cursor_reaper.rb, line 42 def initialize(cluster) @cluster = cluster @to_kill = {} @active_cursor_ids = Set.new @mutex = Mutex.new @kill_spec_queue = Queue.new end
Public Instance Methods
Execute all pending kill cursors operations.
@example Execute pending kill cursors operations.
cursor_reaper.kill_cursors
@api private
@since 2.3.0
# File lib/mongo/cluster/reapers/cursor_reaper.rb, line 133 def kill_cursors # TODO optimize this to batch kill cursor operations for the same # server/database/collection instead of killing each cursor # individually. loop do server_address = nil kill_spec = @mutex.synchronize do read_scheduled_kill_specs # Find a server that has any cursors scheduled for destruction. server_address, specs = @to_kill.detect { |_, specs| specs.any? } if specs.nil? # All servers have empty specs, nothing to do. return end # Note that this mutates the spec in the queue. # If the kill cursor operation fails, we don't attempt to # kill that cursor again. spec = specs.take(1).tap do |arr| specs.subtract(arr) end.first unless @active_cursor_ids.include?(spec.cursor_id) # The cursor was already killed, typically because it has # been iterated to completion. Remove the kill spec from # our records without doing any more work. spec = nil end spec end # If there was a spec to kill but its cursor was already killed, # look for another spec. next unless kill_spec # We could also pass kill_spec directly into the KillCursors # operation, though this would make that operation have a # different API from all of the other ones which accept hashes. spec = { cursor_ids: [kill_spec.cursor_id], coll_name: kill_spec.coll_name, db_name: kill_spec.db_name, } op = Operation::KillCursors.new(spec) server = cluster.servers.detect do |server| server.address == server_address end unless server # TODO We currently don't have a server for the address that the # cursor is associated with. We should leave the cursor in the # queue to be killed at a later time (when the server comes back). next end options = { server_api: server.options[:server_api], connection_global_id: kill_spec.connection_global_id, } op.execute(server, context: Operation::Context.new(options: options)) if session = kill_spec.session if session.implicit? session.end_session end end end end
Read and decode scheduled kill cursors operations.
This method mutates instance variables without locking, so is is not thread safe. Generally, it should not be called itself, this is a helper for `kill_cursor` method.
@api private
# File lib/mongo/cluster/reapers/cursor_reaper.rb, line 114 def read_scheduled_kill_specs while kill_spec = @kill_spec_queue.pop(true) if @active_cursor_ids.include?(kill_spec.cursor_id) @to_kill[kill_spec.server_address] ||= Set.new @to_kill[kill_spec.server_address] << kill_spec end end rescue ThreadError # Empty queue, nothing to do. end
Register a cursor id as active.
@example Register a cursor as active.
cursor_reaper.register_cursor(id)
@param [ Integer ] id The id of the cursor to register as active.
@api private
@since 2.3.0
# File lib/mongo/cluster/reapers/cursor_reaper.rb, line 71 def register_cursor(id) if id.nil? raise ArgumentError, 'register_cursor called with nil cursor_id' end if id == 0 raise ArgumentError, 'register_cursor called with cursor_id=0' end @mutex.synchronize do @active_cursor_ids << id end end
Schedule a kill cursors operation to be eventually executed.
@param [ Cursor::KillSpec
] kill_spec The kill specification.
@api private
# File lib/mongo/cluster/reapers/cursor_reaper.rb, line 57 def schedule_kill_cursor(kill_spec) @kill_spec_queue << kill_spec end
Unregister a cursor id, indicating that it's no longer active.
@example Unregister a cursor.
cursor_reaper.unregister_cursor(id)
@param [ Integer ] id The id of the cursor to unregister.
@api private
@since 2.3.0
# File lib/mongo/cluster/reapers/cursor_reaper.rb, line 94 def unregister_cursor(id) if id.nil? raise ArgumentError, 'unregister_cursor called with nil cursor_id' end if id == 0 raise ArgumentError, 'unregister_cursor called with cursor_id=0' end @mutex.synchronize do @active_cursor_ids.delete(id) end end