class Rub2::Manager
Public Class Methods
new(name)
click to toggle source
# File lib/rub2.rb, line 297
# Prepares a manager for the job called +name+.
#
# Creates the JobScript for +name+ and a fresh JobStore, and starts out
# with a timeout of 30, a retry count of 0 and no submitted job ids.
def initialize(name)
  @timeout = 30
  @max_retry_count = 0
  @script = JobScript.new(name)
  @job_store = JobStore.new
  @jobid = []
end
Public Instance Methods
array_request(req)
click to toggle source
# File lib/rub2.rb, line 344
# Stores +req+ as the array request of the job script.
# The same value is later handed to the job store by #submit.
def array_request(req)
  @script.array_request = req
end
continue_on_error()
click to toggle source
# File lib/rub2.rb, line 365
# Turns on the continue-on-error flag.
# With the flag set, #wait_finish returns false for a failed job instead
# of terminating the process.
def continue_on_error
  @continue_on_error = true
end
dry_run()
click to toggle source
# File lib/rub2.rb, line 353
# Turns on dry-run mode.
# In this mode #submit only prints the generated script and #wait_finish
# returns true without polling.
def dry_run
  @dry_run = true
end
execute_with(first, *rest, &block)
click to toggle source
example: execute_with
array (, arrays) do |arg1 (, args…)|
return command_string
end
# File lib/rub2.rb, line 309
# Builds the command list of the job script.
#
# +first+ is zipped with every collection in +rest+; each resulting tuple
# is splatted into the block, and every truthy block result becomes one
# command. Results that are nil or false are skipped.
#
#   execute_with(array (, arrays)) do |arg1 (, args...)|
#     command_string
#   end
#
# Returns the list of collected commands.
def execute_with(first, *rest, &block)
  collected = first.zip(*rest).each_with_object([]) do |tuple, acc|
    command = block.call(*tuple)
    acc << command if command
  end
  @script.commands = collected
end
get_executed_command(job_id)
click to toggle source
accessor
# File lib/rub2.rb, line 375
# Looks up the command that belongs to an array job id.
#
# +job_id+ is 1-based: id 1 maps to the first entry of the script's
# command list.
#
# Returns the command, or nil when +job_id+ is outside the list. Ids below
# 1 are rejected explicitly; otherwise Ruby's negative indexing would make
# id 0 return the last command.
def get_executed_command(job_id)
  return nil if job_id < 1

  @script.commands[job_id - 1]
end
inherit_environment()
click to toggle source
# File lib/rub2.rb, line 357
# Sets the job script's inherit_environment option to true.
# NOTE(review): presumably makes the submitted job inherit the submitting
# environment -- confirm against JobScript.
def inherit_environment
  @script.inherit_environment = true
end
log(log_path)
click to toggle source
job options
# File lib/rub2.rb, line 332
# Sets the log path of the job script.
#
# +log_path+ is wrapped in a Pathname before it is stored; #submit creates
# the directory part of that path when it does not exist yet.
#
# Returns the Pathname that was stored.
def log(log_path)
  @script.log_path = Pathname.new(log_path)
end
max_retry(count)
click to toggle source
# File lib/rub2.rb, line 369
# Sets the retry count that #submit passes to the job store.
# The default set by the constructor is 0.
def max_retry(count)
  @max_retry_count = count
end
on_done(&block)
click to toggle source
example: on_done
{puts 'done'}
# File lib/rub2.rb, line 326
# Registers the block that #wait_finish calls, without arguments, when
# every array job finished with exit code 0.
#
#   on_done { puts 'done' }
def on_done(&block)
  @done_proc = block
end
on_fail(&block)
click to toggle source
example: on_fail
{|results| p results}
# File lib/rub2.rb, line 321
# Registers the block that #wait_finish calls when at least one array job
# finished with a non-zero exit code. The block receives the full result
# list of [array_id, exit_code] pairs.
#
#   on_fail { |results| p results }
def on_fail(&block)
  @fail_proc = block
end
queue(q)
click to toggle source
# File lib/rub2.rb, line 361
# Stores +q+ as the queue of the job script.
def queue(q)
  @script.queue = q
end
resource(res = {})
click to toggle source
# File lib/rub2.rb, line 340
# Stores +res+ as the resource setting of the job script.
# +res+ defaults to an empty Hash when omitted.
def resource(res = {})
  @script.resource = res
end
shell(intep)
click to toggle source
# File lib/rub2.rb, line 336
# Stores +intep+ as the shell of the job script.
def shell(intep)
  @script.shell = intep
end
slot_limit(limit)
click to toggle source
slot limit doesn't work on Torque
# File lib/rub2.rb, line 349
# Stores +limit+ as the slot limit of the job script.
# According to the original documentation, the slot limit does not work
# on Torque.
def slot_limit(limit)
  @script.slot_limit = limit
