class Mongo::Collection::View::Aggregation

Provides behavior around an aggregation pipeline on a collection view.

@since 2.0.0

Constants

REROUTE

The reroute message.

@since 2.1.0

@deprecated

Attributes

pipeline[R]

@return [ Array<Hash> ] pipeline The aggregation pipeline.

view[R]

@return [ View ] view The collection view.

Public Class Methods

new(view, pipeline, options = {})

Initialize the aggregation for the provided collection view, pipeline and options.

@example Create the new aggregation view.

Aggregation.new(view, pipeline)

@param [ Collection::View ] view The collection view.

@param [ Array<Hash> ] pipeline The pipeline of operations.

@param [ Hash ] options The aggregation options.

@option options [ true, false ] :allow_disk_use Set to true if disk usage is allowed during the aggregation.

@option options [ Integer ] :batch_size The number of documents to return per batch.

@option options [ true, false ] :bypass_document_validation Whether or not to skip document level validation.

@option options [ Hash ] :collation The collation to use.

@option options [ Object ] :comment A user-provided comment to attach to this command.

@option options [ String ] :hint The index to use for the aggregation.

@option options [ Hash ] :let Mapping of variables to use in the pipeline. See the server documentation for details.

@option options [ Integer ] :max_time_ms The maximum amount of time in milliseconds to allow the aggregation to run.

@option options [ true, false ] :use_cursor Indicates whether the command will request that the server provide results using a cursor. Note that as of server version 3.6, aggregations always provide results using a cursor and this option is therefore not valid.

@option options [ Session ] :session The session to use.

@since 2.0.0

# File lib/mongo/collection/view/aggregation.rb, line 96
def initialize(view, pipeline, options = {})
  @view = view
  @pipeline = pipeline.dup
  unless Mongo.broken_view_aggregate || view.filter.empty?
    @pipeline.unshift(:$match => view.filter)
  end
  @options = BSON::Document.new(options).freeze
end
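
The constructor's handling of the view filter can be illustrated in isolation. The following is a minimal sketch in plain Ruby, not the driver's code: `build_pipeline` and its `broken_view_aggregate` keyword are hypothetical stand-ins for the constructor body and the `Mongo.broken_view_aggregate` flag. It shows that a non-empty filter is prepended as a `$match` stage and that the caller's pipeline array is left untouched because it is duplicated first.

```ruby
# Hypothetical stand-in for the constructor logic shown above.
# filter   - the view's filter hash (view.filter in the driver)
# pipeline - the caller's array of stage hashes
def build_pipeline(filter, pipeline, broken_view_aggregate: false)
  stages = pipeline.dup # work on a copy so the caller's array is not mutated
  unless broken_view_aggregate || filter.empty?
    stages.unshift(:$match => filter) # the filter becomes the first stage
  end
  stages
end

caller_pipeline = [{ '$group' => { '_id' => '$city', 'total' => { '$sum' => 1 } } }]
effective = build_pipeline({ 'status' => 'active' }, caller_pipeline)
```

In this sketch `effective` holds two stages, with the `$match` stage first, while `caller_pipeline` still holds only the `$group` stage.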

Public Instance Methods

allow_disk_use(value = nil)

Set to true if disk usage is allowed during the aggregation.

@example Set disk usage flag.

aggregation.allow_disk_use(true)

@param [ true, false ] value The flag value.

@return [ true, false, Aggregation ] The aggregation if a value was set or the value if used as a getter.

@since 2.0.0

# File lib/mongo/collection/view/aggregation.rb, line 61
def allow_disk_use(value = nil)
  configure(:allow_disk_use, value)
end
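
The combined getter and setter behavior described above can be sketched as follows. The body of `configure` is not shown on this page, so the version here is an assumption: `SketchAggregation` is a hypothetical miniature class, and its `configure` treats `nil` as a read and any other value as a request for a modified copy. This is consistent with the documented return type and with the frozen options, but it is not taken from the driver source.

```ruby
# Hypothetical miniature of the getter/setter pattern; not the driver's class.
class SketchAggregation
  attr_reader :options

  def initialize(options = {})
    @options = options.dup.freeze # options are immutable once set
  end

  def allow_disk_use(value = nil)
    configure(:allow_disk_use, value)
  end

  private

  # Assumed behavior: nil reads the current value, anything else returns a
  # new object carrying the updated option.
  def configure(field, value)
    return options[field] if value.nil?

    self.class.new(options.merge(field => value))
  end
end

base = SketchAggregation.new
with_disk = base.allow_disk_use(true)
```

Under this assumption `base` is unchanged and `with_disk` is a separate object, which allows calls to be chained without mutating shared state.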
explain()

Get the explain plan for the aggregation.

@example Get the explain plan for the aggregation.

aggregation.explain

@return [ Hash ] The explain plan.

@since 2.0.0

# File lib/mongo/collection/view/aggregation.rb, line 113
def explain
  self.class.new(view, pipeline, options.merge(explain: true)).first
end
write?()

Whether this aggregation will write its result to a database collection.

@return [ Boolean ] Whether the aggregation will write its result to a collection.

@api private

# File lib/mongo/collection/view/aggregation.rb, line 123
def write?
  pipeline.any? { |op| op.key?('$out') || op.key?(:$out) || op.key?('$merge') || op.key?(:$merge) }
end
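
Because pipeline stages may be keyed by strings or by symbols, the check above tests both forms of each stage name. A minimal standalone sketch of the same test follows; `writes?` and `WRITE_STAGES` are hypothetical names used only for illustration.

```ruby
# Stage names that write results to a collection, in both key forms.
WRITE_STAGES = ['$out', :$out, '$merge', :$merge].freeze

# Returns true if any stage hash in the pipeline has a write stage key.
def writes?(pipeline)
  pipeline.any? { |stage| WRITE_STAGES.any? { |key| stage.key?(key) } }
end

string_keyed = [{ '$match' => { 'a' => 1 } }, { '$out' => 'archive' }]
symbol_keyed = [{ :$merge => { into: 'archive' } }]
read_only    = [{ '$match' => { 'a' => 1 } }]
```

Here `writes?` is true for `string_keyed` and `symbol_keyed` and false for `read_only`.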

Private Instance Methods

aggregate_spec(session, read_preference)
# File lib/mongo/collection/view/aggregation.rb, line 133
def aggregate_spec(session, read_preference)
  Builder::Aggregation.new(
    pipeline,
    view,
    options.merge(session: session, read_preference: read_preference)
  ).specification
end
cache_options()

Skip, sort, limit, projection are specified as pipeline stages rather than as options.

# File lib/mongo/collection/view/aggregation.rb, line 196
def cache_options
  {
    namespace: collection.namespace,
    selector: pipeline,
    read_concern: view.read_concern,
    read_preference: view.read_preference,
    collation: options[:collation],
    # Aggregations can read documents from more than one collection,
    # so they will be cleared on every write operation.
    multi_collection: true,
  }
end
effective_read_preference(connection)

Return effective read preference for the operation.

If the pipeline contains $merge or $out, the read preference specified by the user is secondary or secondary_preferred, and the target server is below 5.0, then this method returns the primary read preference, because the aggregation will be routed to the primary. Otherwise it returns the original read preference.

See github.com/mongodb/specifications/blob/master/source/crud/crud.rst#read-preferences-and-server-selection

@param [ Server::Connection ] connection The connection which will be used for the operation.

@return [ Hash | nil ] read preference hash that should be sent with this command.
# File lib/mongo/collection/view/aggregation.rb, line 163
def effective_read_preference(connection)
  return unless view.read_preference
  return view.read_preference unless write?
  return view.read_preference unless [:secondary, :secondary_preferred].include?(view.read_preference[:mode])

  primary_read_preference = {mode: :primary}
  description = connection.description
  if description.primary?
    log_warn("Routing the Aggregation operation to the primary server")
    primary_read_preference
  elsif description.mongos? && !description.features.merge_out_on_secondary_enabled?
    log_warn("Routing the Aggregation operation to the primary server")
    primary_read_preference
  else
    view.read_preference
  end

end
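
The branches of the method above can be traced with plain Ruby objects. This is a sketch under stated assumptions: `Description` is a hypothetical struct modeling only the three predicates the method consults, `effective_preference` mirrors the branch order of the code shown, and the logging calls are omitted.

```ruby
# Hypothetical stand-in for connection.description; only the predicates
# consulted by the method above are modeled.
Description = Struct.new(:primary, :mongos, :merge_out_on_secondary, keyword_init: true)

# Mirrors the branch order of effective_read_preference, without logging.
def effective_preference(read_preference, writes, description)
  return nil unless read_preference
  return read_preference unless writes
  return read_preference unless %i[secondary secondary_preferred].include?(read_preference[:mode])

  if description.primary
    { mode: :primary }
  elsif description.mongos && !description.merge_out_on_secondary
    { mode: :primary }
  else
    read_preference
  end
end

secondary  = { mode: :secondary }
on_primary = Description.new(primary: true, mongos: false, merge_out_on_secondary: true)
old_mongos = Description.new(primary: false, mongos: true, merge_out_on_secondary: false)
new_member = Description.new(primary: false, mongos: false, merge_out_on_secondary: true)
```

With these inputs a writing pipeline with a secondary preference resolves to `{ mode: :primary }` for `on_primary` and `old_mongos`, and keeps the original preference for `new_member`. A pipeline that does not write keeps its preference in every case.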
initial_query_op(session, read_preference)
# File lib/mongo/collection/view/aggregation.rb, line 145
def initial_query_op(session, read_preference)
  Operation::Aggregate.new(aggregate_spec(session, read_preference))
end
new(options)
# File lib/mongo/collection/view/aggregation.rb, line 141
def new(options)
  Aggregation.new(view, pipeline, options)
end
send_initial_query(server, session)
# File lib/mongo/collection/view/aggregation.rb, line 182
def send_initial_query(server, session)
  server.with_connection do |connection|
    initial_query_op(
      session,
      effective_read_preference(connection)
    ).execute_with_connection(
      connection,
      context: Operation::Context.new(client: client, session: session)
    )
  end
end
server_selector()
# File lib/mongo/collection/view/aggregation.rb, line 129
def server_selector
  @view.send(:server_selector)
end