class ContainedMr::Runner
Handles running a single mapper or reducer.
Public Class Methods
Initializes a runner.
@param {Hash<String, Object>} container_options Docker container creation
options, passed to {Docker::Container.create} without modification
@param {Number} time_limit maximum number of seconds that the runner’s
container is allowed to execute before being terminated
@param {String} output_path the path of the file inside the container whose
contents will be saved as the runner's output
# File lib/contained_mr/runner.rb, line 18
def initialize(container_options, time_limit, output_path)
  @_container_options = container_options
  @_time_limit = time_limit
  @_output_path = output_path

  @container_id = nil
  @started_at = @ended_at = nil
  @status_code = nil
  @timed_out = nil
  @stdout = @stderr = nil
  @output = nil
end
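For illustration, the constructor arguments might look like the sketch below. The image name and command are hypothetical. Because container_options is handed to Docker::Container.create, its keys are field names from the Docker Engine API's container-create request body.

```ruby
# Hypothetical arguments for ContainedMr::Runner.new.
# Keys follow the Docker Engine API "create container" request body.
container_options = {
  'Image' => 'example/wordcount-mapper',        # hypothetical image name
  'Cmd' => ['/bin/sh', '-c', 'wc -w /input > /output'],
  'NetworkDisabled' => true,
  'HostConfig' => {
    'Memory' => 256 * 1024 * 1024               # memory limit, in bytes
  }
}
time_limit = 30            # seconds before the container is killed
output_path = '/output'    # file copied out of the container afterwards

# With the gem loaded and a Docker daemon available:
#   runner = ContainedMr::Runner.new(container_options, time_limit, output_path)
```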
Public Instance Methods
Removes the container used to run a mapper / reducer.
@param {Docker::Container} container the mapper / reducer’s container; if
not supplied, an extra Docker API query is performed to obtain the container object
@return {ContainedMr::Runner} self
# File lib/contained_mr/runner.rb, line 50
def destroy!(container = nil)
  unless @container_id.nil?
    container ||= Docker::Container.get @container_id
    begin
      container.delete force: true
    rescue Docker::Error::NotFoundError
      # Workaround for https://github.com/docker/docker/issues/14474
    end
    @container_id = nil
  end
  self
end
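Because destroy! clears @container_id and returns self, repeated calls are harmless and calls can be chained. A minimal sketch of that control flow, assuming hypothetical doubles (ContainerNotFound stands in for Docker::Error::NotFoundError, and DeletableContainer for a Docker::Container):

```ruby
# Hypothetical stand-in for Docker::Error::NotFoundError.
class ContainerNotFound < StandardError; end

# Hypothetical container double: counts delete calls and can be told to
# raise ContainerNotFound on delete.
class DeletableContainer
  attr_reader :delete_calls

  def initialize(raise_not_found: false)
    @raise_not_found = raise_not_found
    @delete_calls = 0
  end

  def delete(_options = {})
    @delete_calls += 1
    raise ContainerNotFound if @raise_not_found
  end
end

# Same control flow as Runner#destroy!, with the container always supplied.
class DestroySketch
  attr_reader :container_id

  def initialize(container_id)
    @container_id = container_id
  end

  def destroy!(container)
    unless @container_id.nil?
      begin
        container.delete force: true
      rescue ContainerNotFound
        # Tolerated, like the NotFoundError rescue in Runner#destroy!.
      end
      @container_id = nil   # later calls become no-ops
    end
    self
  end
end

container = DeletableContainer.new(raise_not_found: true)
sketch = DestroySketch.new('abc123')
sketch.destroy!(container).destroy!(container)   # chaining works: destroy! returns self
```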
Performs a full mapper / reducer step.
@return {ContainedMr::Runner} self
# File lib/contained_mr/runner.rb, line 34
def perform
  container = create
  @container_id = container.id

  execute container
  fetch_console_output container
  fetch_file_output container

  destroy! container
end
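In the listing above, perform returns self because its last expression is destroy!. If execute or one of the fetch steps raises, perform exits before destroy! and the container is left in place; its id stays in @container_id, so a caller can still clean up with destroy!. The sketch below shows a variant (not the gem's code) that guarantees cleanup with begin/ensure. PerformSketch and StubContainer are hypothetical names, and every step is a stub that only logs its name.

```ruby
# Hypothetical container double: only remembers whether it was deleted.
StubContainer = Struct.new(:id, :deleted)

# Mirrors the step order of Runner#perform, but wraps the middle steps in
# begin/ensure so destroy! runs even if one of them raises.
# +fail_on+ names a step that should raise, for demonstration.
class PerformSketch
  attr_reader :steps, :container, :container_id

  def initialize(fail_on: nil)
    @fail_on = fail_on
    @steps = []
    @container_id = nil
  end

  def perform
    @container = create
    @container_id = @container.id
    begin
      execute @container
      fetch_console_output @container
      fetch_file_output @container
    ensure
      destroy! @container
    end
    self   # begin/ensure yields the body's value, so return self explicitly
  end

  def destroy!(container)
    record :destroy!
    container.deleted = true
    @container_id = nil
    self
  end

  private

  def create
    record :create
    StubContainer.new('c0ffee', false)
  end

  def execute(_container)
    record :execute
  end

  def fetch_console_output(_container)
    record :fetch_console_output
  end

  def fetch_file_output(_container)
    record :fetch_file_output
  end

  # Logs the step and raises if it is the configured failure point.
  def record(step)
    @steps << step
    raise "#{step} failed" if step == @fail_on
  end
end
```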
Private Instance Methods
Creates a container for running a mapper or reducer.
@return {Docker::Container} newly created container
# File lib/contained_mr/runner.rb, line 66
def create
  Docker::Container.create @_container_options
end
Runs the process inside the container and kills it if it takes too long.
@param {Docker::Container} container the container that holds the process
# File lib/contained_mr/runner.rb, line 74
def execute(container)
  container.start
  @started_at = Time.now
  begin
    wait_status = container.wait @_time_limit
    @status_code = wait_status['StatusCode']
    @timed_out = false
  rescue Docker::Error::TimeoutError
    @status_code = false
    @timed_out = true
    container.kill
  end
  @ended_at = Time.now
end
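The same wait-with-deadline pattern can be tried without Docker. This minimal sketch swaps the container for a local child process and Docker's wait timeout for Ruby's Timeout module. run_with_limit is a hypothetical helper, and a POSIX system with SIGKILL is assumed. On timeout it records the same values as execute: a status code of false and a timed-out flag of true.

```ruby
require 'rbconfig'
require 'timeout'

# Local-process analogue of Runner#execute: start, wait up to +time_limit+
# seconds, and kill the process if it is still running at the deadline.
def run_with_limit(cmd, time_limit)
  pid = Process.spawn(*cmd)
  started_at = Time.now
  begin
    status = Timeout.timeout(time_limit) { Process.wait2(pid).last }
    status_code = status.exitstatus
    timed_out = false
  rescue Timeout::Error
    status_code = false   # same sentinel the runner stores on timeout
    timed_out = true
    Process.kill('KILL', pid)
    Process.wait(pid)     # reap the killed child so it does not linger as a zombie
  end
  ended_at = Time.now
  { status_code: status_code, timed_out: timed_out, elapsed: ended_at - started_at }
end

# A child that sleeps past the 1-second limit gets killed.
result = run_with_limit([RbConfig.ruby, '-e', 'sleep 30'], 1)
```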
Extracts console output from a container.
@param {Docker::Container} container the mapper / reducer’s container
# File lib/contained_mr/runner.rb, line 93
def fetch_console_output(container)
  stdout_buffer = []
  stderr_buffer = []
  container.streaming_logs stdout: true, stderr: true, since: 0,
      timestamps: false, tail: false do |stream, chunk|
    case stream
    when :stdout
      stdout_buffer << chunk
    when :stderr
      stderr_buffer << chunk
    end
  end
  @stdout = stdout_buffer.join ''
  @stderr = stderr_buffer.join ''

  # NOTE: The method below is simpler, but hangs on Swarm.
  #       https://github.com/docker/swarm/issues/1284
  #
  # messages = container.attach stream: false, logs: true, stdin: nil,
  #                             stdout: true, stderr: true
  # @stdout = messages[0].join ''
  # @stderr = messages[1].join ''
end
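The block passed to streaming_logs receives chunks already labelled :stdout or :stderr. When a container runs without a TTY, the Docker Engine API sends both streams over one connection as frames. Each frame has an 8-byte header: one stream-type byte (1 = stdout, 2 = stderr), three zero bytes, and a big-endian 32-bit payload length. The sketch below decodes that framing by hand to show what the labels correspond to. It illustrates the wire format and is not docker-api's implementation. Both method names are hypothetical.

```ruby
# Splits a Docker multiplexed (non-TTY) log stream into stdout and stderr text.
def demux_docker_stream(data)
  out = { stdout: [], stderr: [] }
  offset = 0
  while offset + 8 <= data.bytesize
    # C = stream type byte, x3 = skip three padding bytes, N = big-endian uint32
    type, length = data.byteslice(offset, 8).unpack('Cx3N')
    payload = data.byteslice(offset + 8, length)
    case type
    when 1 then out[:stdout] << payload
    when 2 then out[:stderr] << payload
    end
    offset += 8 + length
  end
  [out[:stdout].join, out[:stderr].join]
end

# Builds one frame in the same format, for testing.
def docker_frame(stream_type, payload)
  [stream_type, 0, 0, 0, payload.bytesize].pack('C4N') + payload
end
```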
Extracts the mapper / reducer’s output file from a container.
@param {Docker::Container} container the mapper / reducer’s container
# File lib/contained_mr/runner.rb, line 122
def fetch_file_output(container)
  begin
    tar_buffer = fetch_tar_output container
  rescue Docker::Error::ServerError
    @output = false
    return
  end

  Gem::Package::TarReader.new tar_buffer do |tar|
    tar.each do |entry|
      next unless entry.file?
      @output = entry.read
      return
    end
  end
  @output = false
end
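The tar handling can be exercised entirely in memory with the standard library's Gem::Package classes. In this minimal sketch, first_file_in_tar is a hypothetical helper that follows the same contract as fetch_file_output. It returns the contents of the first regular file in the archive, or false if there is none. Directory entries are skipped, just as the `next unless entry.file?` guard does.

```ruby
require 'rubygems/package'
require 'stringio'

# Returns the first regular file's contents from the .tar in +io+, or false.
def first_file_in_tar(io)
  Gem::Package::TarReader.new(io) do |tar|
    tar.each do |entry|
      next unless entry.file?   # skip directories, symlinks, etc.
      return entry.read
    end
  end
  false
end

# Builds a small in-memory archive: a directory entry followed by one file.
def sample_tar
  io = StringIO.new
  Gem::Package::TarWriter.new(io) do |tar|
    tar.mkdir('results', 0o755)
    tar.add_file('results/part-0', 0o644) { |f| f.write("key\t1\n") }
  end
  io.rewind
  io
end
```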
Extracts the mapper / reducer’s output, as a .tar, from a container.
@param {Docker::Container} container the mapper / reducer’s container
@return {IO} an IO object from which the .tar data can be read
# File lib/contained_mr/runner.rb, line 145
def fetch_tar_output(container)
  tar_buffer = StringIO.new
  container.copy @_output_path do |data|
    tar_buffer << data
  end
  tar_buffer.rewind
  tar_buffer
end
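The rewind call matters. After the streamed chunks are appended, the StringIO's position is at the end of the data, so a reader such as TarReader would see an empty stream. A minimal sketch of the buffering pattern follows. buffer_chunks is a hypothetical helper, and +source+ is any callable that yields strings to a block, standing in for the Docker copy call.

```ruby
require 'stringio'

# Collects the strings yielded by +source+ into a StringIO and rewinds it
# so the next read starts at offset 0.
def buffer_chunks(source)
  buffer = StringIO.new
  source.call { |data| buffer << data }
  buffer.rewind   # without this, a read right after writing returns ""
  buffer
end

chunks = %w[abc def ghi]
io = buffer_chunks(->(&block) { chunks.each(&block) })
```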