class Mongo::BulkWrite

Constants

SINGLE_STATEMENT_OPS

Attributes

collection[R]

@return [ Mongo::Collection ] collection The collection.

options[R]

@return [ Hash, BSON::Document ] options The options.

requests[R]

@return [ Array<Hash, BSON::Document> ] requests The requests.

Public Class Methods

new(collection, requests, options = {}) click to toggle source

Create the new bulk write operation.

@api private

@example Create an ordered bulk write.

Mongo::BulkWrite.new(collection, [{ insert_one: { _id: 1 }}])

@example Create an unordered bulk write.

Mongo::BulkWrite.new(collection, [{ insert_one: { _id: 1 }}], ordered: false)

@example Create an ordered mixed bulk write.

Mongo::BulkWrite.new(
  collection,
  [
    { insert_one: { _id: 1 }},
    { update_one: { filter: { _id: 0 }, update: { '$set' => { name: 'test' }}}},
    { delete_one: { filter: { _id: 2 }}}
  ]
)

@param [ Mongo::Collection ] collection The collection.

@param [ Enumerable<Hash, BSON::Document> ] requests The requests, cannot be empty.

@param [ Hash, BSON::Document ] options The options.

@since 2.1.0

# File lib/mongo/bulk_write.rb, line 123
# Store the collaborators for a bulk write; nothing is validated or sent here.
#
# @param [ Mongo::Collection ] collection The collection.
# @param [ Enumerable<Hash, BSON::Document> ] requests The requests.
# @param [ Hash, BSON::Document ] options The options; a nil/false value
#   is replaced by an empty hash.
def initialize(collection, requests, options = {})
  @collection, @requests = collection, requests
  @options = options || {}
end

Public Instance Methods

execute() click to toggle source

Execute the bulk write operation.

@example Execute the bulk write.

bulk_write.execute

@return [ Mongo::BulkWrite::Result ] The result.

@since 2.1.0

# File lib/mongo/bulk_write.rb, line 58
# Execute the bulk write operation.
#
# The requests are combined into operations by the op combiner, the raw
# requests are validated, and every combined operation is then run inside a
# session. Operations whose name is listed in SINGLE_STATEMENT_OPS go
# through write_with_retry and receive the transaction number it yields;
# all other operations go through nro_write_with_retry and are executed
# without a transaction number.
#
# @example Execute the bulk write.
#   bulk_write.execute
#
# @return [ Mongo::BulkWrite::Result ] The result.
#
# @since 2.1.0
def execute
  operation_id = Monitoring.next_operation_id
  result_combiner = ResultCombiner.new
  operations = op_combiner.combine
  validate_requests!

  client.send(:with_session, @options) do |session|
    context = Operation::Context.new(client: client, session: session)
    operations.each do |operation|
      # Deliberately NOT named `write_concern`: a local with that name shadows
      # the #write_concern method, which previously made the non-single-statement
      # branch pass an unassigned (nil) local to nro_write_with_retry.
      concern = write_concern(session)

      if single_statement?(operation)
        write_with_retry(concern, context: context) do |connection, txn_num, retry_context|
          # operation.values.flatten is rebuilt on every attempt on purpose:
          # split_execute mutates the array it is handed.
          execute_operation(
            operation.keys.first,
            operation.values.flatten,
            connection,
            retry_context,
            operation_id,
            result_combiner,
            session,
            txn_num)
        end
      else
        nro_write_with_retry(concern, context: context) do |connection, _txn_num, retry_context|
          execute_operation(
            operation.keys.first,
            operation.values.flatten,
            connection,
            retry_context,
            operation_id,
            result_combiner,
            session)
        end
      end
    end
  end
  result_combiner.result
end
ordered?() click to toggle source

Is the bulk write ordered?

@api private

@example Is the bulk write ordered?

bulk_write.ordered?

@return [ true, false ] If the bulk write is ordered.

@since 2.1.0

# File lib/mongo/bulk_write.rb, line 139
# Whether the bulk write is ordered. Reads the :ordered option (default
# true) and caches it; a falsy value is looked up again on every call.
#
# @return [ true, false ] If the bulk write is ordered.
def ordered?
  return @ordered if @ordered

  @ordered = options.fetch(:ordered, true)
end
write_concern(session = nil) click to toggle source

Get the write concern for the bulk write.

@api private

@example Get the write concern.

bulk_write.write_concern

@return [ WriteConcern ] The write concern.

@since 2.1.0

