class Rub2::Manager

Public Class Methods

new(name) click to toggle source
# File lib/rub2.rb, line 297
# Creates a manager for a single job.
#
# name:: handed unchanged to JobScript.new.
#
# Every instance variable that the other methods of this class read is given
# an explicit value here, so the whole state of a manager is visible in one
# place and no method ever reads an unset variable.
def initialize(name)
  @script = JobScript.new(name)
  @job_store = JobStore.new
  @timeout = 30               # handed to JobResultCollector by polling_loop
  @max_retry_count = 0        # changed by max_retry
  @jobid = []                 # ids returned by qsub, in submission order
  @dry_run = false            # switched on by dry_run
  @continue_on_error = false  # switched on by continue_on_error
  @done_proc = nil            # registered by on_done
  @fail_proc = nil            # registered by on_fail
  @uri = nil                  # assigned by start_tuplespace
end

Public Instance Methods

array_request(req) click to toggle source
# File lib/rub2.rb, line 344
# Stores +req+ as the array request of the job script.
# submit later hands the same value to JobStore#init_job.
# NOTE(review): the accepted format of +req+ is defined by JobScript, which
# is not visible here -- confirm there.
def array_request(req)
  @script.array_request = req
end
continue_on_error() click to toggle source
# File lib/rub2.rb, line 365
# Makes wait_finish return false when the job fails instead of logging
# "job failed" and terminating the process with exit.
def continue_on_error
  @continue_on_error = true
end
dry_run() click to toggle source
# File lib/rub2.rb, line 353
# Enables dry-run mode: submit only builds the script and prints its source
# (no log directory is created, no tuple space is started and qsub is not
# run), and wait_finish returns true immediately.
def dry_run
  @dry_run = true
end
execute_with(first, *rest, &block) click to toggle source

example: execute_with array (, arrays) do |arg1 (, args…)|

return command_string

end

# File lib/rub2.rb, line 309
# Builds the command list of the job script.
#
# The block is called once per element of +first+; it also receives the
# elements found at the same position in each array of +rest+. Every truthy
# value the block returns becomes one command, in iteration order; nil and
# false results are skipped.
#
# first:: collection that drives the iteration.
# rest::  optional additional arrays zipped with +first+.
#
# Returns the list of commands that was stored in the job script.
# Raises ArgumentError when no block is given.
def execute_with(first, *rest, &block)
  raise ArgumentError, "execute_with requires a block" unless block

  # zip pads with nil when an array in +rest+ is shorter than +first+.
  results = first.zip(*rest).map { |args| block.call(*args) }
  @script.commands = results.select { |cmd| cmd }
end
get_executed_command(job_id) click to toggle source

Accessor: returns the command that was generated for the given array job id.

# File lib/rub2.rb, line 375
# Returns the command stored for array job +job_id+.
#
# job_id:: 1-based position of the command in the job script's command list.
#
# Returns nil when no command exists for +job_id+. Ids below 1 also yield
# nil: without the guard they would turn into a negative array index and
# silently return a command counted from the end of the list.
def get_executed_command(job_id)
  return nil if job_id < 1

  @script.commands[job_id - 1]
end
inherit_environment() click to toggle source
# File lib/rub2.rb, line 357
# Sets the inherit_environment flag of the job script to true.
# NOTE(review): presumably this makes the job inherit the environment of the
# submitting process -- confirm how JobScript uses the flag.
def inherit_environment
  @script.inherit_environment = true
end
log(log_path) click to toggle source

job options

# File lib/rub2.rb, line 332
# Sets the log location of the job script.
#
# log_path:: path of the log; it is wrapped in a Pathname before being stored.
#            submit creates the directory part of this path when it is missing.
#
# Returns the Pathname that was stored.
def log(log_path)
  destination = Pathname.new(log_path)
  @script.log_path = destination
end
max_retry(count) click to toggle source
# File lib/rub2.rb, line 369
# Sets the retry count that submit passes to JobStore#init_job.
# The value set by initialize is 0.
def max_retry(count)
  @max_retry_count = count
end
on_done(&block) click to toggle source

