class MatchReduce::Processor
This is the main lifecycle of the algorithm. You initialize a new instance of this class using aggregators, then you pump in records into it. Once done, call results to get the results.
Attributes
index[R]
results_builder[R]
Public Class Methods
new(aggregators, resolver: Objectable.resolver, any: ANY)
click to toggle source
# File lib/match_reduce/processor.rb, line 23 def initialize(aggregators, resolver: Objectable.resolver, any: ANY) @index = Index.new(aggregators, any: any) @results_builder = ResultsBuilder.new(index.aggregators, resolver) freeze end
Public Instance Methods
add(record)
click to toggle source
# File lib/match_reduce/processor.rb, line 34 def add(record) hit_aggregators = Set.new record_patterns(record).each do |hash_pattern| # Each index find hit means the aggregator matched on the record index.find(hash_pattern).each do |aggregator| next if hit_aggregators.include?(aggregator) add_to_results_builder(aggregator, record) hit_aggregators << aggregator end end self end
add_each(records)
click to toggle source
# File lib/match_reduce/processor.rb, line 30 def add_each(records) tap { records.each { |record| add(record) } } end
Private Instance Methods
add_to_results_builder(aggregator, record)
click to toggle source
# File lib/match_reduce/processor.rb, line 72 def add_to_results_builder(aggregator, record) group_id = make_group_id(aggregator, record) results_builder.add(aggregator, record, group_id) end
make_group_id(aggregator, record)
click to toggle source
# File lib/match_reduce/processor.rb, line 56 def make_group_id(aggregator, record) aggregator.group_keys.map { |group_key| resolver.get(record, group_key) } end
record_matrix(record)
click to toggle source
# File lib/match_reduce/processor.rb, line 60 def record_matrix(record) index.keys.each_with_object(HashMath::Matrix.new) do |key, memo| value = resolver.get(record, key) memo.add_each(key, [value, ANY]) end end
record_patterns(record)
click to toggle source
# File lib/match_reduce/processor.rb, line 68 def record_patterns(record) [{}] + record_matrix(record).to_a end