class Streamingly::Reducer

Public Class Methods

new(accumulator_class, accumulator_options = nil)
# File lib/streamingly/reducer.rb, line 4
# Builds a reducer that creates one accumulator per key.
#
# @param accumulator_class [Class] instantiated with the key (and, when
#   present, +accumulator_options+) each time a new key is seen
# @param accumulator_options [Object, nil] extra constructor argument; only
#   passed when truthy
def initialize(accumulator_class, accumulator_options = nil)
  @accumulator_class, @accumulator_options = accumulator_class, accumulator_options
  # Looked up once so error handling does not repeat the reflection per line;
  # method_defined? sees public/protected instance methods of the class.
  @error_callback_defined = accumulator_class.method_defined?(:on_error)
end

Public Instance Methods

on_error(error, error_context)
# File lib/streamingly/reducer.rb, line 23
# Routes an error raised while reducing to the current accumulator.
#
# Delegation happens only when the accumulator class defines #on_error and an
# accumulator currently exists; otherwise the error is re-raised unchanged.
#
# @param error [StandardError] the error that was caught
# @param error_context [Hash] extra context (e.g. the offending line)
# @return whatever the accumulator's #on_error returns
# @raise the original +error+ when it cannot be delegated
def on_error(error, error_context)
  if @error_callback_defined && !@accumulator.nil?
    @accumulator.on_error(error, error_context)
  else
    raise error
  end
end
reduce_over(enumerator) { |out| ... }
# File lib/streamingly/reducer.rb, line 10
# Streams every line of +enumerator+ through #reduce, yielding each output
# record, then yields whatever the last accumulator flushes.
#
# After the final flush the per-key state is reset. Calling this again on
# the same reducer therefore starts a fresh stream. Without the reset, it
# would re-emit (or keep appending to) the last group of the previous stream.
#
# @param enumerator [#each] source of raw input lines; each line must
#   respond to #strip
# @yieldparam out an output record produced by an accumulator
# @return [Array] the records produced by the final flush
def reduce_over(enumerator)
  enumerator.each do |line|
    reduce(line.strip).each { |out| yield out }
  end

  final = flush
  # The last accumulator has now been flushed; forget it so it can never be
  # flushed or fed again.
  @prev_key = nil
  @accumulator = nil
  final.each { |out| yield out }
end

Private Instance Methods

flush()
# File lib/streamingly/reducer.rb, line 30
# Drains the current accumulator, if any.
#
# A nil/false result from the accumulator's #flush is treated as "nothing to
# emit", and nil entries are removed from the returned list. Any
# StandardError is handed to #on_error with an empty context, and [] is
# returned when #on_error does not re-raise.
#
# @return [Array] the non-nil records produced by the accumulator
def flush
  return [] unless @accumulator

  drained = @accumulator.flush
  drained ? drained.compact : []
rescue StandardError => error
  on_error(error, {})
  []
end
new_accumulator(key)
# File lib/streamingly/reducer.rb, line 60
# Instantiates the configured accumulator class for +key+.
#
# The stored options are appended as a second constructor argument only
# when they are truthy; otherwise the constructor receives just the key.
#
# @param key [String] the grouping key
# @return [Object] a new accumulator instance
def new_accumulator(key)
  return @accumulator_class.new(key) unless @accumulator_options

  @accumulator_class.new(key, @accumulator_options)
end
reduce(line)
# File lib/streamingly/reducer.rb, line 37
# Feeds one "key<TAB>value" line into the accumulator for its key.
#
# When the key differs from the previous line's key, the previous
# accumulator is flushed and a fresh one is built for the new key. Input is
# therefore presumably grouped by key, as in a Hadoop Streaming reducer;
# callers must ensure this. Lines without a tab are ignored.
#
# Any StandardError is routed to #on_error with the offending line as
# context. If #on_error swallows it:
# * records already flushed for the previous key are still returned rather
#   than discarded;
# * if the new accumulator could not be built, the already-flushed one is
#   dropped so this key's values are never fed into it (they are skipped
#   until the next key starts a new accumulator).
#
# @param line [String, nil] a (stripped) input line
# @return [Array] records flushed from the previous key's accumulator, or []
def reduce(line)
  # Streaming Hadoop only treats the first tab as the delimiter between
  # the key and value.  Additional tabs are grouped into the value:
  # http://hadoop.apache.org/docs/r0.18.3/streaming.html#How+Does+Streaming+Work
  key, value = (line || '').split("\t", 2)
  return [] unless key && value

  results = []
  switching = false
  if @prev_key != key
    results = flush
    @prev_key = key
    # True only while the replacement accumulator is being built, i.e. while
    # @accumulator still holds the one that was just flushed.
    switching = true
    @accumulator = new_accumulator(key)
    switching = false
  end

  @accumulator.apply_value(value) if @accumulator
  results
rescue StandardError => error
  on_error(error, line: line)
  # Construction failed: never feed this key's values into a flushed accumulator.
  @accumulator = nil if switching
  # Keep whatever the previous group already flushed instead of losing it.
  results || []
end