class SimpleMapReduce::Worker::RunMapTaskWorker

Constants

HTTP_JSON_HEADER

Public Instance Methods

perform(job, map_worker)
# File lib/simple_map_reduce/worker/run_map_task_worker.rb, line 8
def perform(job, map_worker)
  # Evaluate the user-supplied map script inside a throwaway wrapper
  # class named after the job id, so constants defined by the script
  # don't leak into this class; the ensure block removes it again.
  task_wrapper_class_name = "TaskWrapper#{job.id.delete('-')}"
  self.class.class_eval("class #{task_wrapper_class_name}; end", 'Task Wrapper Class')
  task_wrapper_class = self.class.const_get(task_wrapper_class_name)
  task_wrapper_class.class_eval(job.map_script, 'Map task script')
  map_task = task_wrapper_class.const_get(job.map_class_name, false).new
  unless map_task.respond_to?(:map)
    raise InvalidMapTaskError, 'no map method'
  end

  logger.info('map task start')

  local_input_cache = Tempfile.new
  s3_client.get_object(
    response_target: local_input_cache.path,
    bucket: job.job_input_bucket_name,
    key: job.job_input_directory_path
  )
  local_input_cache.rewind

  local_output_cache = Tempfile.new
  local_input_cache.each_line("\n", chomp: true) do |line|
    map_task.map(line, local_output_cache)
  end

  local_output_cache.rewind
  logger.debug("output data size: #{local_output_cache.size}")
  logger.debug('---map output digest---')
  local_output_cache.take(5).each do |line|
    logger.debug(line)
  end
  logger.debug('---map output digest---')

  response = http_client(SimpleMapReduce.job_tracker_url).post do |request|
    request.url('/workers/reserve')
    # TODO: provide a way to specify worker_size
    request.body = { worker_size: 2 }.to_json
  end
  logger.debug(response.body)

  # {"succeeded":true,"workers":[{"id":70157882164440,"url":"http://localhost:4569","state":'reserved'}]}
  reserved_workers = JSON.parse(response.body, symbolize_names: true)[:reserved_workers].map do |worker|
    SimpleMapReduce::Server::Worker.new(
      id: worker[:id],
      url: worker[:url],
      state: worker[:state].to_sym,
      data_store_type: 'remote'
    )
  end
  if reserved_workers.empty?
    # no worker could be reserved, so keep working with the same worker
    reserved_workers << map_worker
  end

  shuffle(job, reserved_workers, local_output_cache)

  if reserved_workers.all? { |w| w.id != map_worker.id }
    begin
      map_worker.ready!
    rescue => notify_error
      logger.fatal(notify_error.inspect)
      logger.fatal(notify_error.backtrace.take(50))
    end
  end
rescue => e
  logger.error(e.inspect)
  logger.error(e.backtrace.take(50))
  job.failed!
  # TODO: notify the job_tracker that this task has failed
ensure
  local_input_cache&.delete
  local_output_cache&.delete
  if self.class.const_defined?(task_wrapper_class_name.to_sym)
    self.class.send(:remove_const, task_wrapper_class_name.to_sym)
  end
  logger.info('map task end')
end
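
The map script is evaluated inside a throwaway wrapper class, so the only contract a job author must satisfy is a class whose name matches job.map_class_name and which responds to map(line, output_io). A minimal sketch of a compatible script follows; the WordCountMap name and the word-count logic are illustrative assumptions, but the one-JSON-object-per-line output with a :key field is the format #shuffle expects downstream.

require 'json'

# Hypothetical contents of job.map_script. RunMapTaskWorker instantiates
# the class named by job.map_class_name and calls #map once per input
# line, passing an IO-like output cache as the second argument.
class WordCountMap
  def map(line, output_io)
    line.split(/\s+/).each do |word|
      # One JSON object per line; #shuffle partitions on :key.
      output_io.puts({ key: word, value: 1 }.to_json)
    end
  end
end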

Private Instance Methods

http_client(url)
# File lib/simple_map_reduce/worker/run_map_task_worker.rb, line 101
def http_client(url)
  ::Faraday.new(
    url: url,
    headers: HTTP_JSON_HEADER
  ) do |faraday|
    faraday.response :logger
    faraday.response :raise_error
    faraday.adapter  Faraday.default_adapter
  end
end
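
For reference, #perform uses this client to reserve reduce workers; the endpoint and request body below are taken from that method. Because of the :raise_error middleware, non-2xx responses surface as Faraday exceptions, which #perform's rescue clause logs before marking the job failed.

# Sketch: POSTing JSON to the job tracker with the client built above.
response = http_client(SimpleMapReduce.job_tracker_url).post do |request|
  request.url('/workers/reserve')
  request.body = { worker_size: 2 }.to_json
end
reserved = JSON.parse(response.body, symbolize_names: true)[:reserved_workers]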
logger()
# File lib/simple_map_reduce/worker/run_map_task_worker.rb, line 92
def logger
  SimpleMapReduce.logger
end
s3_client()
# File lib/simple_map_reduce/worker/run_map_task_worker.rb, line 88
def s3_client
  SimpleMapReduce::S3Client.instance.client
end
shuffle(job, workers, local_output_cache)
# File lib/simple_map_reduce/worker/run_map_task_worker.rb, line 112
def shuffle(job, workers, local_output_cache)
  workers_count = workers.count
  raise 'No workers' unless workers_count > 0

  shuffled_local_outputs = Array.new(workers_count) { Tempfile.new }
  local_output_cache.each_line("\n") do |raw_line|
    output = JSON.parse(raw_line, symbolize_names: true)
    partition_id = output[:key].hash % workers_count
    shuffled_local_outputs[partition_id].puts(output.to_json)
  end

  task_script = job.reduce_script
  task_class_name = job.reduce_class_name
  task_input_bucket_name = SimpleMapReduce.s3_intermediate_bucket_name
  task_output_bucket_name = job.job_output_bucket_name
  task_output_directory_path = job.job_output_directory_path
  task_input_file_path_prefix = "#{job.id}/map_output_#{Time.now.to_i}/"

  workers.each_with_index do |worker, partition_id|
    reduce_task = ::SimpleMapReduce::Server::Task.new(
      job_id: job.id,
      task_class_name: task_class_name,
      task_script: task_script,
      task_input_bucket_name: task_input_bucket_name,
      task_input_file_path: "#{task_input_file_path_prefix}#{partition_id}_map_output.txt",
      task_output_bucket_name: task_output_bucket_name,
      task_output_directory_path: task_output_directory_path
    )

    partition_output = shuffled_local_outputs[partition_id]
    partition_output.rewind
    s3_client.put_object(
      body: partition_output.read,
      bucket: reduce_task.task_input_bucket_name,
      key: reduce_task.task_input_file_path
    )

    response = http_client(worker.url).post do |request|
      request.url('/reduce_tasks')
      request.body = reduce_task.serialize
    end
    logger.debug(response.body)
  end
ensure
  shuffled_local_outputs&.each do |output|
    output.delete
  end
end
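
The partition step in #shuffle routes each map output line by hashing its :key. Below is a standalone sketch of that rule, with inline sample lines standing in for the map output cache; note that Ruby's String#hash is seeded per process, so partition assignment is stable within the single worker process performing the shuffle (which is all this method needs) but not across processes or runs.

require 'json'
require 'tempfile'

workers_count = 2
partitions = Array.new(workers_count) { Tempfile.new }

['{"key":"apple","value":1}', '{"key":"pear","value":1}'].each do |raw_line|
  output = JSON.parse(raw_line, symbolize_names: true)
  # Integer#% with a positive modulus is non-negative, so this is a
  # valid index even when #hash returns a negative value.
  partition_id = output[:key].hash % workers_count
  partitions[partition_id].puts(output.to_json)
end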