class Mongo::BulkWrite
Constants
- SINGLE_STATEMENT_OPS
Attributes
@return [ Mongo::Collection ] collection The collection.
@return [ Hash, BSON::Document ] options The options.
@return [ Array<Hash, BSON::Document> ] requests The requests.
Public Class Methods
Create the new bulk write operation.
@api private
@example Create an ordered bulk write.
Mongo::BulkWrite.new(collection, [{ insert_one: { _id: 1 }}])
@example Create an unordered bulk write.
Mongo::BulkWrite.new(collection, [{ insert_one: { _id: 1 }}], ordered: false)
@example Create an ordered mixed bulk write.
Mongo::BulkWrite.new( collection, [ { insert_one: { _id: 1 }}, { update_one: { filter: { _id: 0 }, update: { '$set' => { name: 'test' }}}}, { delete_one: { filter: { _id: 2 }}} ] )
@param [ Mongo::Collection ] collection The collection. @param [ Enumerable<Hash, BSON::Document> ] requests The requests,
cannot be empty.
@param [ Hash, BSON::Document ] options The options.
@since 2.1.0
# File lib/mongo/bulk_write.rb, line 122 def initialize(collection, requests, options = {}) @collection = collection @requests = requests @options = options || {} end
Public Instance Methods
Execute the bulk write operation.
@example Execute the bulk write.
bulk_write.execute
@return [ Mongo::BulkWrite::Result ] The result.
@since 2.1.0
# File lib/mongo/bulk_write.rb, line 57 def execute operation_id = Monitoring.next_operation_id result_combiner = ResultCombiner.new operations = op_combiner.combine validate_requests! client.send(:with_session, @options) do |session| context = Operation::Context.new(client: client, session: session) operations.each do |operation| if single_statement?(operation) write_concern = write_concern(session) write_with_retry(write_concern, context: context) do |connection, txn_num, context| execute_operation( operation.keys.first, operation.values.flatten, connection, context, operation_id, result_combiner, session, txn_num) end else nro_write_with_retry(write_concern, context: context) do |connection, txn_num, context| execute_operation( operation.keys.first, operation.values.flatten, connection, context, operation_id, result_combiner, session) end end end end result_combiner.result end
Is the bulk write ordered?
@api private
@example Is the bulk write ordered?
bulk_write.ordered?
@return [ true, false ] If the bulk write is ordered.
@since 2.1.0
# File lib/mongo/bulk_write.rb, line 138 def ordered? @ordered ||= options.fetch(:ordered, true) end
Get the write concern for the bulk write.
@api private
@example Get the write concern.
bulk_write.write_concern
@return [ WriteConcern ] The write concern.
@since 2.1.0
# File lib/mongo/bulk_write.rb, line 152 def write_concern(session = nil) @write_concern ||= options[:write_concern] ? WriteConcern.get(options[:write_concern]) : collection.write_concern_with_session(session) end
Private Instance Methods
# File lib/mongo/bulk_write.rb, line 168 def base_spec(operation_id, session) { :db_name => database.name, :coll_name => collection.name, :write_concern => write_concern(session), :ordered => ordered?, :operation_id => operation_id, :bypass_document_validation => !!options[:bypass_document_validation], :max_time_ms => options[:max_time_ms], :options => options, :id_generator => client.options[:id_generator], :session => session, :comment => options[:comment], :let => options[:let], } end
Loop through the requests and check if each operation is allowed to send a hint for each operation on the given server version.
For the following operations, the client can send a hint for servers >= 4.2 and for the rest, the client can only send it for 4.4+:
- updateOne - updateMany - replaceOne
@param [ Connection ] connection The connection object.
@return [ true | false ] Whether the request is able to send hints for
the current server version.
# File lib/mongo/bulk_write.rb, line 301 def can_hint?(connection) gte_4_2 = connection.server.description.server_version_gte?('4.2') gte_4_4 = connection.server.description.server_version_gte?('4.4') op_combiner.requests.all? do |req| op = req.keys.first if req[op].keys.include?(:hint) if [:update_one, :update_many, :replace_one].include?(op) gte_4_2 else gte_4_4 end else true end end end
# File lib/mongo/bulk_write.rb, line 235 def delete_many(documents, connection, context, operation_id, session, txn_num) QueryCache.clear_namespace(collection.namespace) spec = base_spec(operation_id, session).merge(:deletes => documents) Operation::Delete.new(spec).bulk_execute(connection, context: context) end
# File lib/mongo/bulk_write.rb, line 228 def delete_one(documents, connection, context, operation_id, session, txn_num) QueryCache.clear_namespace(collection.namespace) spec = base_spec(operation_id, session).merge(:deletes => documents, :txn_num => txn_num) Operation::Delete.new(spec).bulk_execute(connection, context: context) end
# File lib/mongo/bulk_write.rb, line 185 def execute_operation(name, values, connection, context, operation_id, result_combiner, session, txn_num = nil) validate_collation!(connection) validate_array_filters!(connection) validate_hint!(connection) unpin_maybe(session, connection) do if values.size > connection.description.max_write_batch_size split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num) else result = send(name, values, connection, context, operation_id, session, txn_num) add_server_diagnostics(connection) do add_error_labels(connection, context) do result_combiner.combine!(result, values.size) end end end end # With OP_MSG (3.6+ servers), the size of each section in the message # is independently capped at 16m and each bulk operation becomes # its own section. The size of the entire bulk write is limited to 48m. # With OP_QUERY (pre-3.6 servers), the entire bulk write is sent as a # single document and is thus subject to the 16m document size limit. # This means the splits differ between pre-3.6 and 3.6+ servers, with # 3.6+ servers being able to split less. rescue Error::MaxBSONSize, Error::MaxMessageSize => e raise e if values.size <= 1 unpin_maybe(session, connection) do split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num) end end
# File lib/mongo/bulk_write.rb, line 242 def insert_one(documents, connection, context, operation_id, session, txn_num) QueryCache.clear_namespace(collection.namespace) spec = base_spec(operation_id, session).merge(:documents => documents, :txn_num => txn_num) Operation::Insert.new(spec).bulk_execute(connection, context: context) end
If the given object is an array return the first element, otherwise return the given object.
@param [ Object ] obj The given object.
@return [ Object ] The first element of the array or the given object.
# File lib/mongo/bulk_write.rb, line 372 def maybe_first(obj) obj.is_a?(Array) ? obj.first : obj end
# File lib/mongo/bulk_write.rb, line 217 def op_combiner @op_combiner ||= ordered? ? OrderedCombiner.new(requests) : UnorderedCombiner.new(requests) end
# File lib/mongo/bulk_write.rb, line 164 def single_statement?(operation) SINGLE_STATEMENT_OPS.include?(operation.keys.first) end
# File lib/mongo/bulk_write.rb, line 221 def split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num) execute_operation(name, values.shift(values.size / 2), connection, context, operation_id, result_combiner, session, txn_num) txn_num = session.next_txn_num if txn_num && !session.in_transaction? execute_operation(name, values, connection, context, operation_id, result_combiner, session, txn_num) end
# File lib/mongo/bulk_write.rb, line 257 def update_many(documents, connection, context, operation_id, session, txn_num) QueryCache.clear_namespace(collection.namespace) spec = base_spec(operation_id, session).merge(:updates => documents) Operation::Update.new(spec).bulk_execute(connection, context: context) end
# File lib/mongo/bulk_write.rb, line 249 def update_one(documents, connection, context, operation_id, session, txn_num) QueryCache.clear_namespace(collection.namespace) spec = base_spec(operation_id, session).merge(:updates => documents, :txn_num => txn_num) Operation::Update.new(spec).bulk_execute(connection, context: context) end
# File lib/mongo/bulk_write.rb, line 272 def validate_array_filters!(connection) if op_combiner.has_array_filters? && !connection.features.array_filters_enabled? raise Error::UnsupportedArrayFilters.new end end
# File lib/mongo/bulk_write.rb, line 266 def validate_collation!(connection) if op_combiner.has_collation? && !connection.features.collation_enabled? raise Error::UnsupportedCollation.new end end
# File lib/mongo/bulk_write.rb, line 278 def validate_hint!(connection) if op_combiner.has_hint? if !can_hint?(connection) && write_concern && !write_concern.acknowledged? raise Error::UnsupportedOption.hint_error(unacknowledged_write: true) elsif !connection.features.update_delete_option_validation_enabled? raise Error::UnsupportedOption.hint_error end end end
Perform the request document validation required by driver specifications. This method validates the first key of each update request document to be an operator (i.e. start with $) and the first key of each replacement document to not be an operator (i.e. not start with $). The request document may be invalid without this method flagging it as such (for example an update or replacement document containing some keys which are operators and some which are not), in which case the driver expects the server to fail the operation with an error.
Raise an ArgumentError if requests is empty.
@raise [ Error::InvalidUpdateDocument, Error::InvalidReplacementDocument,
ArgumentError ] if the document is invalid.
# File lib/mongo/bulk_write.rb, line 332 def validate_requests! requests_empty = true @requests.each do |req| requests_empty = false if op = req.keys.first if [:update_one, :update_many].include?(op) if doc = maybe_first(req.dig(op, :update)) if key = doc.keys&.first unless key.to_s.start_with?("$") if Mongo.validate_update_replace raise Error::InvalidUpdateDocument.new(key: key) else Error::InvalidUpdateDocument.warn(Logger.logger, key) end end end end elsif op == :replace_one if key = req.dig(op, :replacement)&.keys&.first if key.to_s.start_with?("$") if Mongo.validate_update_replace raise Error::InvalidReplacementDocument.new(key: key) else Error::InvalidReplacementDocument.warn(Logger.logger, key) end end end end end end.tap do raise ArgumentError, "Bulk write requests cannot be empty" if requests_empty end end