class DakeExecutor
Public Class Methods
new(analyzer, dake_db, dep_graph, jobs)
click to toggle source
# File lib/dake/executor.rb, line 5 def initialize(analyzer, dake_db, dep_graph, jobs) @analyzer = analyzer @dake_db = dake_db @dep_graph = dep_graph @complete_dep_steps = Hash.new(0) @async = (jobs ? true : false) @pool = Concurrent::ThreadPoolExecutor.new( min_threads: 1, max_threads: jobs, max_queue: 0 # unbounded work queue ) if @async end
Public Instance Methods
execute(rebuild_set, dry_run=false, log=false)
click to toggle source
# File lib/dake/executor.rb, line 18 def execute(rebuild_set, dry_run=false, log=false) if rebuild_set.empty? STDERR.puts "Nothing to be done.".colorize(:green) return end if @async dep_map = Hash.new rebuild_set.each do |step| dep_set = @dep_graph.dep_step[step] next if dep_set.empty? dep_map[step] = dep_set & rebuild_set end queue = Queue.new error_queue = Queue.new error_steps = Set.new error_thr = Thread.new do while error = error_queue.deq if error.is_a? Exception STDERR.puts "#{error.class}: #{error.message}".colorize(:red) STDERR.puts "Continue to execute other Step(s)".colorize(:red) STDERR.puts "To Force Quitting: Press Ctrl + C".colorize(:red) end end end lock = Concurrent::ReadWriteLock.new rebuild_set.each { |step| queue << step if @dep_graph.dep_step[step].all? { |dep| not rebuild_set.include? dep} } while next_step = queue.deq @pool.post(next_step) do |step| begin lock.acquire_read_lock error_step = error_steps.include? step lock.release_read_lock if error_step line, column = @analyzer.step_line_and_column step msg = "Step(#{step.object_id}) defined in #{step.src_file} at #{line}:#{column} " + "skipped due to prerequisite step(s) error." error_queue << Exception.new(msg) else execute_step(step, dry_run, log) end lock.acquire_write_lock dep_map.delete step if dep_map.empty? queue.close else @dep_graph.succ_step[step].each do |succ| next unless dep_map[succ] dep_map[succ].delete step if dep_map[succ].empty? queue << succ elsif dep_map[succ].all? { |dep_step| error_steps.include? dep_step } error_steps << succ queue << succ end end end lock.release_write_lock rescue Exception => e error_queue << e lock.acquire_write_lock error_steps << step dep_map.delete step if dep_map.empty? queue.close else @dep_graph.succ_step[step].each do |succ| if dep_map[succ].all? { |dep_step| error_steps.include? dep_step } error_steps << succ queue << succ end end end lock.release_write_lock end end end @pool.shutdown @pool.wait_for_termination queue.close error_queue.close error_thr.join raise "Failed to execute some step(s)" unless error_steps.empty? else @dep_graph.step_list.each do |step| execute_step(step, dry_run, log) if rebuild_set.include? step end end end
execute_step(step, dry_run, log)
click to toggle source
# File lib/dake/executor.rb, line 111 def execute_step(step, dry_run, log) prepare_step(step) protocol = step.option_dict['protocol'] protocol ||= 'shell' line, column = @analyzer.step_line_and_column step proto = DakeProtocol::ProtocolDict[protocol].new(step, @analyzer, @dake_db, dry_run) STDERR.puts ("[#{Time.now.strftime('%Y-%m-%d %H:%M:%S')}] Running #{protocol} step(#{step.object_id}) defined in " + "#{step.src_file} at #{line}:#{column}").colorize(:green) STDERR.puts "[#{Time.now.strftime('%Y-%m-%d %H:%M:%S')}] step(#{step.object_id}) Script in #{proto.script_file}".colorize(:green) unless dry_run step.targets.each do |target| next if target.scheme.is_a? DakeScheme::Regex if target.scheme.is_a? DakeScheme::Tag STDERR.puts "[#{Time.now.strftime('%Y-%m-%d %H:%M:%S')}] step(#{step.object_id}) Producing ".colorize(:green) + "@#{target.scheme.path}".colorize(:light_blue) else STDERR.puts "[#{Time.now.strftime('%Y-%m-%d %H:%M:%S')}] step(#{step.object_id}) Producing ".colorize(:green) + "#{target.scheme.path}".colorize(:light_white) end end STDERR.puts "[#{Time.now.strftime('%Y-%m-%d %H:%M:%S')}] step(#{step.object_id}) STDOUT in #{proto.script_stdout}".colorize(:green) if log and not dry_run STDERR.puts "[#{Time.now.strftime('%Y-%m-%d %H:%M:%S')}] step(#{step.object_id}) STDERR in #{proto.script_stderr}".colorize(:green) if log and not dry_run if dry_run puts step.cmd_text else proto.execute_step(log) end STDERR.puts ("[#{Time.now.strftime('%Y-%m-%d %H:%M:%S')}] Complete #{protocol} step(#{step.object_id}) defined in " + "#{step.src_file} at #{line}:#{column}").colorize(:green) end
prepare_command(step, context={})
click to toggle source
command preparation is intentionally deferred to execution phase to accelerate the analysis phase of big workflow file
# File lib/dake/executor.rb, line 170 def prepare_command(step, context={}) mixin_method = (step.option_dict['method'] ? step.option_dict['method'] : nil) method_mode = (step.option_dict['method_mode'] ? step.option_dict['method_mode'] : 'prepend') if mixin_method meth = @analyzer.method_dict[mixin_method] unless meth line, column = @analyzer.option_line_and_column(step.options, 'method') raise "Method `#{mixin_method}' used in #{step.src_file} at #{line}:#{column} is not defined." end @analyzer.analyze_option(meth) unless step.option_dict['protocol'] == meth.option_dict['protocol'] line, column = @analyzer.option_line_and_column(step.options, 'protocol') line, column = @analyzer.step_line_and_column(step) unless line meth_line, meth_column = meth.name.line_and_column raise "Method `#{mixin_method}' defined in #{meth.src_file} at #{meth_line}:#{meth_column} " + "uses protocol `#{meth.option_dict['protocol']}', which is incompatible with protocol " + "`#{step.option_dict['protocol']}' used in #{step.src_file} at #{line}:#{column}." end if method_mode == 'replace' return prepare_command(meth, step.context) else method_text = prepare_command(meth, step.context) end end cmd_text = '' cmd_text << method_text if mixin_method and method_mode == 'prepend' first_indent = step.commands[0].items[0].to_s unless step.commands.empty? step.commands.each do |command| indent = command.items[0] if not indent.to_s.start_with? first_indent line, column = indent.line_and_column raise "Incompatible indentation in #{step.src_file} at #{line}:#{column}." else indentation = indent.to_s[first_indent.length..-1] cmd_text << indentation + @analyzer.text_eval(command, step.src_file, step.context.merge(context), 1) + "\n" end end cmd_text << method_text if mixin_method and method_mode == 'append' if cmd_text == '' line, column = @analyzer.step_line_and_column step step_meth = step.is_a?(StepMethod) ? 'Method' : 'Step' raise "#{step_meth} defined in #{step.src_file} at #{line}:#{column} has no commands." end cmd_text end
prepare_step(step)
click to toggle source
# File lib/dake/executor.rb, line 144 def prepare_step(step) context = step.context.merge!({'OUTPUTN' => 0, 'OUTPUTS' => [], 'INPUTN' => 0, 'INPUTS' => []}) step.targets.reject { |f| [DakeScheme::Tag, DakeScheme::Regex].include? f.scheme.class }.each_with_index do |output, n| name = output.scheme.path context["OUTPUT"] = name if n == 0 context["OUTPUT#{n}"] = name context["OUTPUTS"] << name context["OUTPUTN"] += 1 end context['OUTPUTN'] = context['OUTPUTN'].to_s context['OUTPUTS'] = context['OUTPUTS'].join("\n") step.prerequisites.reject { |s| s.tag }.each_with_index do |input, n| name = input.scheme.path context["INPUT"] = name if n == 0 context["INPUT#{n}"] = name context["INPUTS"] << name context["INPUTN"] += 1 end context['INPUTN'] = context['INPUTN'].to_s context['INPUTS'] = context['INPUTS'].join("\n") @analyzer.analyze_option(step) step.cmd_text = prepare_command(step) end