class Mongo::Protocol::Compressed
MongoDB Wire protocol Compressed message.
This is a bi-directional message that wraps another message (of any other opcode) in compressed form. See github.com/mongodb/specifications/blob/master/source/compression/OP_COMPRESSED.rst
@api semipublic
@since 2.5.0
Constants
- COMPRESSOR_ID_MAP
The compressor identifier to byte map.
@since 2.5.0
- NOOP
The noop compressor identifier.
- NOOP_BYTE
The byte signaling that the message has not been compressed (test mode).
- OP_CODE
The operation code for a Compressed message.
@return [ Fixnum ] the operation code.
@since 2.5.0
- SNAPPY
The snappy compressor identifier.
- SNAPPY_BYTE
The byte signaling that the message has been compressed with snappy.
- ZLIB
The Zlib compressor identifier.
@since 2.5.0
- ZLIB_BYTE
The byte signaling that the message has been compressed with Zlib.
@since 2.5.0
- ZSTD
The zstd compressor identifier.
- ZSTD_BYTE
The byte signaling that the message has been compressed with zstd.
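The identifier and byte constants above amount to a small lookup table between compressor names and single-byte IDs. The sketch below is illustrative and is not the driver's definition: it assumes the numeric IDs given in the OP_COMPRESSED specification (0 noop, 1 snappy, 2 zlib, 3 zstd), and every `SKETCH_`/`sketch_` name is hypothetical.

```ruby
# Assumed compressor IDs, taken from the OP_COMPRESSED specification.
# The SKETCH_* names are hypothetical and are not part of the driver.
SKETCH_COMPRESSOR_IDS = {
  'noop'   => 0,
  'snappy' => 1,
  'zlib'   => 2,
  'zstd'   => 3
}.freeze

# Map a compressor name (String or Symbol) to the single binary byte
# that identifies it on the wire.
def sketch_compressor_byte(name)
  id = SKETCH_COMPRESSOR_IDS.fetch(name.to_s)
  id.chr.force_encoding(Encoding::BINARY)
end

# Map a byte read off the wire back to the compressor name,
# or nil when the byte is not in the table.
def sketch_compressor_name(byte)
  SKETCH_COMPRESSOR_IDS.key(byte.ord)
end

puts sketch_compressor_byte(:zlib).inspect
puts sketch_compressor_name("\x01".b)
```

Using `fetch` in the name-to-byte direction makes an unknown compressor name fail loudly instead of silently yielding nil.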
Public Class Methods
Creates a new OP_COMPRESSED message.
@example Create an OP_COMPRESSED message.
Compressed.new(original_message, 'zlib')
@param [ Mongo::Protocol::Message ] message The original message.
@param [ String, Symbol ] compressor The compression algorithm to use.
@param [ Integer ] zlib_compression_level The zlib compression level to use. -1 and nil imply default.
@since 2.5.0
# File lib/mongo/protocol/compressed.rb, line 78
def initialize(message, compressor, zlib_compression_level = nil)
  @original_message = message
  @original_op_code = message.op_code
  @uncompressed_size = 0
  @compressor_id = COMPRESSOR_ID_MAP[compressor]
  @compressed_message = ''
  @zlib_compression_level = zlib_compression_level if zlib_compression_level && zlib_compression_level != -1
  @request_id = message.request_id
end
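The constructor stores a zlib level only when one is given and it is not -1; otherwise the instance variable stays unset (nil) and zlib falls back to its default level. A minimal sketch of that rule, using only Ruby's standard `zlib` library; `normalize_zlib_level` is a hypothetical helper and not a driver method.

```ruby
require 'zlib'

# Hypothetical helper mirroring the constructor's rule:
# nil and -1 both mean "use zlib's default level", so both become nil.
def normalize_zlib_level(level)
  level if level && level != -1
end

payload = 'a' * 1_000

# A nil level makes Zlib::Deflate.deflate use Zlib::DEFAULT_COMPRESSION.
default_out = Zlib::Deflate.deflate(payload, normalize_zlib_level(-1))
# An explicit level is passed through unchanged.
fastest_out = Zlib::Deflate.deflate(payload, normalize_zlib_level(Zlib::BEST_SPEED))

puts default_out.bytesize
puts fastest_out.bytesize
```

Both outputs inflate back to the same payload; the level only trades speed for size.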
Public Instance Methods
Inflates an OP_COMPRESSED message and returns the original message.
@return [ Protocol::Message ] The inflated message.
@since 2.5.0
@api private
# File lib/mongo/protocol/compressed.rb, line 94
def maybe_inflate
  message = Registry.get(@original_op_code).allocate
  buf = decompress(@compressed_message)

  message.send(:fields).each do |field|
    if field[:multi]
      Message.deserialize_array(message, buf, field)
    else
      Message.deserialize_field(message, buf, field)
    end
  end
  if message.is_a?(Msg)
    message.fix_after_deserialization
  end
  message
end
Whether the message expects a reply from the database.
@example Does the message require a reply?
message.replyable?
@return [ true, false ] If the message expects a reply.
@since 2.5.0
# File lib/mongo/protocol/compressed.rb, line 119
def replyable?
  @original_message.replyable?
end
Private Instance Methods
# File lib/mongo/protocol/compressed.rb, line 155
def compress(buffer)
  if @compressor_id == NOOP_BYTE
    buffer.to_s.force_encoding(BSON::BINARY)
  elsif @compressor_id == ZLIB_BYTE
    Zlib::Deflate.deflate(buffer.to_s, @zlib_compression_level).force_encoding(BSON::BINARY)
  elsif @compressor_id == SNAPPY_BYTE
    Snappy.deflate(buffer.to_s).force_encoding(BSON::BINARY)
  elsif @compressor_id == ZSTD_BYTE
    # DRIVERS-600 will allow this to be configurable in the future
    Zstd.compress(buffer.to_s).force_encoding(BSON::BINARY)
  end
end
# File lib/mongo/protocol/compressed.rb, line 168
def decompress(compressed_message)
  if @compressor_id == NOOP_BYTE
    BSON::ByteBuffer.new(compressed_message)
  elsif @compressor_id == ZLIB_BYTE
    BSON::ByteBuffer.new(Zlib::Inflate.inflate(compressed_message))
  elsif @compressor_id == SNAPPY_BYTE
    BSON::ByteBuffer.new(Snappy.inflate(compressed_message))
  elsif @compressor_id == ZSTD_BYTE
    BSON::ByteBuffer.new(Zstd.decompress(compressed_message))
  end
end
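compress and decompress are mirror images that dispatch on the compressor byte. The sketch below shows the round trip for the noop and zlib cases only, using Ruby's standard library; snappy and zstd need external gems and are left out. The `sketch_*` methods and `SKETCH_*` constants are hypothetical (byte values assumed from the OP_COMPRESSED specification), plain strings stand in for BSON::ByteBuffer, and unlike the methods above the sketch raises on an unknown byte instead of returning nil.

```ruby
require 'zlib'

# Assumed byte values (0 for noop, 2 for zlib); names are hypothetical.
SKETCH_NOOP_BYTE = 0.chr.force_encoding(Encoding::BINARY).freeze
SKETCH_ZLIB_BYTE = 2.chr.force_encoding(Encoding::BINARY).freeze

# Compress raw bytes according to the compressor byte.
def sketch_compress(bytes, compressor_byte, level = nil)
  case compressor_byte
  when SKETCH_NOOP_BYTE
    bytes.b # pass through unchanged, as a binary copy
  when SKETCH_ZLIB_BYTE
    Zlib::Deflate.deflate(bytes, level).force_encoding(Encoding::BINARY)
  else
    raise ArgumentError, "unsupported compressor byte: #{compressor_byte.inspect}"
  end
end

# Reverse sketch_compress for the same compressor byte.
def sketch_decompress(bytes, compressor_byte)
  case compressor_byte
  when SKETCH_NOOP_BYTE
    bytes.b
  when SKETCH_ZLIB_BYTE
    Zlib::Inflate.inflate(bytes)
  else
    raise ArgumentError, "unsupported compressor byte: #{compressor_byte.inspect}"
  end
end

original = 'hello hello hello'.b
[SKETCH_NOOP_BYTE, SKETCH_ZLIB_BYTE].each do |byte|
  packed = sketch_compress(original, byte)
  puts sketch_decompress(packed, byte) == original
end
```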
# File lib/mongo/protocol/compressed.rb, line 147
def serialize_fields(buffer, max_bson_size)
  buf = BSON::ByteBuffer.new
  @original_message.send(:serialize_fields, buf, max_bson_size)
  @uncompressed_size = buf.length
  @compressed_message = compress(buf)
  super
end
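serialize_fields serializes the wrapped message into a scratch buffer, records its length as the uncompressed size, compresses it, and then lets the superclass write the fields. The sketch below illustrates the resulting body layout as described in the OP_COMPRESSED specification: original opcode (int32), uncompressed size (int32), compressor ID (uint8), then the compressed bytes, all little-endian. It is not the driver's code: the helper names are hypothetical, the standard 16-byte message header is omitted, and it assumes opcode 2013 (OP_MSG) for the wrapped message and compressor ID 2 for zlib.

```ruby
require 'zlib'

SKETCH_OP_MSG  = 2013 # assumed opcode of the wrapped message (OP_MSG)
SKETCH_ZLIB_ID = 2    # assumed zlib compressor ID

# Build the OP_COMPRESSED body (without the standard message header).
def sketch_compressed_body(original_op_code, payload)
  compressed = Zlib::Deflate.deflate(payload)
  # int32 LE opcode, int32 LE uncompressed size, uint8 compressor ID
  header = [original_op_code, payload.bytesize, SKETCH_ZLIB_ID].pack('l<l<C')
  header + compressed
end

# Parse a body produced by sketch_compressed_body.
def sketch_parse_compressed_body(body)
  op_code, uncompressed_size, compressor_id = body.unpack('l<l<C')
  compressed = body.byteslice(9, body.bytesize - 9) # skip the 9 fixed bytes
  {
    op_code: op_code,
    uncompressed_size: uncompressed_size,
    compressor_id: compressor_id,
    payload: Zlib::Inflate.inflate(compressed)
  }
end

body = sketch_compressed_body(SKETCH_OP_MSG, 'ping' * 10)
p sketch_parse_compressed_body(body)
```

Recording the uncompressed size before compression lets a receiver size its buffer and check the inflated length against the declared value.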