end
submit()
click to toggle source
exec qsub
# File lib/rub2.rb, line 382
# Builds the job script and submits it with qsub.
#
# In dry-run mode the script is only built and printed to standard output;
# nothing is submitted and nil is returned.
#
# Otherwise the command list is validated first, so that a job without
# commands fails before the log directory is created or the result server
# is started. Then the log directory is created if missing, the tuple
# space server is started and its URI stored on the script, the script is
# built and handed to qsub, and the new job id is registered in the job
# store together with the array request and the retry count.
#
# Raises RuntimeError ("Empty commands") when the script has no commands.
def submit
  if @dry_run
    @script.build
    puts @script.source
    return
  end

  raise "Empty commands" if @script.commands.empty?

  log_dir = @script.log_path.dirname
  log_dir.mkpath unless log_dir.exist?

  # The URI has to be set before the script is built.
  @script.uri = start_tuplespace
  @script.build

  @jobid << submit_qsub(@script.source)
  @job_store.init_job(@jobid.first, @script.array_request, @max_retry_count)
end
wait_finish()
click to toggle source
# File lib/rub2.rb, line 404
# Waits for the submitted job and reports the outcome.
#
# Returns true immediately in dry-run mode. Otherwise polls until all
# array jobs have a result:
#
# * every exit code is 0: calls the on_done block if one is registered,
#   else logs "job succeeded"; returns true.
# * any exit code is non-zero: calls the on_fail block with the full
#   result list if one is registered, else logs each failed array job.
#   Then returns false when continue_on_error is set; otherwise logs the
#   job ids and terminates the process with a failure status.
def wait_finish
  return true if @dry_run

  results = polling_loop
  failures = results.reject { |_array_id, code| code == 0 }

  if failures.empty?
    if @done_proc
      @done_proc.call
    else
      Rub2.putlog "job succeeded"
    end
    return true
  end

  if @fail_proc
    @fail_proc.call(results)
  else
    failures.each do |array_id, code|
      Rub2.putlog "array job[#{array_id}] failed: #{code}"
    end
  end

  return false if @continue_on_error

  Rub2.putlog "job failed: #{@jobid.join(',')}"
  exit false
end
Private Instance Methods
polling_loop()
click to toggle source
# File lib/rub2.rb, line 435
# Collects job results until the job store reports that every job has
# finished, then returns the outcome.
#
# Each round keeps collecting results until all jobs are finished or the
# collector raises Rinda::RequestExpiredError. After that, dead jobs are
# collected and any jobs the store selects for retry are resubmitted.
#
# Returns an Array of [array_id, exit_code] pairs, one per job in the
# store.
def polling_loop
  collector = JobResultCollector.new(@uri, @timeout, @job_store.job_count)
  reaper = DeadJobCollector.new

  until @job_store.all_finish?
    begin
      collector.collect_job_result(@job_store) until @job_store.all_finish?
    rescue Rinda::RequestExpiredError
      # ignore timeout; the dead-job and retry checks below still run
    end

    reaper.collect(@job_store)
    to_retry = @job_store.select_retry_jobs
    resubmit_faild_job(to_retry) unless to_retry.empty?
  end

  outcome = []
  @job_store.each_job { |job| outcome << [job.array_id, job.exit_code] }
  outcome
end
resubmit_faild_job(failed_jobs)
click to toggle source
# File lib/rub2.rb, line 456
# Resubmits the given failed array jobs as one new qsub submission.
#
# The array ids of +failed_jobs+ are joined with commas and passed to qsub
# as the array option. The new job id is appended to the list of job ids
# and recorded on every failed job as its resubmit id.
#
# Returns nil when +failed_jobs+ is empty, otherwise +failed_jobs+.
def resubmit_faild_job(failed_jobs)
  return if failed_jobs.empty?

  array_ids = failed_jobs.map(&:array_id)
  resubmitted_id = submit_qsub(@script.source, array_ids.join(','))
  @jobid.push(resubmitted_id)
  failed_jobs.each { |job| job.set_resubmit_id(resubmitted_id) }
end
start_tuplespace()
click to toggle source
start server for job results
# File lib/rub2.rb, line 481
# Starts the server that receives job results.
#
# Creates a Rinda::TupleSpace, serves it through DRb at "druby://:0",
# remembers the resulting service URI in @uri and logs it.
#
# Returns the URI of the started DRb service.
def start_tuplespace
  @ts = Rinda::TupleSpace.new
  @drb = DRb.start_service("druby://:0", @ts)
  @uri = @drb.uri
  Rub2.putlog("start Rinda Server: #{@uri}")
  @uri
end
submit_qsub(script, array_option = nil)
click to toggle source
# File lib/rub2.rb, line 464
# Pipes +script+ into qsub and returns the id of the created job.
#
# script::       job script source, written to qsub's standard input.
# array_option:: optional value for qsub's -t option; when nil, no -t is
#                passed.
#
# Returns the leading run of digits of qsub's standard output as a String.
#
# Raises RuntimeError when qsub prints nothing on standard output (the
# message then carries qsub's standard error) or when the output does not
# start with a numeric job id. The second case used to return nil silently.
def submit_qsub(script, array_option = nil)
  # Argument-vector form: no shell is involved, so array_option reaches
  # qsub verbatim as a single argument.
  cmd = ['qsub']
  cmd.push('-t', array_option.to_s) if array_option

  output = nil
  Open3.popen3(*cmd) do |stdin, stdout, stderr|
    stdin.puts(script)
    stdin.close
    output = stdout.read.chomp
    raise "qsub error: " + stderr.read.chomp if output.empty?
  end

  jobid = output[/\A\d+/]
  raise "qsub error: unexpected output: #{output}" unless jobid

  Rub2.putlog "job submited: #{@script.name} -> #{jobid}[#{array_option || @script.make_array_request_string}]"
  jobid
end