class MapReduce::ReduceLog
Public Class Methods
new(map_log, delimiter)
click to toggle source
# File lib/map_reduce/reduce_log.rb, line 3 def initialize(map_log, delimiter) @map_log = map_log @delimiter = delimiter end
Public Instance Methods
force()
click to toggle source
# File lib/map_reduce/reduce_log.rb, line 39 def force unless @lines fn = log_file if fn @file = File.open(fn) @lines = @file.each_line end end end
get_data()
click to toggle source
# File lib/map_reduce/reduce_log.rb, line 8 def get_data if @lines current_key = nil current_values = [] line = nil while true begin line = @lines.peek.chomp key, values = line.split(@delimiter) current_key ||= key if current_key != key break else current_values << values @lines.next end rescue StopIteration => e @file.close File.unlink(File.path(@file)) @lines = nil break rescue => e MapReduce.logger.error("#{e.message} for line #{line.inspect}") @lines.next end end [current_key, *current_values] end end
log_file()
click to toggle source
# File lib/map_reduce/reduce_log.rb, line 49 def log_file reduce_fn = File.join(@map_log.log_folder, "reducer.log") if File.exist? reduce_fn reduce_fn else reduce_fn = @map_log.reset if reduce_fn sort(reduce_fn) reduce_fn end end end
sort(fn)
click to toggle source
# File lib/map_reduce/reduce_log.rb, line 62 def sort(fn) `sort #{fn} -o #{fn} -k1nr` end