class Mongo::Server::Monitor::Connection

This class models the monitor connections and their behavior.

@since 2.0.0 @api private

Attributes

address[R]

@return [ Mongo::Address ] address The address to connect to.

options[R]

@return [ Hash ] options The passed in options.

server_connection_id[R]

@return [ Integer ] server_connection_id The server connection id.

Public Class Methods

new(address, options = {}) click to toggle source

Creates a new connection object to the specified target address with the specified options.

The constructor does not perform any I/O (and thus does not create sockets nor handshakes); call connect! method on the connection object to create the network connection.

@note Monitoring connections do not authenticate.

@param [ Mongo::Address ] address The address the connection is for. @param [ Hash ] options The connection options.

@option options [ Mongo::Server::Monitor::AppMetadata ] :app_metadata

Metadata to use for handshake. If missing or nil, handshake will
not be performed. Although a Mongo::Server::AppMetadata instance
will also work, monitoring connections are meant to use
Mongo::Server::Monitor::AppMetadata instances in order to omit
performing SCRAM negotiation with the server, as monitoring
sockets do not authenticate.

@option options [ Array<String> ] :compressors A list of potential

compressors to use, in order of preference. The driver chooses the
first compressor that is also supported by the server. Currently the
driver only supports 'zstd', 'snappy' and 'zlib'.

@option options [ Float ] :connect_timeout The timeout, in seconds,

to use for network operations. This timeout is used for all
socket operations rather than connect calls only, contrary to
what the name implies,

@since 2.0.0

# File lib/mongo/server/monitor/connection.rb, line 58
def initialize(address, options = {})
  @address = address
  @options = options.dup.freeze
  unless @app_metadata = options[:app_metadata]
    raise ArgumentError, 'App metadata is required'
  end
  @socket = nil
  @pid = Process.pid
  @compressor = nil
  @hello_ok = false
end

Public Instance Methods

check_document() click to toggle source

Build a document that should be used for connection check.

@return [BSON::Document] Document that should be sent to a server

for connection check.

@api private

# File lib/mongo/server/monitor/connection.rb, line 229
def check_document
  server_api = @app_metadata.server_api || options[:server_api]
  doc = if hello_ok? || server_api
    _doc = HELLO_DOC
    if server_api
      _doc = _doc.merge(Utils.transform_server_api(server_api))
    end
    _doc
  else
    LEGACY_HELLO_DOC
  end
  # compressors must be set to maintain correct compression status
  # in the server description. See RUBY-2427
  if compressors = options[:compressors]
    doc = doc.merge(compression: compressors)
  end
  doc
end
connect!() click to toggle source

Establishes a network connection to the target address.

If the connection is already established, this method does nothing.

@example Connect to the host.

connection.connect!

@note This method mutates the connection class by setting a socket if

one previously did not exist.

@return [ true ] If the connection succeeded.

@since 2.0.0

# File lib/mongo/server/monitor/connection.rb, line 158
def connect!
  if @socket
    raise ArgumentError, 'Monitoring connection already connected'
  end

  @socket = add_server_diagnostics do
    address.socket(socket_timeout, ssl_options.merge(
      connection_address: address, monitor: true))
  end
  true
end
disconnect!(options = nil) click to toggle source

Disconnect the connection.

@example Disconnect from the host.

connection.disconnect!

@note This method mutates the connection by setting the socket to nil

if the closing succeeded.

@note This method accepts an options argument for compatibility with

Server::Connections. However, all options are ignored.

@return [ true ] If the disconnect succeeded.

@since 2.0.0

# File lib/mongo/server/monitor/connection.rb, line 184
def disconnect!(options = nil)
  if socket
    socket.close rescue nil
    @socket = nil
  end
  true
end
dispatch(message) click to toggle source

Sends a message and returns the result.

@param [ Protocol::Message ] message The message to send.

@return [ Protocol::Message ] The result.

# File lib/mongo/server/monitor/connection.rb, line 97
def dispatch(message)
  dispatch_bytes(message.serialize.to_s)
end
dispatch_bytes(bytes, **opts) click to toggle source

Sends a preserialized message and returns the result.

@param [ String ] bytes The serialized message to send.

@option opts [ Numeric ] :read_socket_timeout The timeout to use for

each read operation.

@return [ Protocol::Message ] The result.

# File lib/mongo/server/monitor/connection.rb, line 109
def dispatch_bytes(bytes, **opts)
  write_bytes(bytes)
  read_response(
    socket_timeout: opts[:read_socket_timeout],
  )
end
handshake!() click to toggle source

Send handshake command to connected host and validate the response.

@return [BSON::Document] Handshake response from server

@raise [Mongo::Error] If handshake failed.

# File lib/mongo/server/monitor/connection.rb, line 197
def handshake!
  command = handshake_command(
    handshake_document(
      @app_metadata,
      server_api: options[:server_api]
    )
  )
  payload = command.serialize.to_s
  message = dispatch_bytes(payload)
  result = Operation::Result.new(message)
  result.validate!
  reply = result.documents.first
  set_compressor!(reply)
  set_hello_ok!(reply)
  @server_connection_id = reply['connectionId']
  reply
rescue => exc
  msg = "Failed to handshake with #{address}"
  Utils.warn_bg_exception(msg, exc,
    logger: options[:logger],
    log_prefix: options[:log_prefix],
    bg_error_backtrace: options[:bg_error_backtrace],
  )
  raise
end
read_response(**opts) click to toggle source

@option opts [ Numeric ] :socket_timeout The timeout to use for

each read operation.
# File lib/mongo/server/monitor/connection.rb, line 130
def read_response(**opts)
  unless connected?
    raise ArgumentError, "Trying to read on an unconnected connection #{self}"
  end

  add_server_connection_id do
    add_server_diagnostics do
      Protocol::Message.deserialize(socket,
        Protocol::Message::MAX_MESSAGE_SIZE,
        nil,
        **opts)
    end
  end
end
socket_timeout() click to toggle source

Returns the monitoring socket timeout.

Note that monitoring connections use the connect timeout value as the socket timeout value. See the Server Discovery and Monitoring specification for details.

@return [ Float ] The socket timeout in seconds.

@since 2.4.3

# File lib/mongo/server/monitor/connection.rb, line 85
def socket_timeout
  options[:connect_timeout] || Server::CONNECT_TIMEOUT
end
write_bytes(bytes) click to toggle source
# File lib/mongo/server/monitor/connection.rb, line 116
def write_bytes(bytes)
  unless connected?
    raise ArgumentError, "Trying to dispatch on an unconnected connection #{self}"
  end

  add_server_connection_id do
    add_server_diagnostics do
      socket.write(bytes)
    end
  end
end

Private Instance Methods

add_server_connection_id() { || ... } click to toggle source
# File lib/mongo/server/monitor/connection.rb, line 250
def add_server_connection_id
  yield
rescue Mongo::Error => e
  if server_connection_id
    note = "sconn:#{server_connection_id}"
    e.add_note(note)
  end
  raise e
end
hello_ok?() click to toggle source
# File lib/mongo/server/monitor/connection.rb, line 269
def hello_ok?
  @hello_ok
end
set_hello_ok!(reply) click to toggle source

Update @hello_ok flag according to server reply to legacy hello command. The flag will be set to true if connected server supports hello command, otherwise the flag will be set to false.

@param [ BSON::Document ] reply Server reply to legacy hello command.

# File lib/mongo/server/monitor/connection.rb, line 265
def set_hello_ok!(reply)
  @hello_ok = !!reply[:helloOk]
end