# File lib/mongo/bulk_write.rb, line 153
# Get the write concern for the bulk write, cached after the first truthy
# result. An explicit :write_concern option wins; otherwise the collection
# is asked for the write concern applicable to the session.
#
# @param [ Object ] session The session forwarded to the collection lookup.
#
# @return [ WriteConcern ] The write concern.
def write_concern(session = nil)
  return @write_concern if @write_concern

  @write_concern =
    if options[:write_concern]
      WriteConcern.get(options[:write_concern])
    else
      collection.write_concern_with_session(session)
    end
end

Private Instance Methods

base_spec(operation_id, session) click to toggle source
# File lib/mongo/bulk_write.rb, line 169
# Build the spec hash shared by every operation of this bulk write. The
# per-operation methods merge their own documents into it.
#
# @param [ Object ] operation_id The id stored under :operation_id.
# @param [ Object ] session The session stored under :session and used to
#   resolve the write concern.
#
# @return [ Hash ] The base spec.
def base_spec(operation_id, session)
  {
    db_name: database.name,
    coll_name: collection.name,
    write_concern: write_concern(session),
    ordered: ordered?,
    operation_id: operation_id,
    # Coerced so the spec always carries a strict boolean.
    bypass_document_validation: !!options[:bypass_document_validation],
    max_time_ms: options[:max_time_ms],
    options: options,
    id_generator: client.options[:id_generator],
    session: session,
    comment: options[:comment],
    let: options[:let],
  }
end
can_hint?(connection) click to toggle source

Loop through the requests and check whether each operation that specifies a hint is allowed to send it on the given server version.

For the following operations, the client can send a hint for servers >= 4.2 and for the rest, the client can only send it for 4.4+:

- updateOne
- updateMany
- replaceOne

@param [ Connection ] connection The connection object.

@return [ true | false ] Whether every request is able to send its hint on the current server version.
# File lib/mongo/bulk_write.rb, line 302
# Check that every combined request carrying a :hint key may send it to the
# connected server: update_one, update_many and replace_one need server
# version >= 4.2, any other operation needs >= 4.4. Requests without a
# hint always pass.
#
# @param [ Connection ] connection The connection object.
#
# @return [ true | false ] Whether all requests may send their hints.
def can_hint?(connection)
  update_hint_ok = connection.server.description.server_version_gte?('4.2')
  any_hint_ok = connection.server.description.server_version_gte?('4.4')

  op_combiner.requests.all? do |request|
    op_name = request.keys.first
    next true unless request[op_name].keys.include?(:hint)

    %i[update_one update_many replace_one].include?(op_name) ? update_hint_ok : any_hint_ok
  end
end
delete_many(documents, connection, context, operation_id, session, txn_num) click to toggle source
# File lib/mongo/bulk_write.rb, line 236
# Clear the query cache for the collection's namespace, then run the given
# delete statements as one Delete operation. txn_num is accepted for
# signature parity with the other operation methods but is not added to
# the spec.
def delete_many(documents, connection, context, operation_id, session, txn_num)
  QueryCache.clear_namespace(collection.namespace)

  delete_spec = base_spec(operation_id, session).merge(deletes: documents)
  operation = Operation::Delete.new(delete_spec)
  operation.bulk_execute(connection, context: context)
end
delete_one(documents, connection, context, operation_id, session, txn_num) click to toggle source
# File lib/mongo/bulk_write.rb, line 229
# Clear the query cache for the collection's namespace, then run the given
# delete statements as one Delete operation whose spec carries txn_num.
def delete_one(documents, connection, context, operation_id, session, txn_num)
  QueryCache.clear_namespace(collection.namespace)

  delete_spec = base_spec(operation_id, session).merge(deletes: documents, txn_num: txn_num)
  operation = Operation::Delete.new(delete_spec)
  operation.bulk_execute(connection, context: context)
end
execute_operation(name, values, connection, context, operation_id, result_combiner, session, txn_num = nil) click to toggle source
# File lib/mongo/bulk_write.rb, line 186
# Run one combined operation (all statements of a single type) on the given
# connection and fold its result into result_combiner.
#
# @param [ Symbol ] name The operation name; it doubles as the name of the
#   method that is dispatched to via +send+.
# @param [ Array ] values The statements to execute. Splitting (see
#   split_execute) removes elements from this array in place.
# @param [ Connection ] connection The connection to execute on.
# @param [ Object ] context The operation context handed to the operation.
# @param [ Object ] operation_id The operation id stored in the spec.
# @param [ Object ] result_combiner Receives each batch result together
#   with the number of statements in that batch.
# @param [ Object ] session The session.
# @param [ Integer, nil ] txn_num The transaction number, if any.
#
# @raise [ Error::MaxBSONSize, Error::MaxMessageSize ] when a batch of at
#   most one statement is still too large to send.
def execute_operation(name, values, connection, context, operation_id, result_combiner, session, txn_num = nil)
  # Feature checks run against the concrete connection before anything is sent.
  validate_collation!(connection)
  validate_array_filters!(connection)
  validate_hint!(connection)

  # NOTE(review): unpin_maybe is defined elsewhere; presumably it unpins the
  # session from the connection on certain errors - confirm.
  unpin_maybe(session, connection) do
    # More statements than the server accepts per batch: halve and recurse.
    if values.size > connection.description.max_write_batch_size
      split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num)
    else
      # Dispatch to the method named after the operation (insert_one, update_many, ...).
      result = send(name, values, connection, context, operation_id, session, txn_num)

      # NOTE(review): add_server_diagnostics / add_error_labels are defined
      # elsewhere; presumably they decorate errors raised while combining - confirm.
      add_server_diagnostics(connection) do
        add_error_labels(connection, context) do
          result_combiner.combine!(result, values.size)
        end
      end
    end
  end
