class Mongo::Protocol::Msg

MongoDB Wire protocol Msg message (OP_MSG), a bi-directional wire protocol opcode.

OP_MSG is only available in MongoDB 3.6 (maxWireVersion >= 6) and later.

@api private

@since 2.5.0

Constants

DATABASE_IDENTIFIER

The identifier for the database name to execute the command on.

@since 2.5.0

FLAGS

Available flags for a OP_MSG message.

INTERNAL_KEYS

Keys that the driver adds to commands. These are going to be moved to the end of the hash for better logging.

@api private

KNOWN_FLAGS
OP_CODE

The operation code required to specify a OP_MSG message. @return [ Fixnum ] the operation code.

@since 2.5.0

Public Class Methods

new(flags, options, main_document, *sequences) click to toggle source

Creates a new OP_MSG protocol message

@example Create a OP_MSG wire protocol message

Msg.new([:more_to_come], {}, { hello: 1 },
        { type: 1, payload: { identifier: 'documents', sequence: [..] } })

@param [ Array<Symbol> ] flags The flag bits. Currently supported

values are :more_to_come and :checksum_present.

@param [ Hash ] options The options. @param [ BSON::Document, Hash ] main_document The document that will

become the payload type 0 section. Can contain global args as they
are defined in the OP_MSG specification.

@param [ Protocol::Msg::Section1 ] sequences Zero or more payload type 1

sections.

@option options [ true, false ] validating_keys Whether keys should be

validated for being valid document keys (i.e. not begin with $ and
not contain dots).

@api private

@since 2.5.0

Calls superclass method
# File lib/mongo/protocol/msg.rb, line 65
def initialize(flags, options, main_document, *sequences)
  if flags
    flags.each do |flag|
      unless KNOWN_FLAGS.key?(flag)
        raise ArgumentError, "Unknown flag: #{flag.inspect}"
      end
    end
  end
  @flags = flags || []
  @options = options
  unless main_document.is_a?(Hash)
    raise ArgumentError, "Main document must be a Hash, given: #{main_document.class}"
  end
  @main_document = main_document
  sequences.each_with_index do |section, index|
    unless section.is_a?(Section1)
      raise ArgumentError, "All sequences must be Section1 instances, got: #{section} at index #{index}"
    end
  end
  @sequences = sequences
  @sections = [
    {type: 0, payload: @main_document}
  ] + @sequences.map do |section|
    {type: 1, payload: {
      identifier: section.identifier,
      sequence: section.documents,
    }}
  end
  @request_id = nil
  super
end

Public Instance Methods

bulk_write?() click to toggle source

Whether this message represents a bulk write. A bulk write is an insert, update, or delete operation that encompasses multiple operations of the same type.

@return [ Boolean ] Whether this message represents a bulk write.

@note This method was written to support client-side encryption

functionality. It is not recommended that this method be used in
service of any other feature or behavior.

@api private

# File lib/mongo/protocol/msg.rb, line 269
def bulk_write?
  inserts = @main_document['documents']
  updates = @main_document['updates']
  deletes = @main_document['deletes']

  num_inserts = inserts && inserts.length || 0
  num_updates = updates && updates.length || 0
  num_deletes = deletes && deletes.length || 0

  num_inserts > 1  || num_updates > 1 || num_deletes > 1
end
documents() click to toggle source
# File lib/mongo/protocol/msg.rb, line 193
def documents
  [@main_document]
end
fix_after_deserialization() click to toggle source

Reverse-populates the instance variables after deserialization sets the @sections instance variable to the list of documents.

TODO fix deserialization so that this method is not needed.

@api private

# File lib/mongo/protocol/msg.rb, line 181
def fix_after_deserialization
  if @sections.nil?
    raise NotImplementedError, "After deserializations @sections should have been initialized"
  end
  if @sections.length != 1
    raise NotImplementedError, "Deserialization must have produced exactly one section, but it produced #{sections.length} sections"
  end
  @main_document = @sections.first
  @sequences = []
  @sections = [{type: 0, payload: @main_document}]
end
maybe_add_server_api(server_api) click to toggle source
# File lib/mongo/protocol/msg.rb, line 281
def maybe_add_server_api(server_api)
  conflicts = {}
  %i(apiVersion apiStrict apiDeprecationErrors).each do |key|
    if @main_document.key?(key)
      conflicts[key] = @main_document[key]
    end
    if @main_document.key?(key.to_s)
      conflicts[key] = @main_document[key.to_s]
    end
  end
  unless conflicts.empty?
    raise Error::ServerApiConflict, "The Client is configured with :server_api option but the operation provided the following conflicting parameters: #{conflicts.inspect}"
  end

  main_document = @main_document.merge(
    Utils.transform_server_api(server_api)
  )
  Msg.new(@flags, @options, main_document, *@sequences)
end
maybe_compress(compressor, zlib_compression_level = nil) click to toggle source

Compress the message, if the command being sent permits compression. Otherwise returns self.

@param [ String, Symbol ] compressor The compressor to use. @param [ Integer ] zlib_compression_level The zlib compression level to use.

@return [ Message ] A Protocol::Compressed message or self,

depending on whether this message can be compressed.

@since 2.5.0 @api private

# File lib/mongo/protocol/msg.rb, line 171
def maybe_compress(compressor, zlib_compression_level = nil)
  compress_if_possible(command.keys.first, compressor, zlib_compression_level)
end
maybe_decrypt(context) click to toggle source

