module Mongo::Operation::SessionsSupported

Shared behavior of operations that support a session.

@since 2.5.2 @api private

Constants

READ_COMMANDS
ZERO_TIMESTAMP

Private Instance Methods

add_read_preference(sel, connection) click to toggle source

Adds $readPreference field to the command document.

$readPreference is only sent when the server is a mongos, following the rules described in github.com/mongodb/specifications/blob/master/source/server-selection/server-selection.rst#passing-read-preference-to-mongos. The topology does not matter for figuring out whether to send $readPreference since the decision is always made based on server type.

$readPreference is sent to OP_MSG-grokking replica set members.

@param [ Hash ] sel Existing command document which will be mutated. @param [ Server::Connection ] connection The connection that the

operation will be executed on.
# File lib/mongo/operation/shared/sessions_supported.rb, line 168
def add_read_preference(sel, connection)
  Lint.assert_type(connection, Server::Connection)

  # https://github.com/mongodb/specifications/blob/master/source/server-selection/server-selection.rst#topology-type-single
  read_doc = if connection.description.standalone?
    # Read preference is never sent to standalones.
    nil
  elsif connection.server.load_balancer?
    read&.to_mongos
  elsif connection.description.mongos?
    # When server is a mongos:
    # - $readPreference is never sent when mode is 'primary'
    # - Otherwise $readPreference is sent
    # When mode is 'secondaryPreferred' $readPreference is currently
    # required to only be sent when a non-mode field (i.e. tag_sets)
    # is present, but this causes wrong behavior (DRIVERS-1642).
    read&.to_mongos
  elsif connection.server.cluster.single?
    # In Single topology:
    # - If no read preference is specified by the application, the driver
    #   adds mode: primaryPreferred.
    # - If a read preference is specified by the application, the driver
    #   replaces the mode with primaryPreferred.
    read_doc = if read
      BSON::Document.new(read.to_doc)
    else
      BSON::Document.new
    end
    if [nil, 'primary'].include?(read_doc['mode'])
      read_doc['mode'] = 'primaryPreferred'
    end
    read_doc
  else
    # In replica sets, read preference is passed to the server if one
    # is specified by the application, except for primary read preferences.
    read_doc = BSON::Document.new(read&.to_doc || {})
    if [nil, 'primary'].include?(read_doc['mode'])
      nil
    else
      read_doc
    end
  end

  if read_doc
    sel['$readPreference'] = read_doc
  end
end
add_write_concern!(sel) click to toggle source
# File lib/mongo/operation/shared/sessions_supported.rb, line 97
def add_write_concern!(sel)
  sel[:writeConcern] = write_concern.options if write_concern
end
apply_autocommit!(selector) click to toggle source
# File lib/mongo/operation/shared/sessions_supported.rb, line 101
def apply_autocommit!(selector)
  session.add_autocommit!(selector)
end
apply_causal_consistency!(selector, connection) click to toggle source

Adds causal consistency document to the selector, if one can be constructed and the selector is for a startTransaction command.

When operations are performed in a transaction, only the first operation (the one which starts the transaction via startTransaction) is allowed to have a read concern, and with it the causal consistency document, specified.

# File lib/mongo/operation/shared/sessions_supported.rb, line 52
def apply_causal_consistency!(selector, connection)
  return unless selector[:startTransaction]

  apply_causal_consistency_if_possible(selector, connection)
end
apply_causal_consistency_if_possible(selector, connection) click to toggle source

Adds causal consistency document to the selector, if one can be constructed.

In order for the causal consistency document to be constructed, causal consistency must be enabled for the session and the session must have the current operation time. Also, topology must be replica set or sharded cluster.

# File lib/mongo/operation/shared/sessions_supported.rb, line 65
def apply_causal_consistency_if_possible(selector, connection)
  if !connection.description.standalone?
    cc_doc = session.send(:causal_consistency_doc)
    if cc_doc
      rc_doc = (selector[:readConcern] || read_concern || {}).merge(cc_doc)
      selector[:readConcern] = Options::Mapper.transform_values_to_strings(
        rc_doc)
    end
  end
end
apply_cluster_time!(selector, connection) click to toggle source
# File lib/mongo/operation/shared/sessions_supported.rb, line 80
def apply_cluster_time!(selector, connection)
  if !connection.description.standalone?
    cluster_time = [
      connection.cluster_time,
      session&.cluster_time,
    ].compact.max

    if cluster_time
      selector['$clusterTime'] = cluster_time
    end
  end
