class Bio::Pipengine::Job
Attributes
a Job
object holds information on a job to be submitted samples_groups and samples_obj
are used to store information in case of steps that require to combine info from multiple samples
a Job
object holds information on a job to be submitted samples_groups and samples_obj
are used to store information in case of steps that require to combine info from multiple samples
a Job
object holds information on a job to be submitted samples_groups and samples_obj
are used to store information in case of steps that require to combine info from multiple samples
a Job
object holds information on a job to be submitted samples_groups and samples_obj
are used to store information in case of steps that require to combine info from multiple samples
a Job
object holds information on a job to be submitted samples_groups and samples_obj
are used to store information in case of steps that require to combine info from multiple samples
a Job
object holds information on a job to be submitted samples_groups and samples_obj
are used to store information in case of steps that require to combine info from multiple samples
a Job
object holds information on a job to be submitted samples_groups and samples_obj
are used to store information in case of steps that require to combine info from multiple samples
a Job
object holds information on a job to be submitted samples_groups and samples_obj
are used to store information in case of steps that require to combine info from multiple samples
a Job
object holds information on a job to be submitted samples_groups and samples_obj
are used to store information in case of steps that require to combine info from multiple samples
a Job
object holds information on a job to be submitted samples_groups and samples_obj
are used to store information in case of steps that require to combine info from multiple samples
a Job
object holds information on a job to be submitted samples_groups and samples_obj
are used to store information in case of steps that require to combine info from multiple samples
a Job
object holds information on a job to be submitted samples_groups and samples_obj
are used to store information in case of steps that require to combine info from multiple samples
a Job
object holds information on a job to be submitted samples_groups and samples_obj
are used to store information in case of steps that require to combine info from multiple samples
Public Class Methods
# File lib/bio/pipengine/job.rb, line 15 def initialize(name) @name = generate_uuid + "-" + name @shortname = name @command_line = [] @resources = {} @cpus = 1 @nodes = "1" @log = "stdin" @log_adapter = nil end
Public Instance Methods
# File lib/bio/pipengine/job.rb, line 26 def add_resources(resources) self.resources.merge! resources end
add all the command lines for a given step
# File lib/bio/pipengine/job.rb, line 35 def add_step(step,sample) # setting job working directory working_dir = "" if self.local working_dir = self.local+"/"+self.name else working_dir = self.output if step.is_multi? folder = (self.custom_output) ? self.custom_output : @shortname working_dir += "/#{folder}" else folder = if self.custom_output self.custom_output elsif self.custom_name self.custom_name else step.name end working_dir += "/#{sample.name}/#{folder}" end end # set job cpus number to the higher step cpus (this in case of multiple steps) self.cpus = step.cpus if step.cpus > self.cpus # set number of nodes for job self.nodes = (step.nodes) ? step.nodes : @nodes # set the memory used self.mem = step.mem # adding job working directory unless step.name.start_with? "_" self.command_line << "if [ ! -f #{working_dir}/checkpoint ]" self.command_line << "then" self.command_line << logger(step, "start") self.command_line << "\nmkdir -p #{working_dir}" self.command_line << "cd #{working_dir}" end # generate command lines for this step if step.run.kind_of? Array step.run.each_with_index do |cmd, i| command = generate_cmd_line(cmd,sample,step) # TODO verify that logger works in this case # self.command_line << "#{command} || { echo \"FAILED `date`: #{step.name}:#{i}\" ; exit 1; }" self.command_line << "#{command} || { #{logger(step, "FAILED #{i}" )}; exit 1; }" end else command = generate_cmd_line(step.run,sample,step) # TODO verify that logger works in this case # self.command_line << "#{command} || { echo \"FAILED `date`: #{step.name} \" ; exit 1; }" self.command_line << "#{command} || { #{logger(step, "FAILED" )}; exit 1; }" end self.command_line << logger(step, "finished") self.command_line << "touch #{working_dir}/checkpoint" self.command_line << "else" self.command_line << logger(step, "already executed, skipping this step") self.command_line << "fi" # check if a temporary (i.e. different from 'output') directory is set if self.local final_output = "" if step.is_multi? folder = (self.custom_output) ? self.custom_output : @shortname final_output = self.output+"/#{folder}" else folder = (self.custom_output) ? self.custom_output : step.name final_output = self.output+"/#{sample.name}/#{folder}" end self.command_line << "mkdir -p #{final_output}" self.command_line << "cp -r #{working_dir}/* #{final_output}" self.command_line << "rm -fr #{working_dir}" end end
# File lib/bio/pipengine/job.rb, line 30 def output self.resources["output"] end
# File lib/bio/pipengine/job.rb, line 136 def submit job_id = `qsub #{self.output}/#{self.name}.pbs` @@logger.info "#{job_id}".green end
# File lib/bio/pipengine/job.rb, line 118 def to_script(options) File.open(self.output+"/"+self.name+'.pbs','w') do |file| file.puts "#!/usr/bin/env bash" file.puts "#PBS -N #{self.name}" file.puts "#PBS -d #{self.output}" file.puts "#PBS -q #{options[:pbs_queue]}" if options[:pbs_queue] if options[:pbs_opts] file.puts "#PBS -l #{options[:pbs_opts].join(",")}" else l_string = [] l_string << "nodes=#{self.nodes}:ppn=#{self.cpus}" l_string << "mem=#{self.mem}" if self.mem file.puts "#PBS -l #{l_string.join(",")}" end file.puts self.command_line.join("\n") end end
Private Instance Methods
this method call other methods to perform the right substitutions into the command lines
# File lib/bio/pipengine/job.rb, line 149 def generate_cmd_line(cmd,sample,step) if step.is_multi? # if is a multi samples step call a different method set_multi_cmd(step,self.multi_samples) cmd = sub_multi(cmd,step) else cmd = sub_placeholders(cmd,sample,step) # normal step, perform usual substitutions end # Check that all placeholders have been substituted, if not terminate with an error cmd.scan(/<\S+>/).each do |unsubstituted_tag| @@logger_error.error("Found an unsubstituted tag #{unsubstituted_tag} . Terminating the execution".red) exit end return cmd end
this sub method handle different multi-samples definitions (like comma separated list, space separated etc.)
# File lib/bio/pipengine/job.rb, line 232 def generate_multi_cmd(multi_def,multi_samples) multi_cmd = [] multi_samples.each do |sample_name| if sample_name.include? "," multi_cmd << split_and_sub(",",multi_def,sample_name) elsif sample_name.include? ";" multi_cmd << split_and_sub(";",multi_def,sample_name) else multi_cmd << sub_placeholders(multi_def,self.samples_obj[sample_name]) end end return multi_cmd.join("\s") end
create a unique ID for each job
# File lib/bio/pipengine/job.rb, line 144 def generate_uuid SecureRandom.hex(5) end
log a step according to the selected adapter
# File lib/bio/pipengine/job.rb, line 256 def logger(step, message) case self.log when "stdin" "echo \"#{step.name} #{name} #{message} `whoami` `hostname` `pwd` `date`.\"" when "syslog" "logger -t PIPENGINE \"#{step.name} #{name} #{message} `whoami` `hostname` `pwd`\"" when "fluentd" "curl -X POST -d 'json={\"source\":\"PIPENGINE\", \"step\":\"#{step.name}\", \"message\":\"#{message}\", \"job_id\":\"#{name}\", \"user\":\"\'\"`whoami`\"\'\", \"host\":\"\'\"`hostname`\"\'\", \"pwd\":\"\'\"`pwd`\"\'\"}' #{self.log_adapter}" end end
creates actual multi-samples command lines to be substituted where <multi> placeholders are found
# File lib/bio/pipengine/job.rb, line 207 def set_multi_cmd(step,multi_samples) if step.multi_def.kind_of? Array # in case of many multi-samples command lines step.multi_cmd = [] step.multi_def.each do |m_def| step.multi_cmd << generate_multi_cmd(m_def,multi_samples) end else step.multi_cmd = generate_multi_cmd(step.multi_def,multi_samples) end end
take a non-space separated list of samples and perform the substitution with the group defitions
# File lib/bio/pipengine/job.rb, line 247 def split_and_sub(sep,multi_def,multi) cmd_line = [] multi.split(sep).each do |sample_name| cmd_line << sub_placeholders(multi_def,self.samples_obj[sample_name]) end cmd_line.join(sep) end
take the multi_cmd and perform the subsitutions into the step command lines
# File lib/bio/pipengine/job.rb, line 219 def sub_multi(cmd,step) cmd = sub_resources_and_cpu(cmd,step) if step.multi_cmd.kind_of? Array step.multi_cmd.each_with_index do |m,index| cmd.gsub!(/<multi#{index+1}>/,m) end else cmd.gsub!(/<multi>/,step.multi_cmd) end return cmd end
perform substitutions on all the placeholders
# File lib/bio/pipengine/job.rb, line 166 def sub_placeholders(cmd,sample,step=nil) tmp_cmd = cmd.gsub(/<sample>/,sample.name) if tmp_cmd =~/<sample_path>/ sample_path_glob = (tmp_cmd.scan(/<sample_path>(\S+)/).map {|e| e.first}) if sample_path_glob.empty? tmp_cmd.gsub!(/<sample_path>/,sample.path.join("\s")) else sample_path_glob.each do |append| tmp_cmd.gsub!(/<sample_path>#{Regexp.quote(append)}/,(sample.path.map {|s| s+append}).join("\s")) end end end # for resourcers and cpus tmp_cmd = sub_resources_and_cpu(tmp_cmd,step) # for placeholders like <mapping/sample> tmp_cmd.scan(/<(\S+)\/sample>/).map {|e| e.first}.each do |input_folder| @@logger.info "Directory #{self.output+"/"+sample.name+"/"+input_folder} not found".magenta unless Dir.exists? self.output+"/"+sample.name+"/"+input_folder tmp_cmd = tmp_cmd.gsub(/<#{input_folder}\/sample>/,self.output+"/"+sample.name+"/"+input_folder+"/"+sample.name) end # for placeholders like <mapping/> tmp_cmd.scan(/<(\S+)\/>/).map {|e| e.first}.each do |input_folder| @@logger.info "Directory #{self.output+"/"+sample.name+"/"+input_folder} not found".magenta unless Dir.exists? self.output+"/"+sample.name+"/"+input_folder tmp_cmd = tmp_cmd.gsub(/<#{input_folder}\/>/,self.output+"/"+sample.name+"/"+input_folder+"/") end return tmp_cmd end
# File lib/bio/pipengine/job.rb, line 195 def sub_resources_and_cpu(cmd,step) # for all resources tags like <gtf> <index> <genome> <bwa> etc. self.resources.each_key do |r| cmd.gsub!(/<#{r}>/,self.resources[r]) end # set number of cpus for this command line cmd.gsub!(/<cpu>/,step.cpus.to_s) unless step.nil? return cmd end