class ContainedMr::Job

A map-reduce job.

Public Class Methods

new(template, id, json_options) click to toggle source

@see {ContainedMr::TemplateLogic#new_job}

# File lib/contained_mr/job.rb, line 13
# Sets up a job with no images built and no runners started.
#
# @param template the job's template; its name_prefix and item_count are
#   copied onto the job
# @param id stored as the job's ID
# @param json_options handed to parse_options once all other state is set
def initialize(template, id, json_options)
  @template = template
  @id = id
  @name_prefix = template.name_prefix
  @item_count = template.item_count

  # Image IDs and per-stage options start out unset.
  @mapper_image_id = @reducer_image_id = nil
  @mapper_options = @reducer_options = nil

  # One slot per mapper, all initially nil, plus a single reducer slot.
  @mappers = Array.new(@item_count)
  @reducer = nil

  parse_options(json_options)
end

Public Instance Methods

build_mapper_image(mapper_input) click to toggle source

Builds the Docker image used to run this job’s mappers.

@param {String} mapper_input data passed to the mappers

@return {String} the newly built Docker image’s ID

# File lib/contained_mr/job.rb, line 62
# Builds the mapper Docker image from a tar context and records its ID.
#
# @param {String} mapper_input handed to mapper_tar_context to produce the
#   build context
# @return {String} the ID of the image looked up by the mapper image tag
# @raise {RuntimeError} if a mapper image ID is already recorded
def build_mapper_image(mapper_input)
  raise RuntimeError, 'Mapper image already exists' unless @mapper_image_id.nil?

  Docker::Image.build_from_tar(mapper_tar_context(mapper_input),
                               mapper_image_options)

  # The build call only reports a short image ID, so the image is fetched
  # again by tag to obtain the canonical ID.
  @mapper_image_id = Docker::Image.get(mapper_image_tag).id
end
build_reducer_image() click to toggle source

Builds the Docker image used to run this job’s reducer.

@return {String} the newly built Docker image’s ID

# File lib/contained_mr/job.rb, line 78
# Builds the reducer Docker image from a tar context and records its ID.
#
# @return {String} the ID of the image looked up by the reducer image tag
# @raise {RuntimeError} if a reducer image ID is already recorded, or if
#   mapper_runner returns nil for any mapper number in 1..@item_count
def build_reducer_image
  raise RuntimeError, 'Reducer image already exists' unless @reducer_image_id.nil?

  # Stops at the first mapper number that has no runner.
  if 1.upto(@item_count).any? { |number| mapper_runner(number).nil? }
    raise RuntimeError, 'Not all mappers ran'
  end

  Docker::Image.build_from_tar(reducer_tar_context, reducer_image_options)

  # The build call only reports a short image ID, so the image is fetched
  # again by tag to obtain the canonical ID.
  @reducer_image_id = Docker::Image.get(reducer_image_tag).id
end
destroy!() click to toggle source

Tears down the job’s state.

This removes the job’s containers, as well as the mapper and reducer Docker images, if they still exist.

@return {ContainedMr::Job} self

# File lib/contained_mr/job.rb, line 35
# Destroys every runner the job holds and removes any image it recorded.
#
# Mapper runners are destroyed first, then the reducer runner. Each image
# is removed only when its ID is set, and the ID is cleared afterwards.
#
# @return {ContainedMr::Job} self
def destroy!
  # nil slots belong to mappers / a reducer that never ran.
  [*@mappers, @reducer].compact.each(&:destroy!)

  unless @mapper_image_id.nil?
    # HACK(pwnall): Trick docker-api into issuing a DELETE request by tag.
    Docker::Image.new(Docker.connection, 'id' => mapper_image_tag).remove
    @mapper_image_id = nil
  end

  unless @reducer_image_id.nil?
    # HACK(pwnall): Trick docker-api into issuing a DELETE request by tag.
    Docker::Image.new(Docker.connection, 'id' => reducer_image_tag).remove
    @reducer_image_id = nil
  end

  self
end
reducer_tar_context() click to toggle source

Builds the .tar context used to create the reducer’s Docker image.

@return {IO} an IO implementation that sources the .tar file

# File lib/contained_mr/job.rb, line 128
# Assembles the in-memory .tar build context for the reducer image.
#
# The archive holds the template's reducer Dockerfile followed by, for each
# mapper numbered from 1: "<n>.out" (only when the mapper has output),
# "<n>.stdout", "<n>.stderr" and "<n>.json".
#
# @return {IO} a StringIO holding the archive, rewound to the start
def reducer_tar_context
  buffer = StringIO.new
  Gem::Package::TarWriter.new(buffer) do |archive|
    archive.add_file('Dockerfile', 0o644) do |file|
      file.write @template.reducer_dockerfile
    end

    # File names use 1-based mapper numbers.
    @mappers.each.with_index(1) do |runner, number|
      if runner.output
        archive.add_file("#{number}.out", 0o644) do |file|
          file.write runner.output
        end
      end

      archive.add_file("#{number}.stdout", 0o644) do |file|
        file.write runner.stdout
      end
      archive.add_file("#{number}.stderr", 0o644) do |file|
        file.write runner.stderr
      end
      archive.add_file("#{number}.json", 0o644) do |file|
        file.write runner.json_file.to_json
      end
    end
  end

  # Hand the buffer back positioned at the start so it can be read directly.
  buffer.tap(&:rewind)
end
run_mapper(i) click to toggle source

Runs one of the job’s mappers.

@param {Number} i the mapper to run

@return {ContainedMr::Runner} the runner used by the mapper

# File lib/contained_mr/job.rb, line 98
# Creates the runner for mapper number i, stores it, and performs it.
#
# @param {Number} i the 1-based mapper number; must be within 1..@item_count
# @return the value returned by the runner's perform call
# @raise {ArgumentError} if i is below 1 or above @item_count
# @raise {RuntimeError} if no mapper image ID is recorded
def run_mapper(i)
  raise ArgumentError, "Invalid mapper number #{i}" if i < 1 || i > @item_count
  raise RuntimeError, 'Mapper image does not exist' if @mapper_image_id.nil?

  runner = ContainedMr::Runner.new(
    mapper_container_options(i),
    @mapper_options[:wait_time],
    @template.mapper_output_path
  )
  # Mapper numbers are 1-based; the slots array is 0-based.
  @mappers[i - 1] = runner
  runner.perform
end
run_reducer() click to toggle source

Runs the job’s reducer.

@return {ContainedMr::Runner} the runner used by the reducer

# File lib/contained_mr/job.rb, line 113
# Creates the reducer's runner, stores it in @reducer, and performs it.
#
# @return the value returned by the runner's perform call
# @raise {RuntimeError} if no reducer image ID is recorded
def run_reducer
  raise RuntimeError, 'Reducer image does not exist' if @reducer_image_id.nil?

  @reducer = ContainedMr::Runner.new(
    reducer_container_options,
    @reducer_options[:wait_time],
    @template.reducer_output_path
  )
  @reducer.perform
end