class Mushy::Flux
Attributes
all[RW]
config[RW]
flow[RW]
id[RW]
masher[RW]
parent_fluxs[RW]
subscribed_to[RW]
type[RW]
Public Class Methods
inherited(subclass)
click to toggle source
# File lib/mushy/flux.rb, line 20 def inherited subclass if (self != Mushy::Flux) Mushy::Flux.inherited subclass else self.all ||= [] self.all << subclass end end
new()
click to toggle source
# File lib/mushy/flux.rb, line 13 def initialize guard end
Public Instance Methods
convert_this_to_an_array(value)
click to toggle source
# File lib/mushy/flux.rb, line 157 def convert_this_to_an_array value [value] .flatten .map { |x| x.to_s.split(',').map { |x| x.strip } } .flatten .select { |x| x && x != '' } end
convert_to_symbolized_hash(event)
click to toggle source
# File lib/mushy/flux.rb, line 165 def convert_to_symbolized_hash event data = SymbolizedHash.new event.each { |k, v| data[k] = v } data end
execute(incoming_event)
click to toggle source
# File lib/mushy/flux.rb, line 38 def execute incoming_event guard incoming_event = SymbolizedHash.new(incoming_event) if incoming_event.is_a?(Hash) incoming_split = masher.mash(config, incoming_event)[:incoming_split] config_considering_an_imcoming_split = config .reject { |x, _| incoming_split && x.to_s == 'join' } .reduce({}) { |t, i| t[i[0]] = i[1]; t } events = incoming_split ? incoming_event[incoming_split] : [incoming_event] results = events.map { |e| execute_single_event e, config_considering_an_imcoming_split } return results.first unless incoming_split results = join_these_results([results].flatten, incoming_event, config[:join]) if config[:join] results.flatten end
execute_single_event(event, config)
click to toggle source
# File lib/mushy/flux.rb, line 59 def execute_single_event event, config mashed_config = masher.mash config, event the_original_join = mashed_config[:join] mashed_config[:join] = nil if mashed_config[:incoming_split] results = process event, mashed_config returned_one_result = results.is_a?(Hash) results = standardize_these results results = shape_these results, event, config return results.first if the_original_join return results if mashed_config[:outgoing_split] returned_one_result ? results.first : results rescue Exception => e raise e if config[:error_strategy].to_s == '' return [] if config[:error_strategy] == 'ignore' { exception: e.message } end
group_these_results(results, event, by)
click to toggle source
# File lib/mushy/flux.rb, line 119 def group_these_results results, event, by group_by = by.split('|')[0] result_key = by.split('|')[1] results.group_by { |x| x[group_by] }.map { |k, v| SymbolizedHash.new( { result_key => v } ) } end
guard()
click to toggle source
# File lib/mushy/flux.rb, line 30 def guard self.id ||= SecureRandom.uuid self.parent_fluxs ||= [] self.subscribed_to ||= [] self.masher ||= Masher.new self.config ||= SymbolizedHash.new end
ignore_these_results(results, event, by)
click to toggle source
# File lib/mushy/flux.rb, line 138 def ignore_these_results results, event, by return results if by.to_s == '' ignore_fields = by.split ',' results.each { |r| ignore_fields.each { |f| r.delete f } } results end
join_these_results(results, event, by)
click to toggle source
# File lib/mushy/flux.rb, line 129 def join_these_results results, event, by [SymbolizedHash.new( { by => results } )] end
limit_these_results(results, event, by)
click to toggle source
# File lib/mushy/flux.rb, line 112 def limit_these_results results, event, by results .each_with_index .select { |x, i| i < by.to_i } .map { |x, _| x } end
merge_these_results(results, event, by)
click to toggle source
# File lib/mushy/flux.rb, line 145 def merge_these_results results, event, by keys_to_merge = convert_this_to_an_array by keys_to_merge = event.keys.map { |x| x.to_s } if (keys_to_merge[0] == '*') results.map do |result| event.select { |k, _| keys_to_merge.include? k.to_s }.each do |k, v| result[k] = v unless result[k] end result end end
model_these_results(results, event, by)
click to toggle source
# File lib/mushy/flux.rb, line 133 def model_these_results results, event, by return results unless by.any? results.map { |x| masher.mash by, x } end
outgoing_split_these_results(results, event, by)
click to toggle source
# File lib/mushy/flux.rb, line 125 def outgoing_split_these_results results, event, by results.map { |x| Masher.new.dig by, x }.flatten end
process(event, config)
click to toggle source
# File lib/mushy/flux.rb, line 171 def process event, config event end
shape_these(results, event, config)
click to toggle source
# File lib/mushy/flux.rb, line 92 def shape_these results, event, config supported_shaping = [:merge, :outgoing_split, :group, :model, :ignore, :join, :sort, :limit] shaping = supported_shaping if (config[:shaping]) shaping = convert_this_to_an_array(config[:shaping]).map { |x| x.to_sym } end supported_shaping .select { |x| config[x] } .each_with_index .sort_by { |x, i| shaping.index(x) || i + supported_shaping.count } .map { |x, _| x } .reduce(results) { |t, i| self.send("#{i}_these_results".to_sym, t, event, config[i]) } end
sort_these_results(results, event, by)
click to toggle source
# File lib/mushy/flux.rb, line 108 def sort_these_results results, event, by results.sort { |x| x[by].to_i } end
standardize_these(results)
click to toggle source
# File lib/mushy/flux.rb, line 85 def standardize_these results [results] .flatten .map { |x| x.is_a?(Hash) ? convert_to_symbolized_hash(x) : nil } .select { |x| x } end