class MapReduce::Master
Public Class Methods
new(opts = {})
click to toggle source
# File lib/map_reduce/master.rb, line 5
#
# Sets up a master with empty task bookkeeping.
#
# opts - Hash of optional settings:
#        :socket     - address later passed to the socket's #bind
#                      (falls back to ::MapReduce::DEFAULT_SOCKET)
#        :log_folder - folder handed to MapReduce::MapLog
#                      (falls back to "/tmp/map_reduce")
#        :delimiter  - separator placed between key and value when a
#                      mapped pair is written to the log (falls back to "\t")
def initialize(opts = {})
  socket, log_folder, delimiter = opts.values_at(:socket, :log_folder, :delimiter)

  @socket_addr = socket || ::MapReduce::DEFAULT_SOCKET
  @log_folder = log_folder || "/tmp/map_reduce"
  @delimiter = delimiter || "\t"
  # task => { worker type => { sender id => last reported status } }
  @tasks = {}
end
Public Instance Methods
after_map(&blk)
click to toggle source
# File lib/map_reduce/master.rb, line 23
#
# Registers a hook that store_map invokes with (key, value, task) after a
# mapped pair has been logged. Calling without a block clears the hook.
#
# Returns the stored block (nil when none was given).
def after_map(&blk)
  @after_map = blk
end
after_reduce(&blk)
click to toggle source
# File lib/map_reduce/master.rb, line 27
#
# Registers a hook that send_reduce invokes with (data[0], data[1], task)
# whenever it hands non-nil data to a reducer. Calling without a block
# clears the hook.
#
# Returns the stored block (nil when none was given).
def after_reduce(&blk)
  @after_reduce = blk
end
recieve_msg(message, envelope)
click to toggle source
# File lib/map_reduce/master.rb, line 31
#
# Dispatches an incoming message to its handler based on the message type,
# which is the first element of +message+:
#
#   "map"          -> store_map
#   "map_finished" -> all_finished?
#   "reduce"       -> send_reduce
#
# Any other type is reported through MapReduce.logger.error.
#
# message  - Array whose first element is the type string.
# envelope - passed through untouched to the handler.
#
# Returns whatever the selected handler (or the logger call) returns.
def recieve_msg(message, envelope)
  # Read the type before dispatching so the error branch can report it;
  # previously the log interpolated a variable that was still nil.
  mtype = message[0]

  case mtype
  when "map"
    store_map(message, envelope)
  when "map_finished"
    all_finished?(message, envelope)
  when "reduce"
    send_reduce(message, envelope)
  else
    MapReduce.logger.error("Wrong message type: #{mtype}")
  end
end
run()
click to toggle source
# File lib/map_reduce/master.rb, line 13
#
# Starts the EM loop and, inside it, calls #socket so the master socket is
# created and bound.
def run
  EM.run { socket }
end
stop()
click to toggle source
# File lib/map_reduce/master.rb, line 19
#
# Stops the EM loop that #run started. Returns whatever EM.stop returns.
def stop
  EM.stop
end
Private Instance Methods
all_finished?(message, envelope)
click to toggle source
# File lib/map_reduce/master.rb, line 75
#
# Records the sender's status as a mapper of the task, then tells the sender
# whether every mapper registered for that task has reported the same status.
#
# message  - Array of [status, task].
# envelope - identifies the sender; envelope[0] is used as its key.
#
# Replies via #ok when all registered mappers match +status+, otherwise via
# #np (the "not ok" reply). The negative branch used to call an undefined
# method named `no`, which raised NoMethodError instead of replying.
def all_finished?(message, envelope)
  status, task = message
  register(task, envelope, "mapper", status)

  if @tasks[task]["mapper"].all? { |_sender, state| state == status }
    ok(envelope)
  else
    np(envelope)
  end
end
map_log(task)
click to toggle source
# File lib/map_reduce/master.rb, line 85
#
# Returns the MapReduce::MapLog for +task+, creating it from @log_folder on
# first use and reusing the same instance afterwards.
def map_log(task)
  logs = (@map_log ||= {})
  logs[task] ||= MapReduce::MapLog.new(@log_folder, task)
