class Streamingly::Reducer
Public Class Methods
new(accumulator_class, accumulator_options = nil)
click to toggle source
# File lib/streamingly/reducer.rb, line 4
# Sets up a reducer around the given accumulator class.
#
# @param accumulator_class [Class] class instantiated for each key; it is
#   probed once for an +on_error+ instance method
# @param accumulator_options [Object, nil] optional extra argument kept for
#   later accumulator construction
def initialize(accumulator_class, accumulator_options = nil)
  @accumulator_class = accumulator_class
  @accumulator_options = accumulator_options
  # Cache the capability check so it is not repeated on every error.
  @error_callback_defined = accumulator_class.method_defined?(:on_error)
end
Public Instance Methods
on_error(error, error_context)
click to toggle source
# File lib/streamingly/reducer.rb, line 23
# Hands an error to the current accumulator when it is able to handle it.
#
# The error is re-raised unless the accumulator class defines +on_error+
# and an accumulator instance currently exists.
#
# @param error [Exception] the error being reported
# @param error_context [Hash] details about where the error happened
# @return [Object] whatever the accumulator's +on_error+ returns
# @raise [Exception] +error+ itself when it cannot be delegated
def on_error(error, error_context)
  can_delegate = @error_callback_defined && !@accumulator.nil?
  return @accumulator.on_error(error, error_context) if can_delegate

  raise error
end
reduce_over(enumerator) { |out| ... }
click to toggle source
# File lib/streamingly/reducer.rb, line 10
# Feeds every line of +enumerator+ through the reducer and yields each
# output record, followed by whatever the final flush produces.
#
# Each line is stripped of surrounding whitespace before being reduced.
# When called without a block an Enumerator over the same outputs is
# returned instead of failing on the first +yield+.
#
# @param enumerator [#each] source of input lines (each must respond to +strip+)
# @yieldparam out [Object] one output record
# @return [Enumerator, Array] an Enumerator when no block is given,
#   otherwise the array returned by the final flush
def reduce_over(enumerator)
  return enum_for(:reduce_over, enumerator) unless block_given?

  enumerator.each do |line|
    reduce(line.strip).each { |out| yield out }
  end

  # Emit whatever the last accumulator is still holding.
  flush.each { |out| yield out }
end
Private Instance Methods
flush()
click to toggle source
# File lib/streamingly/reducer.rb, line 30
# Drains the current accumulator, if there is one.
#
# A nil/false result from the accumulator is treated as "no output", and
# nil entries are removed from the returned records. Any StandardError is
# reported through +on_error+ with an empty context and yields no output.
#
# @return [Array] the non-nil flushed records, or an empty array
def flush
  return [] unless @accumulator

  pending = @accumulator.flush
  pending ? pending.compact : []
rescue StandardError => error
  on_error(error, {})
  []
end
new_accumulator(key)
click to toggle source
# File lib/streamingly/reducer.rb, line 60
# Instantiates the accumulator class for +key+.
#
# The stored options are passed as a second constructor argument only
# when they are truthy; otherwise the constructor receives the key alone.
#
# @param key [Object] the key the new accumulator is responsible for
# @return [Object] a fresh accumulator instance
def new_accumulator(key)
  return @accumulator_class.new(key) unless @accumulator_options

  @accumulator_class.new(key, @accumulator_options)
end
reduce(line)
click to toggle source
# File lib/streamingly/reducer.rb, line 37
# Processes one input line of the form "key<TAB>value".
#
# When the key differs from the previous line's key, the current
# accumulator is flushed and a new one is created for the new key; the
# value is then applied to the current accumulator. Lines without both a
# key and a value are ignored.
#
# Any StandardError is reported through +on_error+ with the offending
# line as context. Output already flushed for the previous key is still
# returned in that case, so a failure on the new key does not discard the
# previous key's records.
#
# @param line [String, nil] a single input record
# @return [Array] records flushed for the previous key, or an empty array
def reduce(line)
  # Streaming Hadoop only treats the first tab as the delimiter between
  # the key and value. Additional tabs are grouped into the value:
  # http://hadoop.apache.org/docs/r0.18.3/streaming.html#How+Does+Streaming+Work
  key, value = (line || '').split("\t", 2)
  return [] unless key && value

  results = nil
  if @prev_key != key
    results = flush
    @prev_key = key
    # NOTE(review): if the constructor raises here, @prev_key already names
    # the new key while @accumulator still refers to the previous one —
    # confirm whether that is intended.
    @accumulator = new_accumulator(key)
  end

  @accumulator.apply_value(value) if @accumulator
  results || []
rescue StandardError => error
  on_error(error, line: line)
  # Keep whatever was flushed before the failure instead of dropping it.
  results || []
end