class ContainedMr::Runner

Handles running a single mapper or reducer.

Public Class Methods

new(container_options, time_limit, output_path)

Initialize a runner.

@param {Hash<String, Object>} container_options Docker container creation options, passed to {Docker::Container.create} without modification

@param {Number} time_limit maximum number of seconds that the runner’s container is allowed to execute before being terminated

@param {String} output_path the location of the file inside the container whose output will be saved
# File lib/contained_mr/runner.rb, line 18
# Sets up a runner; nothing touches Docker until #perform is called.
#
# @param {Hash<String, Object>} container_options options handed unchanged
#   to {Docker::Container.create}
# @param {Number} time_limit seconds the container may run before it is killed
# @param {String} output_path path, inside the container, of the output file
def initialize(container_options, time_limit, output_path)
  @_container_options, @_time_limit, @_output_path =
    container_options, time_limit, output_path

  # Per-run state, all unset until a run fills it in.
  @container_id = nil
  @ended_at = nil
  @started_at = nil
  @status_code = nil
  @timed_out = nil
  @stderr = nil
  @stdout = nil
  @output = nil
end

Public Instance Methods

destroy!(container = nil)

Removes the container used to run a mapper / reducer.

@param {Docker::Container} container the mapper / reducer’s container; if not supplied, an extra Docker API query is performed to obtain the container object

@return {ContainedMr::Runner} self

# File lib/contained_mr/runner.rb, line 50
# Removes the container used to run a mapper / reducer.
#
# Repeated calls are no-ops once the container id has been cleared. A
# NotFoundError from Docker, raised either while looking the container up
# or while deleting it, is treated as "already removed".
#
# @param {Docker::Container} container the mapper / reducer's container; if
#   not supplied, an extra Docker API query is performed to obtain the
#   container object
# @return {ContainedMr::Runner} self
def destroy!(container = nil)
  return self if @container_id.nil?

  begin
    # The lookup can 404 as well as the delete, so it belongs inside the
    # rescue; otherwise @container_id would never be cleared.
    container ||= Docker::Container.get @container_id
    container.delete force: true
  rescue Docker::Error::NotFoundError
    # Workaround for https://github.com/docker/docker/issues/14474
  end
  @container_id = nil
  self
end
perform()

Performs a full mapper / reducer step.

@return {ContainedMr::Runner} self

# File lib/contained_mr/runner.rb, line 34
# Performs a full mapper / reducer step: create the container, run it,
# collect console and file output, then remove the container.
#
# The container is removed even if running it or collecting its output
# raises; the exception still propagates to the caller.
#
# @return {ContainedMr::Runner} self
def perform
  container = create
  @container_id = container.id

  begin
    execute container
    fetch_console_output container
    fetch_file_output container
  ensure
    # Without this, any failure above would leave the container behind.
    destroy! container
  end
  self
end

Private Instance Methods

create()

Creates a container for running a mapper or reducer.

@return {Docker::Container} newly created container

# File lib/contained_mr/runner.rb, line 66
# Asks Docker for a fresh container built from the runner's options.
#
# @return {Docker::Container} the container that was just created
def create
  options = @_container_options
  Docker::Container.create(options)
end
execute(container)

Runs the process inside the container and kills it if it takes too long.

@param {Docker::Container} container the container that holds the process

# File lib/contained_mr/runner.rb, line 74
# Starts the container and waits up to the runner's time limit for it to
# exit.
#
# On a normal exit, @status_code holds the reported 'StatusCode' and
# @timed_out is false. If the wait raises Docker::Error::TimeoutError, the
# container is killed, @status_code becomes false and @timed_out true.
# @started_at and @ended_at bracket the wait.
#
# @param {Docker::Container} container the container that holds the process
def execute(container)
  container.start
  @started_at = Time.now
  begin
    exit_info = container.wait(@_time_limit)
  rescue Docker::Error::TimeoutError
    @status_code = false
    @timed_out = true
    container.kill
  else
    @status_code = exit_info['StatusCode']
    @timed_out = false
  end
  @ended_at = Time.now
end
fetch_console_output(container)

Extracts console output from a container.

@param {Docker::Container} container the mapper / reducer’s container

# File lib/contained_mr/runner.rb, line 93
# Reads the container's full log and splits it into @stdout and @stderr.
# Chunks tagged with any stream other than :stdout or :stderr are dropped.
#
# @param {Docker::Container} container the mapper / reducer's container
def fetch_console_output(container)
  chunks = { stdout: [], stderr: [] }
  container.streaming_logs(stdout: true, stderr: true,
                           since: 0, timestamps: false, tail: false) do |stream, chunk|
    chunks[stream] << chunk if chunks.key?(stream)
  end

  @stdout = chunks[:stdout].join('')
  @stderr = chunks[:stderr].join('')

  # NOTE: The method below is simpler, but hangs on Swarm.
  #       https://github.com/docker/swarm/issues/1284
  #
  # messages = container.attach stream: false, logs: true, stdin: nil,
  #                             stdout: true, stderr: true
  # @stdout = messages[0].join ''
  # @stderr = messages[1].join ''
end
fetch_file_output(container)

Extracts the mapper / reducer’s output file from a container.

@param {Docker::Container} container the mapper / reducer’s container

# File lib/contained_mr/runner.rb, line 122
# Extracts the mapper / reducer's output file from a container.
#
# Sets @output to the contents of the first regular-file entry in the .tar
# returned by #fetch_tar_output. An empty file yields ''. @output is set to
# false if fetching the .tar raises Docker::Error::ServerError, or if the
# .tar contains no regular file.
#
# @param {Docker::Container} container the mapper / reducer's container
def fetch_file_output(container)
  begin
    tar_buffer = fetch_tar_output container
  rescue Docker::Error::ServerError
    @output = false
    return
  end

  Gem::Package::TarReader.new tar_buffer do |tar|
    tar.each do |entry|
      next unless entry.file?
      # Older RubyGems return nil when reading a zero-length entry; normalize
      # so an empty output file is not confused with "not fetched" (nil).
      @output = entry.read || ''
      return
    end
  end
  @output = false
end
fetch_tar_output(container)

Extracts the mapper / reducer’s output, as a .tar, from a container.

@param {Docker::Container} container the mapper / reducer’s container

@return {IO} an IO implementation that sources the .tar data

# File lib/contained_mr/runner.rb, line 145
# Copies the runner's output path out of the container as a .tar stream.
#
# The chunks that container.copy yields are accumulated in memory, and the
# buffer is rewound so the caller can read it from the start.
#
# @param {Docker::Container} container the mapper / reducer's container
# @return {IO} a StringIO holding the .tar data, positioned at offset 0
def fetch_tar_output(container)
  StringIO.new.tap do |tar_io|
    container.copy(@_output_path) { |chunk| tar_io << chunk }
    tar_io.rewind
  end
end