class Mongo::Protocol::Message
A base class providing functionality required by all messages in the MongoDB wire protocol. It provides a minimal DSL for defining typed fields to enable serialization and deserialization over the wire.
@example
class WireProtocolMessage < Message private def op_code 1234 end FLAGS = [:first_bit, :bit_two] # payload field :flags, BitVector.new(FLAGS) field :namespace, CString field :document, Document field :documents, Document, true end
@abstract @api semiprivate
Constants
- BATCH_SIZE
The batch size constant.
@since 2.2.0
- COLLECTION
The collection constant.
@since 2.2.0
- LIMIT
The limit constant.
@since 2.2.0
- MAX_MESSAGE_SIZE
Default max message size of 48MB.
@since 2.2.1
- ORDERED
The ordered constant.
@since 2.2.0
- Q
The q constant.
@since 2.2.0
Attributes
Returns the request id for the message
@return [Fixnum] The request id for this message
Public Class Methods
Deserializes messages from an IO stream.
This method returns decompressed messages (i.e. if the message on the wire was OP_COMPRESSED, this method would typically return the OP_MSG message that is the result of decompression).
@param [ Integer ] max_message_size The max message size. @param [ IO ] io Stream containing a message @param [ Hash ] options
@option options [ Boolean ] :deserialize_as_bson Whether to deserialize
this message using BSON types instead of native Ruby types wherever possible.
@option options [ Numeric ] :socket_timeout The timeout to use for
each read operation.
@return [ Message
] Instance of a Message
class
@api private
# File lib/mongo/protocol/message.rb, line 238 def self.deserialize(io, max_message_size = MAX_MESSAGE_SIZE, expected_response_to = nil, options = {} ) # io is usually a Mongo::Socket instance, which supports the # timeout option. For compatibility with whoever might call this # method with some other IO-like object, pass options only when they # are not empty. read_options = {} if timeout = options[:socket_timeout] read_options[:timeout] = timeout end if read_options.empty? chunk = io.read(16) else chunk = io.read(16, **read_options) end buf = BSON::ByteBuffer.new(chunk) length, _request_id, response_to, _op_code = deserialize_header(buf) # Protection from potential DOS man-in-the-middle attacks. See # DRIVERS-276. if length > (max_message_size || MAX_MESSAGE_SIZE) raise Error::MaxMessageSize.new(max_message_size) end # Protection against returning the response to a previous request. See # RUBY-1117 if expected_response_to && response_to != expected_response_to raise Error::UnexpectedResponse.new(expected_response_to, response_to) end if read_options.empty? chunk = io.read(length - 16) else chunk = io.read(length - 16, **read_options) end buf = BSON::ByteBuffer.new(chunk) message = Registry.get(_op_code).allocate message.send(:fields).each do |field| if field[:multi] deserialize_array(message, buf, field, options) else deserialize_field(message, buf, field, options) end end if message.is_a?(Msg) message.fix_after_deserialization end message.maybe_inflate end
Private Class Methods
Deserializes an array of fields in a message
The number of items in the array must be described by a previously deserialized field specified in the class by the field dsl under the key :multi
@param message [Message] Message
to contain the deserialized array. @param io [IO] Stream containing the array to deserialize. @param field [Hash] Hash representing a field. @param options [ Hash ]
@option options [ Boolean ] :deserialize_as_bson Whether to deserialize
each of the elements in this array using BSON types wherever possible.
@return [Message] Message
with deserialized array.
# File lib/mongo/protocol/message.rb, line 435 def self.deserialize_array(message, io, field, options = {}) elements = [] count = message.instance_variable_get(field[:multi]) count.times { elements << field[:type].deserialize(io, options) } message.instance_variable_set(field[:name], elements) end
Deserializes a single field in a message
@param message [Message] Message
to contain the deserialized field. @param io [IO] Stream containing the field to deserialize. @param field [Hash] Hash representing a field. @param options [ Hash ]
@option options [ Boolean ] :deserialize_as_bson Whether to deserialize
this field using BSON types wherever possible.
@return [Message] Message
with deserialized field.
# File lib/mongo/protocol/message.rb, line 453 def self.deserialize_field(message, io, field, options = {}) message.instance_variable_set( field[:name], field[:type].deserialize(io, options) ) end
Deserializes the header of the message
@param io [IO] Stream containing the header. @return [Array<Fixnum>] Deserialized header.
# File lib/mongo/protocol/message.rb, line 391 def self.deserialize_header(io) Header.deserialize(io) end
A method for declaring a message field
@param name [String] Name of the field @param type [Module] Type specific serialization strategies @param multi [true, false, Symbol] Specify as true
to
serialize the field's value as an array of type +:type+ or as a symbol describing the field having the number of items in the array (used upon deserialization) Note: In fields where multi is a symbol representing the field containing number items in the repetition, the field containing that information *must* be deserialized prior to deserializing fields that use the number.
@return [NilClass]
# File lib/mongo/protocol/message.rb, line 410 def self.field(name, type, multi = false) fields << { :name => "@#{name}".intern, :type => type, :multi => multi } attr_reader name end
A class method for getting the fields for a message class
@return [Integer] the fields for the message class
# File lib/mongo/protocol/message.rb, line 343 def self.fields @fields ||= [] end
Public Instance Methods
Tests for equality between two wire protocol messages by comparing class and field values.
@param other [Mongo::Protocol::Message] The wire protocol message. @return [true, false] The equality of the messages.
# File lib/mongo/protocol/message.rb, line 298 def ==(other) return false if self.class != other.class fields.all? do |field| name = field[:name] instance_variable_get(name) == other.instance_variable_get(name) end end
Creates a hash from the values of the fields of a message.
@return [ Fixnum ] The hash code for the message.
# File lib/mongo/protocol/message.rb, line 311 def hash fields.map { |field| instance_variable_get(field[:name]) }.hash end
# File lib/mongo/protocol/message.rb, line 173 def maybe_add_server_api(server_api) raise Error::ServerApiNotSupported, "Server API parameters cannot be sent to pre-3.6 MongoDB servers. Please remove the :server_api parameter from Client options or use MongoDB 3.6 or newer" end
Compress the message, if supported by the wire protocol used and if the command being sent permits compression. Otherwise returns self.
@param [ String, Symbol
] compressor The compressor to use. @param [ Integer ] zlib_compression_level The zlib compression level to use.
@return [ self ] Always returns self. Other message types should
override this method.
@since 2.5.0 @api private
# File lib/mongo/protocol/message.rb, line 112 def maybe_compress(compressor, zlib_compression_level = nil) self end
Possibly decrypt this message with libmongocrypt.
@param [ Mongo::Operation::Context
] context The operation context.
@return [ Mongo::Protocol::Msg
] The decrypted message, or the original
message if decryption was not possible or necessary.
# File lib/mongo/protocol/message.rb, line 152 def maybe_decrypt(context) # TODO determine if we should be decrypting data coming from pre-4.2 # servers, potentially using legacy wire protocols. If so we need # to implement decryption for those wire protocols as our current # encryption/decryption code is OP_MSG-specific. self end
Possibly encrypt this message with libmongocrypt.
@param [ Mongo::Server::Connection
] connection The connection on which
the operation is performed.
@param [ Mongo::Operation::Context
] context The operation context.
@return [ Mongo::Protocol::Msg
] The encrypted message, or the original
message if encryption was not possible or necessary.
# File lib/mongo/protocol/message.rb, line 168 def maybe_encrypt(connection, context) # Do nothing if the Message subclass has not implemented this method self end
Inflate a message if it is compressed.
@return [ Protocol::Message
] Always returns self. Subclasses should
override this method as necessary.
@since 2.5.0 @api private
# File lib/mongo/protocol/message.rb, line 142 def maybe_inflate self end
Default number returned value for protocol messages.
@return [ 0 ] This method must be overridden, otherwise, always returns 0.
@since 2.5.0
# File lib/mongo/protocol/message.rb, line 329 def number_returned; 0; end
The default for messages is not to require a reply after sending a message to the server.
@example Does the message require a reply?
message.replyable?
@return [ false ] The default is to not require a reply.
@since 2.0.0
# File lib/mongo/protocol/message.rb, line 97 def replyable? false end
Serializes message into bytes that can be sent on the wire
@param buffer [String] buffer where the message should be inserted @return [String] buffer containing the serialized message
# File lib/mongo/protocol/message.rb, line 201 def serialize(buffer = BSON::ByteBuffer.new, max_bson_size = nil, bson_overhead = nil) max_size = if max_bson_size && bson_overhead max_bson_size + bson_overhead elsif max_bson_size max_bson_size else nil end start = buffer.length serialize_header(buffer) serialize_fields(buffer, max_size) buffer.replace_int32(start, buffer.length - start) end
Generates a request id for a message
@return [Fixnum] a request id used for sending a message to the
server. The server will put this id in the response_to field of a reply.
# File lib/mongo/protocol/message.rb, line 320 def set_request_id @request_id = self.class.next_id end
Private Instance Methods
Compress the message, if the command being sent permits compression. Otherwise returns self.
@param [ String ] command_name Command name extracted from the message. @param [ String | Symbol
] compressor The compressor to use. @param [ Integer ] zlib_compression_level Zlib compression level to use.
@return [ Message
] A Protocol::Compressed
message or self,
depending on whether this message can be compressed.
@since 2.5.0
# File lib/mongo/protocol/message.rb, line 127 def compress_if_possible(command_name, compressor, zlib_compression_level) if compressor && compression_allowed?(command_name) Compressed.new(self, compressor, zlib_compression_level) else self end end
A method for getting the fields for a message class
@return [Integer] the fields for the message class
# File lib/mongo/protocol/message.rb, line 336 def fields self.class.fields end
# File lib/mongo/protocol/message.rb, line 177 def merge_sections cmd = if @sections.length > 1 cmd = @sections.detect { |section| section[:type] == 0 }[:payload] identifier = @sections.detect { |section| section[:type] == 1}[:payload][:identifier] cmd.merge(identifier.to_sym => @sections.select { |section| section[:type] == 1 }. map { |section| section[:payload][:sequence] }. inject([]) { |arr, documents| arr + documents } ) elsif @sections.first[:payload] @sections.first[:payload] else @sections.first end if cmd.nil? raise "The command should never be nil here" end cmd end
Serializes message fields into a buffer
@param buffer [String] buffer to receive the field @return [String] buffer with serialized field
# File lib/mongo/protocol/message.rb, line 351 def serialize_fields(buffer, max_bson_size = nil) fields.each do |field| value = instance_variable_get(field[:name]) if field[:multi] value.each do |item| if field[:type].respond_to?(:size_limited?) field[:type].serialize(buffer, item, max_bson_size, validating_keys?) else field[:type].serialize(buffer, item, validating_keys?) end end else if field[:type].respond_to?(:size_limited?) field[:type].serialize(buffer, value, max_bson_size, validating_keys?) else field[:type].serialize(buffer, value, validating_keys?) end end end end
Serializes the header of the message consisting of 4 32bit integers
The integers represent a message length placeholder (calculation of the actual length is deferred) the request id, the response to id, and the op code for the message
Currently uses hardcoded 0 for request id and response to as their values are irrelevent to the server
@param buffer [String] Buffer to receive the header @return [String] Serialized header
# File lib/mongo/protocol/message.rb, line 383 def serialize_header(buffer) Header.serialize(buffer, [0, request_id, 0, op_code]) end
# File lib/mongo/protocol/message.rb, line 460 def validating_keys? @options[:validating_keys] if @options end