class Mongo::Collection::View::Aggregation
Provides behavior around an aggregation pipeline on a collection view.
@since 2.0.0
Constants
- REROUTE
The reroute message.
@since 2.1.0 @deprecated
Attributes
@return [ Array<Hash> ] pipeline The aggregation pipeline.
@return [ View
] view The collection view.
Public Class Methods
Initialize the aggregation for the provided collection view, pipeline and options.
@example Create the new aggregation view.
Aggregation.view.new(view, pipeline)
@param [ Collection::View
] view The collection view. @param [ Array<Hash> ] pipeline The pipeline of operations. @param [ Hash ] options The aggregation options.
@since 2.0.0
# File lib/mongo/collection/view/aggregation.rb, line 77 def initialize(view, pipeline, options = {}) @view = view @pipeline = pipeline.dup @options = BSON::Document.new(options).freeze end
Public Instance Methods
Set to true if disk usage is allowed during the aggregation.
@example Set disk usage flag.
aggregation.allow_disk_use(true)
@param [ true, false ] value The flag value.
@return [ true, false, Aggregation
] The aggregation if a value was
set or the value if used as a getter.
@since 2.0.0
# File lib/mongo/collection/view/aggregation.rb, line 62 def allow_disk_use(value = nil) configure(:allow_disk_use, value) end
Get the explain plan for the aggregation.
@example Get the explain plan for the aggregation.
aggregation.explain
@return [ Hash ] The explain plan.
@since 2.0.0
# File lib/mongo/collection/view/aggregation.rb, line 91 def explain self.class.new(view, pipeline, options.merge(explain: true)).first end
Whether this aggregation will write its result to a database collection.
@return [ Boolean ] Whether the aggregation will write its result
to a collection.
@api private
# File lib/mongo/collection/view/aggregation.rb, line 101 def write? pipeline.any? { |op| op.key?('$out') || op.key?(:$out) || op.key?('$merge') || op.key?(:$merge) } end
Private Instance Methods
# File lib/mongo/collection/view/aggregation.rb, line 111 def aggregate_spec(session) Builder::Aggregation.new(pipeline, view, options.merge(session: session)).specification end
Skip, sort, limit, projection are specified as pipeline stages rather than as options.
# File lib/mongo/collection/view/aggregation.rb, line 146 def cache_options { namespace: collection.namespace, selector: pipeline, read_concern: view.read_concern, read_preference: view.read_preference, collation: options[:collation], # Aggregations can read documents from more than one collection, # so they will be cleared on every write operation. multi_collection: true, } end
# File lib/mongo/collection/view/aggregation.rb, line 119 def initial_query_op(session) Operation::Aggregate.new(aggregate_spec(session)) end
# File lib/mongo/collection/view/aggregation.rb, line 115 def new(options) Aggregation.new(view, pipeline, options) end
# File lib/mongo/collection/view/aggregation.rb, line 132 def secondary_ok? !write? end
# File lib/mongo/collection/view/aggregation.rb, line 136 def send_initial_query(server, session) unless valid_server?(server) log_warn("Rerouting the Aggregation operation to the primary server - #{server.summary} is not suitable") server = cluster.next_primary(nil, session) end initial_query_op(session).execute(server, context: Operation::Context.new(client: client, session: session)) end
# File lib/mongo/collection/view/aggregation.rb, line 107 def server_selector @view.send(:server_selector) end
# File lib/mongo/collection/view/aggregation.rb, line 123 def valid_server?(server) if secondary_ok? true else description = server.description description.standalone? || description.mongos? || description.primary? || description.load_balancer? end end