module ContainedMr::JobLogic
Logic shared by {ContainedMr::Job} and {ContainedMr::Mock::Job}.
Attributes
@return {String} the job’s unique identifier
@return {Number} the number of mapper jobs that will be run
@return {String} the unique ID of the Docker image used to run the mappers
@return {String} prepended to Docker objects, for identification purposes
@return {String} the unique ID of the Docker image used to run the reducer
@return {ContainedMr::Template} the template this job is derived from
Public Instance Methods
@return {Hash<String, Object>} params used to create a mapper container
# File lib/contained_mr/job_logic.rb, line 71
#
# Builds the parameters used to create the container for one mapper.
#
# Reads @mapper_options, @template, @name_prefix and @id. The 'HostConfig'
# entry is computed by container_host_config from @mapper_options.
#
# @param {Number} i the mapper number; it is passed to the template's
#   mapper_env and embedded in the container name and hostname
# @return {Hash<String, Object>} params used to create a mapper container
def mapper_container_options(i)
  # Every ulimit is applied with the same value as both soft and hard limit.
  ulimits = @mapper_options[:ulimits].map do |limit_name, limit_value|
    { 'Name' => limit_name.to_s, 'Soft' => limit_value, 'Hard' => limit_value }
  end

  # Build a new array instead of pushing onto the template's return value, so
  # an array that the template caches, shares or freezes is never modified.
  env = @template.mapper_env(i) + ["affinity:image==#{mapper_image_tag}"]

  {
    'name' => "#{@name_prefix}_mapper.#{@id}.#{i}",
    'Image' => mapper_image_tag,
    'Hostname' => "#{i}.mapper",
    'Domainname' => '',
    'Labels' => { 'contained_mr.ctl' => @name_prefix },
    'Env' => env,
    'Ulimits' => ulimits,
    'NetworkDisabled' => true,
    'ExposedPorts' => {},
    'HostConfig' => container_host_config(@mapper_options),
  }
end
@return {Hash<Symbol, Object>} options passed to the Docker API when
building the mapper image
# File lib/contained_mr/job_logic.rb, line 54
#
# Builds the options used when building the mapper image.
#
# The build arguments carry an 'affinity:image' entry naming the template's
# image tag, and are serialized to a JSON string.
#
# @return {Hash<Symbol, Object>} options passed to the Docker API when
#   building the mapper image
def mapper_image_options
  build_args = { 'affinity:image' => "=#{@template.image_tag}" }

  {
    t: mapper_image_tag,
    forcerm: 1,
    buildargs: build_args.to_json,
  }
end
@return {String} tag applied to the Docker image used by the job’s mappers
# File lib/contained_mr/job_logic.rb, line 43
#
# Computes the mapper image tag from @name_prefix and @id.
#
# @return {String} tag applied to the Docker image used by the job's mappers
def mapper_image_tag
  "#{@name_prefix}/mapper.#{@id}"
end
Returns the runner used for a mapper.
@param {Number} i the mapper number
@return {ContainedMr::Runner} the runner used for the given mapper; nil if
the given mapper was not started
# File lib/contained_mr/job_logic.rb, line 26
#
# Returns the runner used for a mapper.
#
# Mapper numbers are 1-based: mapper i is stored at index i - 1 of @mappers.
#
# @param {Number} i the mapper number, between 1 and @item_count inclusive
# @return {ContainedMr::Runner} the runner used for the given mapper; nil if
#   @mappers holds no runner at that position
# @raise {ArgumentError} if i is not a number between 1 and @item_count
def mapper_runner(i)
  # Range#cover? returns false for values that cannot be compared with the
  # bounds (nil, strings, ...), so those are rejected with ArgumentError too
  # instead of failing with NoMethodError inside a comparison.
  unless (1..@item_count).cover?(i)
    raise ArgumentError, "Invalid mapper number #{i}"
  end

  @mappers[i - 1]
end
@return {Hash<String, Object>} params used to create a reducer container
# File lib/contained_mr/job_logic.rb, line 91
#
# Builds the parameters used to create the reducer's container.
#
# Reads @reducer_options, @template, @name_prefix and @id. The 'HostConfig'
# entry is computed by container_host_config from @reducer_options.
#
# @return {Hash<String, Object>} params used to create a reducer container
def reducer_container_options
  # Every ulimit is applied with the same value as both soft and hard limit.
  ulimits = @reducer_options[:ulimits].map do |limit_name, limit_value|
    { 'Name' => limit_name.to_s, 'Soft' => limit_value, 'Hard' => limit_value }
  end

  # Build a new array instead of pushing onto the template's return value, so
  # an array that the template caches, shares or freezes is never modified.
  env = @template.reducer_env + ["affinity:image==#{reducer_image_tag}"]

  {
    'name' => "#{@name_prefix}_reducer.#{@id}",
    'Image' => reducer_image_tag,
    'Hostname' => 'reducer',
    'Domainname' => '',
    'Labels' => { 'contained_mr.ctl' => @name_prefix },
    'Env' => env,
    'Ulimits' => ulimits,
    'NetworkDisabled' => true,
    'ExposedPorts' => {},
    'HostConfig' => container_host_config(@reducer_options),
  }