# With OP_MSG (3.6+ servers), the size of each section in the message
# is independently capped at 16m and each bulk operation becomes
# its own section. The size of the entire bulk write is limited to 48m.
# With OP_QUERY (pre-3.6 servers), the entire bulk write is sent as a
# single document and is thus subject to the 16m document size limit.
# This means the splits differ between pre-3.6 and 3.6+ servers, with
# 3.6+ servers being able to split less.
rescue Error::MaxBSONSize, Error::MaxMessageSize => e
  # A batch of zero or one statement cannot be split any further.
  raise e if values.size <= 1
  # Otherwise retry the same statements as two smaller batches.
  unpin_maybe(session, connection) do
    split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num)
  end
end
insert_one(documents, connection, context, operation_id, session, txn_num) click to toggle source
# File lib/mongo/bulk_write.rb, line 243
# Clear the query cache for the collection's namespace, then insert the
# given documents as one Insert operation whose spec carries txn_num.
def insert_one(documents, connection, context, operation_id, session, txn_num)
  QueryCache.clear_namespace(collection.namespace)

  insert_spec = base_spec(operation_id, session).merge(documents: documents, txn_num: txn_num)
  operation = Operation::Insert.new(insert_spec)
  operation.bulk_execute(connection, context: context)
end
maybe_first(obj) click to toggle source

If the given object is an array return the first element, otherwise return the given object.

@param [ Object ] obj The given object.

@return [ Object ] The first element of the array or the given object.

# File lib/mongo/bulk_write.rb, line 373
# Unwrap the first element when handed an Array; pass anything else through.
#
# @param [ Object ] obj The given object.
#
# @return [ Object ] The first element of the array or the given object.
def maybe_first(obj)
  return obj unless obj.is_a?(Array)

  obj.first
end
op_combiner() click to toggle source
# File lib/mongo/bulk_write.rb, line 218
# The combiner for this bulk write's requests, built once and cached:
# an OrderedCombiner when the write is ordered, else an UnorderedCombiner.
def op_combiner
  @op_combiner ||= begin
    combiner_class = ordered? ? OrderedCombiner : UnorderedCombiner
    combiner_class.new(requests)
  end
end
replace_one(documents, connection, context, operation_id, session, txn_num)
Alias for: update_one
single_statement?(operation) click to toggle source
# File lib/mongo/bulk_write.rb, line 165
# Whether the combined operation's name (its first key) is one of
# SINGLE_STATEMENT_OPS.
def single_statement?(operation)
  op_name, = operation.keys
  SINGLE_STATEMENT_OPS.include?(op_name)
end
split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num) click to toggle source
# File lib/mongo/bulk_write.rb, line 222
# Execute the statements as two batches. The first half is removed from
# +values+ in place and executed; the remainder is executed afterwards.
# When a transaction number was supplied and the session is not in a
# transaction, the second batch gets a fresh number from the session.
def split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num)
  first_half = values.shift(values.size / 2)
  execute_operation(name, first_half, connection, context, operation_id, result_combiner, session, txn_num)

  if txn_num && !session.in_transaction?
    txn_num = session.next_txn_num
  end
  execute_operation(name, values, connection, context, operation_id, result_combiner, session, txn_num)
end
update_many(documents, connection, context, operation_id, session, txn_num) click to toggle source
# File lib/mongo/bulk_write.rb, line 258
# Clear the query cache for the collection's namespace, then run the given
# update statements as one Update operation. txn_num is accepted for
# signature parity with the other operation methods but is not added to
# the spec.
def update_many(documents, connection, context, operation_id, session, txn_num)
  QueryCache.clear_namespace(collection.namespace)

  update_spec = base_spec(operation_id, session).merge(updates: documents)
  operation = Operation::Update.new(update_spec)
  operation.bulk_execute(connection, context: context)