end
apply_read_pref!(selector) click to toggle source
# File lib/mongo/operation/shared/sessions_supported.rb, line 113
def apply_read_pref!(selector)
  session.apply_read_pref!(selector) if read_command?(selector)
end
apply_session_options(sel, connection) click to toggle source
# File lib/mongo/operation/shared/sessions_supported.rb, line 216
def apply_session_options(sel, connection)
  apply_cluster_time!(sel, connection)
  sel[:txnNumber] = BSON::Int64.new(txn_num) if txn_num
  sel.merge!(lsid: session.session_id)
  apply_start_transaction!(sel)
  apply_causal_consistency!(sel, connection)
  apply_autocommit!(sel)
  apply_txn_opts!(sel)
  suppress_read_write_concern!(sel)
  validate_read_preference!(sel)
  apply_txn_num!(sel)
  if session.recovery_token &&
    (sel[:commitTransaction] || sel[:abortTransaction])
  then
    sel[:recoveryToken] = session.recovery_token
  end

  if session.snapshot?
    unless connection.description.server_version_gte?('5.0')
      raise Error::SnapshotSessionInvalidServerVersion
    end

    sel[:readConcern] = {level: 'snapshot'}
    if session.snapshot_timestamp
      sel[:readConcern][:atClusterTime] = session.snapshot_timestamp
    end
  end
end
apply_start_transaction!(selector) click to toggle source
# File lib/mongo/operation/shared/sessions_supported.rb, line 105
def apply_start_transaction!(selector)
  session.add_start_transaction!(selector)
end
apply_txn_num!(selector) click to toggle source
# File lib/mongo/operation/shared/sessions_supported.rb, line 109
def apply_txn_num!(selector)
  session.add_txn_num!(selector)
end
apply_txn_opts!(selector) click to toggle source
# File lib/mongo/operation/shared/sessions_supported.rb, line 117
def apply_txn_opts!(selector)
  session.add_txn_opts!(selector, read_command?(selector))
end
build_message(connection, context) click to toggle source
Calls superclass method
# File lib/mongo/operation/shared/sessions_supported.rb, line 245
def build_message(connection, context)
  if self.session != context.session
    if self.session
      raise Error::InternalDriverError, "Operation session #{self.session.inspect} does not match context session #{context.session.inspect}"
    else
      # Some operations are not constructed with sessions but are
      # executed in a context where a session is available.
      # This could be OK or a driver issue.
      # TODO investigate.
    end
  end

  super.tap do |message|
    if session = context.session
      # Serialize the message to detect client-side problems,
      # such as invalid BSON keys or too large messages.
      # The message will be serialized again
      # later prior to being sent to the connection.
      buf = BSON::ByteBuffer.new
      message.serialize(buf)
      if buf.length > connection.max_message_size
        raise Error::MaxMessageSize.new(connection.max_message_size)
      end
      session.update_state!
    end
  end
end
command(connection) click to toggle source
# File lib/mongo/operation/shared/sessions_supported.rb, line 129
def command(connection)
  if Lint.enabled?
    unless connection.is_a?(Server::Connection)
      raise Error::LintError, "Connection is not a Connection instance: #{connection}"
    end
  end

  sel = BSON::Document.new(selector(connection))
  add_write_concern!(sel)
  sel[Protocol::Msg::DATABASE_IDENTIFIER] = db_name

  add_read_preference(sel, connection)

  if connection.features.sessions_enabled?
    apply_cluster_time!(sel, connection)
    if session && (acknowledged_write? || session.in_transaction?)
      apply_session_options(sel, connection)
    end
  elsif session && session.explicit?
    apply_session_options(sel, connection)
  end

  sel
end
flags() click to toggle source
# File lib/mongo/operation/shared/sessions_supported.rb, line 76
def flags
  acknowledged_write? ? [] : [:more_to_come]
end
read_command?(sel) click to toggle source
# File lib/mongo/operation/shared/sessions_supported.rb, line 93
def read_command?(sel)
  READ_COMMANDS.any? { |c| sel[c] }
end
suppress_read_write_concern!(selector) click to toggle source
# File lib/mongo/operation/shared/sessions_supported.rb, line 121
def suppress_read_write_concern!(selector)
  session.suppress_read_write_concern!(selector)
end
validate_read_preference!(selector) click to toggle source
# File lib/mongo/operation/shared/sessions_supported.rb, line 125
def validate_read_preference!(selector)
  session.validate_read_preference!(selector) if read_command?(selector)
end