end
@return {Hash<Symbol, Object>} options passed to the Docker API when
building the reducer image
# File lib/contained_mr/job_logic.rb, line 63
#
# Builds the options used when building the reducer image.
#
# The build arguments carry an 'affinity:image' entry naming the template's
# image tag, and are serialized to a JSON string.
#
# @return {Hash<Symbol, Object>} options passed to the Docker API when
#   building the reducer image
def reducer_image_options
  build_args = { 'affinity:image' => "=#{@template.image_tag}" }

  {
    t: reducer_image_tag,
    forcerm: 1,
    buildargs: build_args.to_json,
  }
end
@return {String} tag applied to the Docker image used by the job’s reducers
# File lib/contained_mr/job_logic.rb, line 48
#
# Computes the reducer image tag from @name_prefix and @id.
#
# @return {String} tag applied to the Docker image used by the job's reducer
def reducer_image_tag
  "#{@name_prefix}/reducer.#{@id}"
end
Returns the runner used for the reducer.
@return {ContainedMr::Runner} the runner used for reducer; nil if the
reducer was not started
# File lib/contained_mr/job_logic.rb, line 37
#
# Returns the runner used for the reducer, as stored in @reducer.
#
# @return {ContainedMr::Runner} the runner used for the reducer; nil if
#   @reducer has not been set
def reducer_runner
  @reducer
end
Private Instance Methods
Computes the value of the HostConfig key in container creation params.
@param {Hash<Symbol, Object>} job_section the “mapper” or “reducer” section
in the options
@return {Hash<String, Object>} a container’s HostConfig params
# File lib/contained_mr/job_logic.rb, line 115
#
# Computes the value of the HostConfig key in container creation params.
#
# The :ram, :swap and :logs values are multiplied by 1048576 and truncated
# to integers. 'MemorySwap' is the sum of the swap and RAM amounts.
#
# @param {Hash<Symbol, Object>} job_section the "mapper" or "reducer" section
#   in the options; must have numeric :ram, :swap, :logs and :vcpus entries
# @return {Hash<String, Object>} a container's HostConfig params
def container_host_config(job_section)
  bytes_per_unit = 1_048_576

  ram_bytes = (job_section[:ram] * bytes_per_unit).to_i
  swap_bytes = (job_section[:swap] * bytes_per_unit).to_i + ram_bytes
  log_bytes = (job_section[:logs] * bytes_per_unit).to_i

  # NOTE: The value below is 1 second, in microseconds. This is the maximum
  #       value, and it minimizes scheduling overheads, at the expense of
  #       precision.
  cpu_period = 1_000_000

  {
    'Memory' => ram_bytes,
    'MemorySwap' => swap_bytes,
    'MemorySwappiness' => 0,
    'CpuPeriod' => cpu_period,
    'CpuQuota' => (job_section[:vcpus] * cpu_period).to_i,
    # NOTE: This interpretation of CpuShares only works on Docker Swarm.
    'CpuShares' => job_section[:vcpus],
    'LogConfig' => {
      'Type' => 'json-file',
      'Config' => {
        # The log size limit is passed as a decimal string.
        'max-size' => log_bytes.to_s,
        'max-file' => '1',
      },
    },
  }
end
Builds the .tar context used to create the mapper’s Docker image.
@param {String} mapper_input data passed to the mappers
@return {IO} an IO implementation that sources the .tar data
# File lib/contained_mr/job_logic.rb, line 176
#
# Builds the .tar context used to create the mapper's Docker image.
#
# The archive holds two entries, both written with mode 0644: 'Dockerfile',
# containing the template's mapper Dockerfile, and 'input', containing the
# given mapper input.
#
# @param {String} mapper_input data passed to the mappers
# @return {IO} an in-memory IO, rewound to the start of the .tar data
def mapper_tar_context(mapper_input)
  tar_buffer = StringIO.new

  Gem::Package::TarWriter.new(tar_buffer) do |tar|
    tar.add_file('Dockerfile', 0o644) do |docker_io|
      docker_io.write @template.mapper_dockerfile
    end
    tar.add_file('input', 0o644) do |input_io|
      input_io.write mapper_input
    end
  end

  # Rewind so that readers start at the beginning of the archive.
  tar_buffer.rewind
  tar_buffer
end
Reads in JSON options and sets defaults.
# File lib/contained_mr/job_logic.rb, line 143
#
# Reads in JSON options and sets defaults.
#
# Stores the parsed "mapper" section in @mapper_options and the parsed
# "reducer" section in @reducer_options. Both sections use the same keys and
# the same defaults.
#
# @param {Hash<String, Object>} json_options the job options; the 'mapper'
#   and 'reducer' keys are optional
# @return {Hash<Symbol, Object>} the parsed reducer options
def parse_options(json_options)
  @mapper_options = parse_section_options(json_options['mapper'])
  @reducer_options = parse_section_options(json_options['reducer'])
end

# Converts one section of the JSON options into a symbol-keyed Hash, filling
# in the default for every missing (or nil / false) value.
#
# @param {Hash<String, Object>} section the "mapper" or "reducer" section of
#   the JSON options; nil is treated as an empty section
# @return {Hash<Symbol, Object>} the section's options, with defaults applied
def parse_section_options(section)
  section ||= {}
  ulimits = section['ulimits'] || {}

  {
    wait_time: section['wait_time'] || 60,
    vcpus: section['vcpus'] || 1,     # logical processors
    ram: section['ram'] || 512,       # megabytes
    swap: section['swap'] || 0,       # megabytes
    logs: section['logs'] || 64,      # megabytes
    ulimits: {
      cpu: ulimits['cpu'] || 60,      # seconds
    },
  }
end