end
update_one(documents, connection, context, operation_id, session, txn_num) click to toggle source
# File lib/mongo/bulk_write.rb, line 250
# Clear the query cache for the collection's namespace, then run the given
# update statements as one Update operation whose spec carries txn_num.
def update_one(documents, connection, context, operation_id, session, txn_num)
  QueryCache.clear_namespace(collection.namespace)

  update_spec = base_spec(operation_id, session).merge(updates: documents, txn_num: txn_num)
  operation = Operation::Update.new(update_spec)
  operation.bulk_execute(connection, context: context)
end
Also aliased as: replace_one
validate_array_filters!(connection) click to toggle source
# File lib/mongo/bulk_write.rb, line 273
# Reject the bulk write when any request uses array filters but the
# connection's feature set does not have them enabled.
#
# @raise [ Error::UnsupportedArrayFilters ]
def validate_array_filters!(connection)
  return unless op_combiner.has_array_filters?
  return if connection.features.array_filters_enabled?

  raise Error::UnsupportedArrayFilters.new
end
validate_collation!(connection) click to toggle source
# File lib/mongo/bulk_write.rb, line 267
# Reject the bulk write when any request specifies a collation but the
# connection's feature set does not have collation enabled.
#
# @raise [ Error::UnsupportedCollation ]
def validate_collation!(connection)
  return unless op_combiner.has_collation?
  return if connection.features.collation_enabled?

  raise Error::UnsupportedCollation.new
end
validate_hint!(connection) click to toggle source
# File lib/mongo/bulk_write.rb, line 279
# Validate hint usage; a no-op unless some request carries a hint.
#
# Raises the unacknowledged-write hint error when the server cannot accept
# the hints (see can_hint?) and the write concern is unacknowledged.
# Otherwise raises the plain hint error when the connection's feature set
# lacks update/delete option validation.
#
# @raise [ Error::UnsupportedOption ]
def validate_hint!(connection)
  return unless op_combiner.has_hint?

  # Short-circuit order matters: the write concern is only consulted when
  # the server cannot accept the hints.
  unacknowledged_hint =
    !can_hint?(connection) && write_concern && !write_concern.acknowledged?
  raise Error::UnsupportedOption.hint_error(unacknowledged_write: true) if unacknowledged_hint
  return if connection.features.update_delete_option_validation_enabled?

  raise Error::UnsupportedOption.hint_error
end
validate_requests!() click to toggle source

Perform the request document validation required by driver specifications. This method validates the first key of each update request document to be an operator (i.e. start with $) and the first key of each replacement document to not be an operator (i.e. not start with $). The request document may be invalid without this method flagging it as such (for example an update or replacement document containing some keys which are operators and some which are not), in which case the driver expects the server to fail the operation with an error.

Raise an ArgumentError if requests is empty.

@raise [ Error::InvalidUpdateDocument, Error::InvalidReplacementDocument, ArgumentError ] if a request document is invalid or the requests are empty.
# File lib/mongo/bulk_write.rb, line 333
# Validate the raw requests.
#
# For update_one/update_many the first key of the update document (or of
# the first element when the update is an array) must start with "$"; for
# replace_one the first key of the replacement must not. A violation
# raises when Mongo.validate_update_replace is truthy and is otherwise
# reported through the error class's +warn+. Requests with no keys, or with
# no update/replacement document, are skipped.
#
# @raise [ Error::InvalidUpdateDocument, Error::InvalidReplacementDocument ]
#   if a document is invalid and validation is enabled.
# @raise [ ArgumentError ] if there are no requests.
#
# @return [ Object ] Whatever @requests.each returned.
def validate_requests!
  saw_request = false

  iterated = @requests.each do |request|
    saw_request = true
    op_name = request.keys.first
    next unless op_name

    if %i[update_one update_many].include?(op_name)
      update_doc = maybe_first(request.dig(op_name, :update))
      next unless update_doc

      first_key = update_doc.keys&.first
      next unless first_key
      next if first_key.to_s.start_with?("$")

      raise Error::InvalidUpdateDocument.new(key: first_key) if Mongo.validate_update_replace

      Error::InvalidUpdateDocument.warn(Logger.logger, first_key)
    elsif op_name == :replace_one
      first_key = request.dig(op_name, :replacement)&.keys&.first
      next unless first_key
      next unless first_key.to_s.start_with?("$")

      raise Error::InvalidReplacementDocument.new(key: first_key) if Mongo.validate_update_replace

      Error::InvalidReplacementDocument.warn(Logger.logger, first_key)
    end
  end

  # Emptiness is detected by iterating, so no size/empty? call is needed on @requests.
  raise ArgumentError, "Bulk write requests cannot be empty" unless saw_request

  iterated
end