example: on_done {puts 'done'}

# File lib/rub2.rb, line 326
# Registers the block that wait_finish calls, without arguments, when every
# array job finished with exit code 0. Without a registered block
# wait_finish logs "job succeeded" instead.
def on_done(&block)
  @done_proc = block
end
on_fail(&block) click to toggle source

example: on_fail {|results| p results}

# File lib/rub2.rb, line 321
# Registers the block that wait_finish calls when at least one array job has
# a non-zero exit code. The block receives the full result list, an array of
# [array_id, exit_code] pairs. Without a registered block wait_finish logs
# each failed array job instead.
def on_fail(&block)
  @fail_proc = block
end
queue(q) click to toggle source
# File lib/rub2.rb, line 361
# Stores +q+ as the queue of the job script.
# NOTE(review): presumably the name of the destination queue for qsub --
# confirm against JobScript.
def queue(q)
  @script.queue = q
end
resource(res = {}) click to toggle source
# File lib/rub2.rb, line 340
# Stores +res+ as the resource setting of the job script.
#
# res:: defaults to an empty Hash.
# NOTE(review): which keys and values are understood is decided by
# JobScript -- confirm there.
def resource(res = {})
  @script.resource = res
end
shell(intep) click to toggle source
# File lib/rub2.rb, line 336
# Stores +intep+ as the shell of the job script.
# NOTE(review): presumably the interpreter used to run the generated
# script -- confirm against JobScript.
def shell(intep)
  @script.shell = intep
end
slot_limit(limit) click to toggle source

The slot limit doesn't work on Torque.

# File lib/rub2.rb, line 349
# Stores +limit+ as the slot limit of the job script.
# NOTE(review): how the limit is applied is decided by JobScript -- confirm
# there.
def slot_limit(limit)
  @script.slot_limit = limit
end
submit() click to toggle source

exec qsub

# File lib/rub2.rb, line 382
# Builds the job script and submits it with qsub.
#
# In dry-run mode the script is only built and its source printed; no log
# directory is created, no tuple space is started and qsub is not run.
#
# Otherwise the directory of the log path is created when missing, the
# tuple space is started and its URI stored in the script, the script is
# built and piped to qsub, and the new job is registered in the job store
# with the script's array request and the configured retry count.
#
# Returns nil in dry-run mode, otherwise the result of JobStore#init_job.
# Raises RuntimeError ("Empty commands") when the script has no commands.
def submit
  unless @dry_run
    log_dir = @script.log_path.dirname
    log_dir.mkpath unless log_dir.exist?
    @script.uri = start_tuplespace
  end

  @script.build

  if @dry_run
    puts @script.source
    return
  end

  raise "Empty commands" if @script.commands.empty?

  # Register the id of THIS submission. Looking it up with @jobid.first would
  # pick the id of an earlier submission when submit is called again.
  jobid = submit_qsub(@script.source)
  @jobid << jobid
  @job_store.init_job(jobid, @script.array_request, @max_retry_count)
end
wait_finish() click to toggle source
# File lib/rub2.rb, line 404
# Waits until every array job has finished and reports the outcome.
#
# Returns true at once in dry-run mode. Otherwise polling_loop is run to
# collect the [array_id, exit_code] pairs:
#
# * all exit codes 0: the on_done block is called (or "job succeeded" is
#   logged when none is registered) and true is returned;
# * otherwise: the on_fail block is called with all results (or every failed
#   array job is logged when none is registered). Then false is returned if
#   continue_on_error was requested; if not, "job failed" is logged and the
#   process is terminated with exit status false.
def wait_finish
  return true if @dry_run

  results = polling_loop
  failures = results.reject { |_array_id, exit_code| exit_code == 0 }

  if failures.empty?
    @done_proc ? @done_proc.call : Rub2.putlog("job succeeded")
    return true
  end

  if @fail_proc
    @fail_proc.call(results)
  else
    failures.each do |array_id, exit_code|
      Rub2.putlog "array job[#{array_id}] failed: #{exit_code}"
    end
  end

  return false if @continue_on_error

  Rub2.putlog "job failed: #{@jobid.join(',')}"
  exit false
