class Mongo::Protocol::Message
A base class providing functionality required by all messages in the MongoDB wire protocol. It provides a minimal DSL for defining typed fields to enable serialization and deserialization over the wire.
@example
class WireProtocolMessage < Message

  private

  def op_code
    1234
  end

  FLAGS = [:first_bit, :bit_two]

  # payload
  field :flags, BitVector.new(FLAGS)
  field :namespace, CString
  field :document, Document
  field :documents, Document, true
end
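To make the DSL concrete, here is a minimal sketch of how a `field` class method of this kind can record descriptors and define readers. `SketchMessage` and `SketchQuery` are hypothetical stand-ins written for illustration, and `String` and `Hash` are used as placeholder types; this is not the driver's own class.

```ruby
# Hypothetical stand-in for the field DSL; not the driver's implementation.
class SketchMessage
  # Each class keeps its own list of field descriptors.
  def self.fields
    @fields ||= []
  end

  # Records a descriptor and defines a reader for the field.
  def self.field(name, type, multi = false)
    fields << { name: :"@#{name}", type: type, multi: multi }
    attr_reader name
  end
end

class SketchQuery < SketchMessage
  field :namespace, String
  field :documents, Hash, true

  def initialize(namespace, documents)
    @namespace = namespace
    @documents = documents
  end
end

query = SketchQuery.new('db.coll', [{ 'a' => 1 }])
field_names = SketchQuery.fields.map { |f| f[:name] }
```

Because the descriptor list lives in a class-level instance variable, each subclass gets its own list and the base class stays empty.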
@abstract @api semiprivate
Constants
- BATCH_SIZE
The batch size constant.
@since 2.2.0
- COLLECTION
The collection constant.
@since 2.2.0
- LIMIT
The limit constant.
@since 2.2.0
- MAX_MESSAGE_SIZE
Default max message size of 48MB.
@since 2.2.1
- ORDERED
The ordered constant.
@since 2.2.0
- Q
The q constant.
@since 2.2.0
Attributes
Returns the request id for the message
@return [Fixnum] The request id for this message
Public Class Methods
Deserializes messages from an IO stream.
This method returns decompressed messages (i.e. if the message on the wire was OP_COMPRESSED, this method would typically return the OP_MSG message that is the result of decompression).
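@param [ IO ] io Stream containing a message
@param [ Integer ] max_message_size The max message size.
@param [ Integer ] expected_response_to The request id that the response must reference, if any. When given and the response's response_to differs, Error::UnexpectedResponse is raised.
@param [ Hash ] options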
@option options [ Boolean ] :deserialize_as_bson Whether to deserialize
this message using BSON types instead of native Ruby types wherever possible.
@option options [ Numeric ] :socket_timeout The timeout to use for
each read operation.
@return [ Message ] Instance of a Message class
@api private
# File lib/mongo/protocol/message.rb, line 237
def self.deserialize(io,
  max_message_size = MAX_MESSAGE_SIZE,
  expected_response_to = nil,
  options = {}
)
  # io is usually a Mongo::Socket instance, which supports the
  # timeout option. For compatibility with whoever might call this
  # method with some other IO-like object, pass options only when they
  # are not empty.
  read_options = {}
  if timeout = options[:socket_timeout]
    read_options[:timeout] = timeout
  end

  if read_options.empty?
    chunk = io.read(16)
  else
    chunk = io.read(16, **read_options)
  end
  buf = BSON::ByteBuffer.new(chunk)
  length, _request_id, response_to, _op_code = deserialize_header(buf)

  # Protection from potential DOS man-in-the-middle attacks. See
  # DRIVERS-276.
  if length > (max_message_size || MAX_MESSAGE_SIZE)
    raise Error::MaxMessageSize.new(max_message_size)
  end

  # Protection against returning the response to a previous request. See
  # RUBY-1117
  if expected_response_to && response_to != expected_response_to
    raise Error::UnexpectedResponse.new(expected_response_to, response_to)
  end

  if read_options.empty?
    chunk = io.read(length - 16)
  else
    chunk = io.read(length - 16, **read_options)
  end
  buf = BSON::ByteBuffer.new(chunk)

  message = Registry.get(_op_code).allocate
  message.send(:fields).each do |field|
    if field[:multi]
      deserialize_array(message, buf, field, options)
    else
      deserialize_field(message, buf, field, options)
    end
  end
  if message.is_a?(Msg)
    message.fix_after_deserialization
  end
  message.maybe_inflate
end
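The header handling in this method can be illustrated without the driver. The sketch below assumes the standard wire protocol header layout of four little-endian 32-bit integers (message length, request id, response-to, op code) and parses it with `String#unpack`. `read_header`, `HEADER_SIZE`, and `SKETCH_MAX_MESSAGE_SIZE` are hypothetical names, the errors are plain `ArgumentError`s rather than the driver's error classes, and the size limit assumes "48MB" means 48 MiB.

```ruby
require 'stringio'

HEADER_SIZE = 16
# Assumption: 48 MiB, mirroring the documented default.
SKETCH_MAX_MESSAGE_SIZE = 48 * 1024 * 1024

# Reads and validates a 16-byte header from an IO-like object.
def read_header(io, max_size: SKETCH_MAX_MESSAGE_SIZE, expected_response_to: nil)
  length, request_id, response_to, op_code = io.read(HEADER_SIZE).unpack('l<4')

  # Reject oversized messages before reading the body.
  raise ArgumentError, "message of #{length} bytes exceeds #{max_size}" if length > max_size

  # Reject a reply that answers a different request.
  if expected_response_to && response_to != expected_response_to
    raise ArgumentError, "expected response to #{expected_response_to}, got #{response_to}"
  end

  { length: length, request_id: request_id, response_to: response_to, op_code: op_code }
end

# 2013 is the op code of OP_MSG.
header_bytes = [26, 7, 42, 2013].pack('l<4')
header = read_header(StringIO.new(header_bytes), expected_response_to: 42)
```

Both checks run before the body is read, so a bogus length or a stale reply is rejected after consuming only 16 bytes.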
Private Class Methods
Deserializes an array of fields in a message
The number of items in the array must be described by a previously
deserialized field specified in the class by the field dsl under the key
:multi
@param message [Message] Message to contain the deserialized array.
@param io [IO] Stream containing the array to deserialize.
@param field [Hash] Hash representing a field.
@param options [ Hash ]
@option options [ Boolean ] :deserialize_as_bson Whether to deserialize
each of the elements in this array using BSON types wherever possible.
@return [Message] Message with deserialized array.
# File lib/mongo/protocol/message.rb, line 434
def self.deserialize_array(message, io, field, options = {})
  elements = []
  count = message.instance_variable_get(field[:multi])
  count.times { elements << field[:type].deserialize(io, options) }
  message.instance_variable_set(field[:name], elements)
end
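The ordering requirement (the count field must be read before the array that depends on it) can be sketched as follows. `Int32Sketch`, `CountedMessage`, and `deserialize_counted` are hypothetical helpers written for this illustration; they only mimic the pattern of storing a count in an instance variable and reading it back to size the array.

```ruby
require 'stringio'

# Hypothetical field type that reads one little-endian int32.
module Int32Sketch
  def self.deserialize(io, _options = {})
    io.read(4).unpack1('l<')
  end
end

class CountedMessage
  attr_reader :count, :values
end

# Reads the count first, then uses the stored count to read the array.
def deserialize_counted(message, io)
  message.instance_variable_set(:@count, Int32Sketch.deserialize(io))
  count = message.instance_variable_get(:@count)
  values = Array.new(count) { Int32Sketch.deserialize(io) }
  message.instance_variable_set(:@values, values)
  message
end

io = StringIO.new([3, 10, 20, 30].pack('l<4'))
message = deserialize_counted(CountedMessage.allocate, io)
```

As in the method above, the message is created with `allocate`, so no constructor runs and all state comes from the stream.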
Deserializes a single field in a message
@param message [Message] Message to contain the deserialized field.
@param io [IO] Stream containing the field to deserialize.
@param field [Hash] Hash representing a field.
@param options [ Hash ]
@option options [ Boolean ] :deserialize_as_bson Whether to deserialize
this field using BSON types wherever possible.
@return [Message] Message with deserialized field.
# File lib/mongo/protocol/message.rb, line 452
def self.deserialize_field(message, io, field, options = {})
  message.instance_variable_set(
    field[:name],
    field[:type].deserialize(io, options)
  )
end
Deserializes the header of the message
@param io [IO] Stream containing the header.
@return [Array<Fixnum>] Deserialized header.
# File lib/mongo/protocol/message.rb, line 390
def self.deserialize_header(io)
  Header.deserialize(io)
end
A method for declaring a message field
@param name [String] Name of the field
@param type [Module] Type specific serialization strategies
@param multi [true, false, Symbol] Specify as true to serialize the field's value as an array of type +:type+, or as a symbol naming the field that holds the number of items in the array (used upon deserialization).
Note: In fields where multi is a symbol representing the field containing the number of items in the repetition, the field containing that information *must* be deserialized prior to deserializing fields that use the number.
@return [NilClass]
# File lib/mongo/protocol/message.rb, line 409
def self.field(name, type, multi = false)
  fields << {
    :name => "@#{name}".intern,
    :type => type,
    :multi => multi
  }

  attr_reader name
end
A class method for getting the fields for a message class
@return [Array<Hash>] the field descriptors for the message class
# File lib/mongo/protocol/message.rb, line 342
def self.fields
  @fields ||= []
end
Public Instance Methods
Tests for equality between two wire protocol messages by comparing class and field values.
@param other [Mongo::Protocol::Message] The wire protocol message.
@return [true, false] The equality of the messages.
# File lib/mongo/protocol/message.rb, line 297
def ==(other)
  return false if self.class != other.class
  fields.all? do |field|
    name = field[:name]
    instance_variable_get(name) ==
      other.instance_variable_get(name)
  end
end
Creates a hash from the values of the fields of a message.
@return [ Fixnum ] The hash code for the message.
# File lib/mongo/protocol/message.rb, line 310
def hash
  fields.map { |field| instance_variable_get(field[:name]) }.hash
end
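The two methods above follow a common Ruby pattern: equality compares class and field values, and the hash code is derived from the same values so that equal objects hash alike. The sketch below shows the pattern on a hypothetical `PointSketch` class. The `eql?` alias is an addition made here so instances can serve as `Hash` keys; it is not shown in the listing above.

```ruby
# Hypothetical value-like class illustrating field-based equality.
class PointSketch
  FIELDS = [:@x, :@y].freeze

  def initialize(x, y)
    @x = x
    @y = y
  end

  # Equal when the classes match and every field value matches.
  def ==(other)
    return false if self.class != other.class
    FIELDS.all? do |name|
      instance_variable_get(name) == other.instance_variable_get(name)
    end
  end
  alias eql? == # assumption: added so Hash lookups use field equality

  # Hash code built from the same values that == compares.
  def hash
    FIELDS.map { |name| instance_variable_get(name) }.hash
  end
end

a = PointSketch.new(1, 2)
b = PointSketch.new(1, 2)
c = PointSketch.new(1, 3)
lookup = { a => :found }
```

Here `lookup[b]` finds the entry stored under `a`, because the two objects agree on both `hash` and `eql?`.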
# File lib/mongo/protocol/message.rb, line 172
def maybe_add_server_api(server_api)
  raise Error::ServerApiNotSupported, "Server API parameters cannot be sent to pre-3.6 MongoDB servers. Please remove the :server_api parameter from Client options or use MongoDB 3.6 or newer"
end
Compress the message, if supported by the wire protocol used and if the command being sent permits compression. Otherwise returns self.
@param [ String, Symbol ] compressor The compressor to use.
@param [ Integer ] zlib_compression_level The zlib compression level to use.
@return [ self ] Always returns self. Other message types should
override this method.
@since 2.5.0 @api private
# File lib/mongo/protocol/message.rb, line 111
def maybe_compress(compressor, zlib_compression_level = nil)
  self
end
Possibly decrypt this message with libmongocrypt.
@param [ Mongo::Operation::Context ] context The operation context.
@return [ Mongo::Protocol::Msg ] The decrypted message, or the original
message if decryption was not possible or necessary.
# File lib/mongo/protocol/message.rb, line 151
def maybe_decrypt(context)
  # TODO determine if we should be decrypting data coming from pre-4.2
  # servers, potentially using legacy wire protocols. If so we need
  # to implement decryption for those wire protocols as our current
  # encryption/decryption code is OP_MSG-specific.
  self
end
Possibly encrypt this message with libmongocrypt.
@param [ Mongo::Server::Connection ] connection The connection on which
the operation is performed.
@param [ Mongo::Operation::Context ] context The operation context.
@return [ Mongo::Protocol::Msg ] The encrypted message, or the original
message if encryption was not possible or necessary.
# File lib/mongo/protocol/message.rb, line 167
def maybe_encrypt(connection, context)
  # Do nothing if the Message subclass has not implemented this method
  self
end
Inflate a message if it is compressed.
@return [ Protocol::Message ] Always returns self. Subclasses should
override this method as necessary.
@since 2.5.0 @api private
# File lib/mongo/protocol/message.rb, line 141
def maybe_inflate
  self
end
Default number returned value for protocol messages.
@return [ 0 ] This method must be overridden, otherwise, always returns 0.
@since 2.5.0
# File lib/mongo/protocol/message.rb, line 328
def number_returned; 0; end
The default for messages is not to require a reply after sending a message to the server.
@example Does the message require a reply?
message.replyable?
@return [ false ] The default is to not require a reply.
@since 2.0.0
# File lib/mongo/protocol/message.rb, line 96
def replyable?
  false
end
Serializes message into bytes that can be sent on the wire
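@param buffer [BSON::ByteBuffer] buffer where the message should be inserted; defaults to a new BSON::ByteBuffer
@param max_bson_size [Integer, nil] optional size limit passed on to field serialization
@param bson_overhead [Integer, nil] optional allowance added to max_bson_size when both are given
@return [BSON::ByteBuffer] buffer containing the serialized message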
# File lib/mongo/protocol/message.rb, line 200
def serialize(buffer = BSON::ByteBuffer.new, max_bson_size = nil, bson_overhead = nil)
  max_size =
    if max_bson_size && bson_overhead
      max_bson_size + bson_overhead
    elsif max_bson_size
      max_bson_size
    else
      nil
    end

  start = buffer.length
  serialize_header(buffer)
  serialize_fields(buffer, max_size)
  buffer.replace_int32(start, buffer.length - start)
end
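The method writes a zero length placeholder, serializes the rest of the message, and then patches the real length into the placeholder's position. The sketch below shows that backfill technique on a plain binary `String` instead of a `BSON::ByteBuffer`. `serialize_sketch` is a hypothetical helper and the payload is an arbitrary byte string rather than real message fields.

```ruby
# Appends a header plus payload to buffer, then backfills the length.
def serialize_sketch(request_id, op_code, payload, buffer = String.new)
  start = buffer.bytesize

  # Header: length placeholder, request id, response-to, op code.
  buffer << [0, request_id, 0, op_code].pack('l<4')
  buffer << payload

  # Overwrite the 4-byte placeholder with the actual message length.
  buffer[start, 4] = [buffer.bytesize - start].pack('l<')
  buffer
end

bytes = serialize_sketch(7, 2013, 'ping'.b)
decoded_header = bytes.unpack('l<4')
```

Recording `start` before writing lets the same routine append a message to a buffer that already holds other data, since the length is measured from where this message began.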
Generates a request id for a message
@return [Fixnum] a request id used for sending a message to the
server. The server will put this id in the response_to field of a reply.
# File lib/mongo/protocol/message.rb, line 319
def set_request_id
  @request_id = self.class.next_id
end
Private Instance Methods
Compress the message, if the command being sent permits compression. Otherwise returns self.
@param [ String ] command_name Command name extracted from the message.
@param [ String | Symbol ] compressor The compressor to use.
@param [ Integer ] zlib_compression_level Zlib compression level to use.
@return [ Message ] A Protocol::Compressed message or self,
depending on whether this message can be compressed.
@since 2.5.0
# File lib/mongo/protocol/message.rb, line 126
def compress_if_possible(command_name, compressor, zlib_compression_level)
  if compressor && compression_allowed?(command_name)
    Compressed.new(self, compressor, zlib_compression_level)
  else
    self
  end
end
A method for getting the fields for a message class
@return [Array<Hash>] the field descriptors for the message class
# File lib/mongo/protocol/message.rb, line 335
def fields
  self.class.fields
end
# File lib/mongo/protocol/message.rb, line 176
def merge_sections
  cmd = if @sections.length > 1
    cmd = @sections.detect { |section| section[:type] == 0 }[:payload]
    identifier = @sections.detect { |section| section[:type] == 1}[:payload][:identifier]
    cmd.merge(identifier.to_sym =>
      @sections.select { |section| section[:type] == 1 }.
        map { |section| section[:payload][:sequence] }.
        inject([]) { |arr, documents| arr + documents }
    )
  elsif @sections.first[:payload]
    @sections.first[:payload]
  else
    @sections.first
  end
  if cmd.nil?
    raise "The command should never be nil here"
  end
  cmd
end
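This method carries no description, so the following is a reading of the listing rather than documented behavior: when there is more than one section, the type 0 payload is treated as the command and the sequences of all type 1 sections are concatenated under the first type 1 section's identifier. The sketch reproduces that logic as a standalone function over plain hashes. `merge_sections_sketch` and the sample data are hypothetical.

```ruby
# Merges type 1 document sequences into the type 0 command payload.
def merge_sections_sketch(sections)
  if sections.length > 1
    cmd = sections.find { |section| section[:type] == 0 }[:payload]
    sequences = sections.select { |section| section[:type] == 1 }
    identifier = sequences.first[:payload][:identifier]

    # Concatenate every sequence, in order, under the identifier key.
    documents = sequences.flat_map { |section| section[:payload][:sequence] }
    cmd.merge(identifier.to_sym => documents)
  else
    # Single section: use its payload if present, else the section itself.
    sections.first[:payload] || sections.first
  end
end

sections = [
  { type: 0, payload: { insert: 'users' } },
  { type: 1, payload: { identifier: 'documents', sequence: [{ name: 'a' }] } },
  { type: 1, payload: { identifier: 'documents', sequence: [{ name: 'b' }] } }
]
merged = merge_sections_sketch(sections)
```

`Hash#merge` returns a new hash, so the original type 0 payload is left unmodified.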
Serializes message fields into a buffer
@param buffer [String] buffer to receive the field
@return [String] buffer with serialized field
# File lib/mongo/protocol/message.rb, line 350
def serialize_fields(buffer, max_bson_size = nil)
  fields.each do |field|
    value = instance_variable_get(field[:name])
    if field[:multi]
      value.each do |item|
        if field[:type].respond_to?(:size_limited?)
          field[:type].serialize(buffer, item, max_bson_size)
        else
          field[:type].serialize(buffer, item)
        end
      end
    else
      if field[:type].respond_to?(:size_limited?)
        field[:type].serialize(buffer, value, max_bson_size)
      else
        field[:type].serialize(buffer, value)
      end
    end
  end
end
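The listing passes the size limit only to types that respond to `size_limited?`. A minimal sketch of that dispatch follows. `PlainType`, `LimitedType`, and `serialize_value` are hypothetical, and the limit check inside `LimitedType` is an assumption about what a size-limited type might do with the argument.

```ruby
# Hypothetical type that takes no size limit.
module PlainType
  def self.serialize(buffer, value)
    buffer << value.to_s
  end
end

# Hypothetical type that advertises size_limited? and enforces a limit.
module LimitedType
  def self.size_limited?
    true
  end

  def self.serialize(buffer, value, max_size = nil)
    text = value.to_s
    raise ArgumentError, 'value too large' if max_size && text.bytesize > max_size
    buffer << text
  end
end

# Passes max_size only to types that declare support for it.
def serialize_value(buffer, type, value, max_size)
  if type.respond_to?(:size_limited?)
    type.serialize(buffer, value, max_size)
  else
    type.serialize(buffer, value)
  end
end

out = String.new
serialize_value(out, PlainType, 'abcdef', 3)
serialize_value(out, LimitedType, 'xy', 3)
```

Checking `respond_to?` lets existing two-argument types keep working without change, while size-aware types opt in by defining one extra method.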
Serializes the header of the message, consisting of four 32-bit integers.
The integers represent a message length placeholder (calculation of the actual length is deferred), the request id, the response-to id, and the op code for the message.
The response-to id is hardcoded to 0, as its value is irrelevant to the server. The length placeholder is also written as 0, while the request id is taken from request_id.
@param buffer [String] Buffer to receive the header
@return [String] Serialized header
# File lib/mongo/protocol/message.rb, line 382
def serialize_header(buffer)
  Header.serialize(buffer, [0, request_id, 0, op_code])
end