class SimpleMapReduce::Worker::RunMapTaskWorker
Constants
- HTTP_JSON_HEADER
Public Instance Methods
perform(job, map_worker)
# File lib/simple_map_reduce/worker/run_map_task_worker.rb, line 8
def perform(job, map_worker)
  task_wrapper_class_name = "TaskWrapper#{job.id.delete('-')}"
  self.class.class_eval("class #{task_wrapper_class_name}; end", 'Task Wrapper Class')
  task_wrapper_class = self.class.const_get(task_wrapper_class_name)
  task_wrapper_class.class_eval(job.map_script, 'Map task script')
  map_task = task_wrapper_class.const_get(job.map_class_name, false).new
  unless map_task.respond_to?(:map)
    raise InvalidMapTaskError, 'no map method'
  end

  logger.info('map task start')
  local_input_cache = Tempfile.new
  s3_client.get_object(
    response_target: local_input_cache.path,
    bucket: job.job_input_bucket_name,
    key: job.job_input_directory_path
  )
  local_input_cache.rewind

  local_output_cache = Tempfile.new
  local_input_cache.each_line(chomp: true, rs: "\n") do |line|
    map_task.map(line, local_output_cache)
  end
  local_output_cache.rewind
  logger.debug("output data size: #{local_output_cache.size}")
  logger.debug('---map output digest---')
  local_output_cache.take(5).each do |line|
    logger.debug(line)
  end
  logger.debug('---map output digest---')
  # rewind again: the digest above consumed the first lines, and shuffle
  # must read the map output from the beginning
  local_output_cache.rewind

  response = http_client(SimpleMapReduce.job_tracker_url).post do |request|
    request.url('/workers/reserve')
    # TODO: provide a way to specify worker_size
    request.body = { worker_size: 2 }.to_json
  end
  logger.debug(response.body)
  # {"succeeded":true,"reserved_workers":[{"id":70157882164440,"url":"http://localhost:4569","state":"reserved"}]}
  reserved_workers = JSON.parse(response.body, symbolize_names: true)[:reserved_workers].map do |worker|
    SimpleMapReduce::Server::Worker.new(
      id: worker[:id],
      url: worker[:url],
      state: worker[:state].to_sym,
      data_store_type: 'remote'
    )
  end
  if reserved_workers.count == 0
    # no remote worker available: keep working with the same worker
    reserved_workers << map_worker
  end

  shuffle(job, reserved_workers, local_output_cache)

  # this worker becomes ready again only if it was not reserved for the reduce phase
  if reserved_workers.all? { |w| w.id != map_worker.id }
    begin
      map_worker.ready!
    rescue => notify_error
      logger.fatal(notify_error.inspect)
      logger.fatal(notify_error.backtrace.take(50))
    end
  end
rescue => e
  logger.error(e.inspect)
  logger.error(e.backtrace.take(50))
  job.failed!
  # TODO: notify the job_tracker that this task has failed
ensure
  local_input_cache&.delete
  local_output_cache&.delete
  if self.class.const_defined?(task_wrapper_class_name.to_sym)
    self.class.send(:remove_const, task_wrapper_class_name.to_sym)
  end
  logger.info('map task end')
end
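A minimal sketch of a map script that satisfies the contract perform expects: job.map_script must define a class (named by job.map_class_name) whose instances respond to map(line, output_io), and each line written to output_io should be a JSON object with a key field, since shuffle partitions on output[:key]. The WordCountMap name and the record shape here are illustrative assumptions, not part of the gem.

require 'json'

# Hypothetical map script, i.e. the string stored in job.map_script;
# 'WordCountMap' would then be supplied as job.map_class_name.
class WordCountMap
  # line is one chomped line of the job input; output_io is a Tempfile
  def map(line, output_io)
    line.split(/\s+/).each do |word|
      # one JSON object per line, carrying the key that shuffle partitions on
      output_io.puts({ key: word, value: 1 }.to_json)
    end
  end
end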
Private Instance Methods
http_client(url)
# File lib/simple_map_reduce/worker/run_map_task_worker.rb, line 101
def http_client(url)
  ::Faraday.new(
    url: url,
    headers: HTTP_JSON_HEADER
  ) do |faraday|
    faraday.response :logger
    faraday.response :raise_error
    faraday.adapter Faraday.default_adapter
  end
end
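Because the client stacks Faraday's raise_error middleware, a non-2xx response raises a Faraday::Error subclass instead of returning quietly; in perform, such an exception falls through to the rescue clause and marks the job failed. A hedged sketch of that behavior, using a made-up job tracker URL:

# inside the worker; 'http://localhost:4567' is a hypothetical tracker URL
begin
  http_client('http://localhost:4567').post do |request|
    request.url('/workers/reserve')
    request.body = { worker_size: 2 }.to_json
  end
rescue Faraday::Error => e
  # e.g. Faraday::ResourceNotFound for a 404; perform would log this and call job.failed!
end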
logger()
# File lib/simple_map_reduce/worker/run_map_task_worker.rb, line 92
def logger
  SimpleMapReduce.logger
end
s3_client()
# File lib/simple_map_reduce/worker/run_map_task_worker.rb, line 88
def s3_client
  SimpleMapReduce::S3Client.instance.client
end
shuffle(job, workers, local_output_cache)
# File lib/simple_map_reduce/worker/run_map_task_worker.rb, line 112
def shuffle(job, workers, local_output_cache)
  workers_count = workers.count
  raise 'No workers' unless workers_count > 0

  # one intermediate file per reduce worker
  shuffled_local_outputs = Array.new(workers_count) { Tempfile.new }
  local_output_cache.each_line(rs: "\n") do |raw_line|
    output = JSON.parse(raw_line, symbolize_names: true)
    partition_id = output[:key].hash % workers_count
    shuffled_local_outputs[partition_id].puts(output.to_json)
  end

  task_script = job.reduce_script
  task_class_name = job.reduce_class_name
  task_input_bucket_name = SimpleMapReduce.s3_intermediate_bucket_name
  task_output_bucket_name = job.job_output_bucket_name
  task_output_directory_path = job.job_output_directory_path
  task_input_file_path_prefix = "#{job.id}/map_output_#{Time.now.to_i}/"
  workers.each_with_index do |worker, partition_id|
    reduce_task = ::SimpleMapReduce::Server::Task.new(
      job_id: job.id,
      task_class_name: task_class_name,
      task_script: task_script,
      task_input_bucket_name: task_input_bucket_name,
      task_input_file_path: "#{task_input_file_path_prefix}#{partition_id}_map_output.txt",
      task_output_bucket_name: task_output_bucket_name,
      task_output_directory_path: task_output_directory_path
    )
    partition_output = shuffled_local_outputs[partition_id]
    partition_output.rewind
    s3_client.put_object(
      body: partition_output.read,
      bucket: reduce_task.task_input_bucket_name,
      key: reduce_task.task_input_file_path
    )
    response = http_client(worker.url).post do |request|
      request.url('/reduce_tasks')
      request.body = reduce_task.serialize
    end
    logger.debug(response.body)
  end
ensure
  shuffled_local_outputs&.each(&:delete)
end
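The partition rule in shuffle, output[:key].hash % workers_count, routes every record with the same key to the same intermediate file, and therefore to the same reduce worker. A small self-contained illustration with made-up records follows; note that Ruby's modulo is never negative for a positive divisor, so the result is always a valid index, and String#hash is seeded per process, which is sufficient here because a single shuffle call does all the partitioning.

require 'json'

# made-up JSON-lines records, in the shape the map step emits
records = [
  '{"key":"apple","value":1}',
  '{"key":"apple","value":2}',
  '{"key":"pear","value":1}'
]
workers_count = 2
records.each do |raw_line|
  output = JSON.parse(raw_line, symbolize_names: true)
  # same rule as shuffle: both "apple" records land in the same partition
  partition_id = output[:key].hash % workers_count
  puts "#{output[:key]} -> partition #{partition_id}"
end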