end
np(envelope)
click to toggle source
# File lib/map_reduce/master.rb, line 101
#
# Sends the negative acknowledgement (["not ok"]) back to the sender
# identified by +envelope+.
def np(envelope)
  response = ["not ok"]
  reply(response, envelope)
end
ok(envelope)
click to toggle source
# File lib/map_reduce/master.rb, line 97
#
# Sends the positive acknowledgement (["ok"]) back to the sender identified
# by +envelope+.
def ok(envelope)
  response = ["ok"]
  reply(response, envelope)
end
reduce_log(task, force = false)
click to toggle source
# File lib/map_reduce/master.rb, line 90
#
# Returns the MapReduce::ReduceLog for +task+, building it from the task's
# map log and @delimiter on first use and reusing it afterwards.
#
# task  - task identifier used as the cache key.
# force - when truthy, #force is called on the log before it is returned.
def reduce_log(task, force = false)
  logs = (@reduce_log ||= {})
  log = (logs[task] ||= MapReduce::ReduceLog.new(map_log(task), @delimiter))
  log.force if force
  log
end
register(task, envelope, type, status)
click to toggle source
# File lib/map_reduce/master.rb, line 109
#
# Records +status+ for a sender in @tasks, creating the nested hashes on
# demand. Layout: @tasks[task][type][envelope[0]] = status.
#
# task     - task identifier.
# envelope - identifies the sender; envelope[0] is used as its key.
# type     - worker type key (callers in this class pass "mapper"/"reducer").
# status   - value to store.
#
# Returns +status+.
def register(task, envelope, type, status)
  by_type = (@tasks[task] ||= {})
  by_sender = (by_type[type] ||= {})
  by_sender[envelope[0]] = status
end
reply(resp, envelope)
click to toggle source
# File lib/map_reduce/master.rb, line 105
#
# Hands +resp+ and +envelope+ to the master socket's send_reply.
# Returns whatever send_reply returns.
def reply(resp, envelope)
  socket.send_reply(resp, envelope)
end
send_reduce(message, envelope)
click to toggle source
# File lib/map_reduce/master.rb, line 55
#
# Serves the next chunk of reduce data to the requesting reducer.
#
# message  - Array of [status, task].
# envelope - identifies the sender; envelope[0] is used as its key.
#
# The reduce log is fetched with force = true unless this sender's last
# recorded reducer status for the task is already "reduce". The sender is
# then registered with +status+ when data was obtained, or with
# "reduce_finished" when get_data returned nothing. The data (possibly nil)
# is always sent back, and the after_reduce hook fires only for non-nil data.
def send_reduce(message, envelope)
  status, task = message

  last_status = @tasks.fetch(task, {}).fetch("reducer", {}).fetch(envelope[0], nil)
  log = last_status == "reduce" ? reduce_log(task) : reduce_log(task, true)
  data = log.get_data

  register(task, envelope, "reducer", data ? status : "reduce_finished")
  reply(data, envelope)

  @after_reduce.call(data[0], data[1], task) if data && @after_reduce
end
socket()
click to toggle source
# File lib/map_reduce/master.rb, line 115
#
# Returns the master socket, creating it on first call: a
# MapReduce::Socket::Master is built with this master as its argument and
# bound to @socket_addr. Later calls return the same instance without
# binding again.
#
# The previously assigned-but-never-read local `master = self` was removed.
def socket
  @socket ||= MapReduce::Socket::Master.new(self).tap do |sock|
    sock.bind(@socket_addr)
  end
end
store_map(message, envelope)
click to toggle source
# File lib/map_reduce/master.rb, line 46
#
# Handles a "map" message: appends the emitted pair to the task's map log,
# acknowledges the sender, records its mapper status, and finally fires the
# after_map hook if one is set.
#
# message  - Array of [status, key, value, task].
# envelope - identifies the sender; envelope[0] is used as its key.
#
# Returns the hook's result, or nil when no hook is registered.
def store_map(message, envelope)
  status, key, value, task = message

  log = map_log(task)
  log << "#{key}#{@delimiter}#{value}"

  # The acknowledgement goes out before the status is recorded.
  ok(envelope)
  register(task, envelope, "mapper", status)
  return unless @after_map

  @after_map.call(key, value, task)
end