end

Private Instance Methods

polling_loop() click to toggle source
# File lib/rub2.rb, line 435
# Collects job results until the job store reports that every job finished.
#
# Each round keeps collecting results until all jobs are finished or the
# collector raises Rinda::RequestExpiredError. After that the dead-job
# collector is run against the store and the jobs the store selects for
# retry, if any, are resubmitted.
#
# Returns an array of [array_id, exit_code] pairs, one per job yielded by
# JobStore#each_job.
def polling_loop
  collector = JobResultCollector.new(@uri, @timeout, @job_store.job_count)
  dead_jobs = DeadJobCollector.new

  until @job_store.all_finish?
    begin
      until @job_store.all_finish?
        collector.collect_job_result(@job_store)
      end
    rescue Rinda::RequestExpiredError
      # A timeout only ends this collection round; carry on with the
      # dead-job check and the retries below.
    end

    dead_jobs.collect(@job_store)
    to_retry = @job_store.select_retry_jobs
    resubmit_faild_job(to_retry) unless to_retry.empty?
  end

  outcome = []
  @job_store.each_job { |job| outcome << [job.array_id, job.exit_code] }
  outcome
end
resubmit_faild_job(failed_jobs) click to toggle source
# File lib/rub2.rb, line 456
# Submits the script again for the given jobs only.
#
# failed_jobs:: jobs to run again. Their array ids, joined with commas, are
#               handed to submit_qsub as the array option.
#
# The id of the new submission is appended to @jobid and recorded on every
# job with set_resubmit_id. Nothing happens, and nil is returned, when
# +failed_jobs+ is empty; otherwise +failed_jobs+ is returned.
def resubmit_faild_job(failed_jobs)
  return if failed_jobs.empty?

  array_ids = failed_jobs.map(&:array_id)
  resubmitted_id = submit_qsub(@script.source, array_ids.join(','))
  @jobid.push(resubmitted_id)
  failed_jobs.each { |job| job.set_resubmit_id(resubmitted_id) }
end
start_tuplespace() click to toggle source

start server for job results

# File lib/rub2.rb, line 481
# Starts the server that receives the job results.
#
# Creates a Rinda::TupleSpace, exposes it through a DRb service started on
# "druby://:0" and logs the URI reported by that service. The tuple space,
# the service and the URI are kept in @ts, @drb and @uri.
#
# Returns the URI of the started service.
def start_tuplespace
  @ts = Rinda::TupleSpace.new
  @drb = DRb.start_service("druby://:0", @ts)

  server_uri = @drb.uri
  @uri = server_uri
  Rub2.putlog "start Rinda Server: #{server_uri}"
  server_uri
end
submit_qsub(script, array_option = nil) click to toggle source
# File lib/rub2.rb, line 464
# Pipes +script+ into qsub and returns the numeric id of the created job.
#
# script::       source of the job script, written to qsub's standard input.
# array_option:: when given, passed to qsub as the value of "-t".
#
# Returns the leading digits of qsub's standard output as a String.
# Raises RuntimeError when qsub prints nothing on standard output (the
# message carries qsub's standard error) or when the output does not start
# with a job number.
def submit_qsub(script, array_option = nil)
  # Argument list instead of a command string: the array option reaches qsub
  # as one argument and is never interpreted by a shell.
  command = ["qsub"]
  command.push("-t", array_option.to_s) if array_option

  # Same bytes as IO#puts would write: a newline is appended only if missing.
  input = script.to_s
  input += "\n" unless input.end_with?("\n")

  # capture3 drains stdout and stderr concurrently, so a large stderr output
  # cannot block qsub while its stdout is being read.
  out, err, _status = Open3.capture3(*command, stdin_data: input)
  output = out.chomp
  raise "qsub error: " + err.chomp if output.empty?

  id_match = /\A(\d+)/.match(output)
  raise "qsub error: unexpected output: #{output}" unless id_match

  jobid = id_match[1]
  Rub2.putlog "job submited: #{@script.name} -> #{jobid}[#{array_option || @script.make_array_request_string}]"
  jobid
end