class Mongo::Server::PushMonitor
A monitor utilizing server-pushed hello requests.
When a Monitor
handshakes with a 4.4+ server, it creates an instance of PushMonitor
. PushMonitor
subsequently executes server-pushed hello (i.e. awaited & exhausted hello) to receive topology changes from the server as quickly as possible. The Monitor
still monitors the server for round-trip time calculations and to perform immediate checks as requested by the application.
@api private
Attributes
monitor[R]
@return [ Monitor
] The monitor to which this push monitor is attached.
monitoring[R]
@return [ Monitoring
] monitoring The monitoring.
options[R]
@return [ Hash ] Push monitor options.
topology_version[R]
@return [ TopologyVersion
] Most recently received topology version.
Public Class Methods
new(monitor, topology_version, monitoring, **options)
click to toggle source
# File lib/mongo/server/push_monitor.rb, line 35 def initialize(monitor, topology_version, monitoring, **options) if topology_version.nil? raise ArgumentError, 'Topology version must be provided but it was nil' end unless options[:app_metadata] raise ArgumentError, 'App metadata is required' end unless options[:check_document] raise ArgumentError, 'Check document is required' end @app_metadata = options[:app_metadata] @check_document = options[:check_document] @monitor = monitor @topology_version = topology_version @monitoring = monitoring @options = options @lock = Mutex.new end
Public Instance Methods
check()
click to toggle source
# File lib/mongo/server/push_monitor.rb, line 137 def check @lock.synchronize do if @connection && @connection.pid != Process.pid log_warn("Detected PID change - Mongo client should have been reconnected (old pid #{@connection.pid}, new pid #{Process.pid}") @connection.disconnect! @connection = nil end end @lock.synchronize do unless @connection @server_pushing = false connection = PushMonitor::Connection.new(server.address, options) connection.connect! @connection = connection end end resp_msg = begin unless @server_pushing write_check_command end read_response rescue Mongo::Error @lock.synchronize do @connection.disconnect! @connection = nil end raise end @server_pushing = resp_msg.flags.include?(:more_to_come) result = Operation::Result.new(resp_msg) result.validate! result.documents.first end
do_work()
click to toggle source
# File lib/mongo/server/push_monitor.rb, line 94 def do_work @lock.synchronize do return if @stop_requested end result = monitoring.publish_heartbeat(server, awaited: true) do check end new_description = monitor.run_sdam_flow(result, awaited: true) # When hello fails due to a fail point, the response does not # include topology version. In this case we need to keep our existing # topology version so that we can resume monitoring. # The spec does not appear to directly address this case but # https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-monitoring.rst#streaming-ismaster # says that topologyVersion should only be updated from successful # hello responses. if new_description.topology_version @topology_version = new_description.topology_version end rescue IOError, SocketError, SystemCallError, Mongo::Error => exc stop_requested = @lock.synchronize { @stop_requested } if stop_requested # Ignore the exception, see RUBY-2771. return end msg = "Error running awaited hello on #{server.address}" Utils.warn_bg_exception(msg, exc, logger: options[:logger], log_prefix: options[:log_prefix], bg_error_backtrace: options[:bg_error_backtrace], ) # If a request failed on a connection, stop push monitoring. # In case the server is dead we don't want to have two connections # trying to connect unsuccessfully at the same time. stop! # Request an immediate check on the monitor to get reinstated as # soon as possible in case the server is actually alive. server.scan_semaphore.signal end
read_response()
click to toggle source
# File lib/mongo/server/push_monitor.rb, line 184 def read_response if timeout = options[:connect_timeout] if timeout < 0 raise Mongo::SocketTimeoutError, "Requested to read with a negative timeout: #{}" elsif timeout > 0 timeout += options[:heartbeat_frequency] || Monitor::DEFAULT_HEARTBEAT_INTERVAL end end # We set the timeout twice: once passed into read_socket which applies # to each individual read operation, and again around the entire read. Timeout.timeout(timeout, Error::SocketTimeoutError, "Failed to read an awaited hello response in #{timeout} seconds") do @lock.synchronize { @connection }.read_response(socket_timeout: timeout) end end
start!()
click to toggle source
Calls superclass method
Mongo::BackgroundThread#start!
# File lib/mongo/server/push_monitor.rb, line 69 def start! @lock.synchronize do super end end
stop!()
click to toggle source
Calls superclass method
Mongo::BackgroundThread#stop!
# File lib/mongo/server/push_monitor.rb, line 75 def stop! @lock.synchronize do @stop_requested = true if @connection # Interrupt any in-progress exhausted hello reads by # disconnecting the connection. @connection.send(:socket).close rescue nil end end super.tap do @lock.synchronize do if @connection @connection.disconnect! @connection = nil end end end end
to_s()
click to toggle source
# File lib/mongo/server/push_monitor.rb, line 199 def to_s "#<#{self.class.name}:#{object_id} #{server.address}>" end
write_check_command()
click to toggle source
# File lib/mongo/server/push_monitor.rb, line 173 def write_check_command document = @check_document.merge( topologyVersion: topology_version.to_doc, maxAwaitTimeMS: monitor.heartbeat_interval * 1000, ) command = Protocol::Msg.new( [:exhaust_allowed], {}, document.merge({'$db' => Database::ADMIN}) ) @lock.synchronize { @connection }.write_bytes(command.serialize.to_s) end