Possibly decrypt this message with libmongocrypt. Message will only be decrypted if the specified client exists, that client has been given auto-encryption options, and this message is eligible for decryption. A message is eligible for decryption if it represents one of the command types allow-listed by libmongocrypt and it contains data that is required to be encrypted by a local or remote json schema.

@param [ Mongo::Operation::Context ] context The operation context.

@return [ Mongo::Protocol::Msg ] The decrypted message, or the original

message if decryption was not possible or necessary.
# File lib/mongo/protocol/msg.rb, line 248
def maybe_decrypt(context)
  if context.decrypt?
    cmd = merge_sections
    enc_cmd = context.encrypter.decrypt(cmd)
    Msg.new(@flags, @options, enc_cmd)
  else
    self
  end
end
maybe_encrypt(connection, context) click to toggle source

Possibly encrypt this message with libmongocrypt. Message will only be encrypted if the specified client exists, that client has been given auto-encryption options, the client has not been instructed to bypass auto-encryption, and mongocryptd determines that this message is eligible for encryption. A message is eligible for encryption if it represents one of the command types allow-listed by libmongocrypt and it contains data that is required to be encrypted by a local or remote json schema.

@param [ Mongo::Server::Connection ] connection The connection on which

the operation is performed.

@param [ Mongo::Operation::Context ] context The operation context.

@return [ Mongo::Protocol::Msg ] The encrypted message, or the original

message if encryption was not possible or necessary.
# File lib/mongo/protocol/msg.rb, line 211
def maybe_encrypt(connection, context)
  # TODO verify compression happens later, i.e. when this method runs
  # the message is not compressed.
  if context.encrypt?
    if connection.description.max_wire_version < 8
      raise Error::CryptError.new(
        "Cannot perform encryption against a MongoDB server older than " +
        "4.2 (wire version less than 8). Currently connected to server " +
        "with max wire version #{connection.description.max_wire_version}} " +
        "(Auto-encryption requires a minimum MongoDB version of 4.2)"
      )
    end

    db_name = @main_document[DATABASE_IDENTIFIER]
    cmd = merge_sections
    enc_cmd = context.encrypter.encrypt(db_name, cmd)
    if cmd.key?('$db') && !enc_cmd.key?('$db')
      enc_cmd['$db'] = cmd['$db']
    end

    Msg.new(@flags, @options, enc_cmd)
  else
    self
  end
end
payload() click to toggle source

Return the event payload for monitoring.

@example Return the event payload.

message.payload

@return [ BSON::Document ] The event payload.

@since 2.5.0

# File lib/mongo/protocol/msg.rb, line 117
def payload
  # Reorder keys in main_document for better logging - see
  # https://jira.mongodb.org/browse/RUBY-1591.
  # Note that even without the reordering, the payload is not an exact
  # match to what is sent over the wire because the command as used in
  # the published event combines keys from multiple sections of the
  # payload sent over the wire.
  ordered_command = {}
  skipped_command = {}
  command.each do |k, v|
    if INTERNAL_KEYS.member?(k.to_s)
      skipped_command[k] = v
    else
      ordered_command[k] = v
    end
  end
  ordered_command.update(skipped_command)

  BSON::Document.new(
    command_name: ordered_command.keys.first.to_s,
    database_name: @main_document[DATABASE_IDENTIFIER],
    command: ordered_command,
    request_id: request_id,
    reply: @main_document,
  )
end
replyable?() click to toggle source

Whether the message expects a reply from the database.

@example Does the message require a reply?

message.replyable?

@return [ true, false ] If the message expects a reply.

@since 2.5.0

# File lib/mongo/protocol/msg.rb, line 105
def replyable?
  @replyable ||= !flags.include?(:more_to_come)
end
serialize(buffer = BSON::ByteBuffer.new, max_bson_size = nil, bson_overhead = nil) click to toggle source

Serializes message into bytes that can be sent on the wire.

@param [ BSON::ByteBuffer ] buffer where the message should be inserted. @param [ Integer ] max_bson_size The maximum bson object size.

@return [ BSON::ByteBuffer ] buffer containing the serialized message.

@since 2.5.0

Calls superclass method Mongo::Protocol::Message#serialize
# File lib/mongo/protocol/msg.rb, line 152
def serialize(buffer = BSON::ByteBuffer.new, max_bson_size = nil, bson_overhead = nil)
  validate_document_size!(max_bson_size)

  super
  add_check_sum(buffer)
  buffer
end

Private Instance Methods

add_check_sum(buffer) click to toggle source
# File lib/mongo/protocol/msg.rb, line 333
def add_check_sum(buffer)
  if flags.include?(:checksum_present)
    #buffer.put_int32(checksum)
  end
end
command() click to toggle source
# File lib/mongo/protocol/msg.rb, line 320
def command
  @command ||= if @main_document
    @main_document.dup.tap do |cmd|
      @sequences.each do |section|
        cmd[section.identifier] ||= []
        cmd[section.identifier] += section.documents
      end
    end
  else
    documents.first
  end
end
validate_document_size!(max_bson_size) click to toggle source

Validate that the documents in this message are all smaller than the maxBsonObjectSize. If not, raise an exception.

# File lib/mongo/protocol/msg.rb, line 305
def validate_document_size!(max_bson_size)
  max_bson_size ||= Mongo::Server::ConnectionBase::DEFAULT_MAX_BSON_OBJECT_SIZE

  contains_too_large_document = @sections.any? do |section|
    section[:type] == 1 &&
      section[:payload][:sequence].any? do |document|
        document.to_bson.length > max_bson_size
      end
  end

  if contains_too_large_document
    raise Error::MaxBSONSize.new('The document exceeds maximum allowed BSON object size after serialization')
  end
end