module Jongleur::API
Methods intended to be called by the gem's client, i.e. the public API.
Public Class Methods
Accepts a task graph and performs some initialisation, namely assigning the class variables and creating the initial task matrix.
@param [Hash<Symbol, Array>] task_graph_hash
@raise [ArgumentError] if the task_graph_hash argument is not structured correctly
@return [void]
# File lib/jongleur/api.rb, line 18
def self.add_task_graph(task_graph_hash)
  @@task_matrix = Array.new
  raise ArgumentError, 'Value should be Hash {task_name, [descendants]}' unless task_graph_hash.is_a?(Hash)

  # this task_graph will raise the error below , { A: [:B], B: :C, C: []}
  task_graph_hash.values.each do |val|
    raise ArgumentError, 'Dependent Tasks should be wrapped in an Array {task_name, [dependents]}' unless val.is_a?(Array)
  end
  # this task_graph will raise the error below , { A: [:B], B: [:C, :D], C: []}
  if (task_graph_hash.keys.size - task_graph_hash.values.flatten.uniq.size).negative?
    raise ArgumentError, 'Each dependent Task should also be defined with a separate key entry'
  end

  @@task_graph = task_graph_hash
  @@task_matrix = Implementation.build_task_matrix(task_graph_hash)
end
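The validation rules above can be tried in isolation. The following is a minimal sketch, not the gem's code: `validate_task_graph` is a hypothetical standalone helper that reuses the same error messages but compares task names as sets rather than comparing counts.

```ruby
require 'set'

# Hypothetical helper (not part of Jongleur): applies the same three checks
# as add_task_graph, using a set difference for the last one.
def validate_task_graph(graph)
  raise ArgumentError, 'Value should be Hash {task_name, [descendants]}' unless graph.is_a?(Hash)

  graph.each_value do |dependents|
    next if dependents.is_a?(Array)

    raise ArgumentError, 'Dependent Tasks should be wrapped in an Array {task_name, [dependents]}'
  end

  # Every task named as a dependent must also appear as a key.
  undefined = graph.values.flatten.to_set - graph.keys.to_set
  unless undefined.empty?
    raise ArgumentError, "Each dependent Task should also be defined with a separate key entry: #{undefined.to_a.inspect}"
  end

  graph
end

validate_task_graph({ A: [:B, :C], B: [:D], C: [:D], D: [] }) # passes and returns the graph
```

The set difference is a stricter test than a size comparison. For example, `{ A: [:B], B: [:C, :D], C: [] }` has three keys and three distinct dependents, so comparing sizes alone accepts it, while the set difference reports `:D` as undefined.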
Analyses the Task Matrix for all Tasks that failed to finish successfully
@param [Array<Jongleur::Task>] my_task_matrix the task matrix to analyse
@return [Array<Jongleur::Task>] the failed Tasks
# File lib/jongleur/api.rb, line 84
def self.failed_tasks(my_task_matrix)
  my_task_matrix.select { |x| x.success_status == false }
end
# File lib/jongleur/api.rb, line 109
def self.get_predecessor_pids(a_task)
  pids = Array.new
  Implementation.get_predecessors(a_task).each do |task|
    pids << Implementation.get_process_id(task)
  end
  pids
end
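The listing above relies on `Implementation.get_predecessors` and `Implementation.get_process_id`, whose bodies are not shown on this page. As a rough sketch of the idea, assuming a predecessor is any task that lists the given task among its dependents, the lookup could work like this. The helper names and the pid table are hypothetical stand-ins, not the gem's internals.

```ruby
# Hypothetical helper: the tasks that name `task` as one of their dependents.
def predecessors_of(task_graph, task)
  task_graph.select { |_parent, dependents| dependents.include?(task) }.keys
end

# Hypothetical helper: `pids_by_task` stands in for the Task Matrix lookup.
def predecessor_pids(task_graph, pids_by_task, task)
  predecessors_of(task_graph, task).map { |name| pids_by_task.fetch(name) }
end

graph = { A: [:B, :C], B: [:D], C: [:D], D: [] }
pids  = { A: 101, B: 102, C: 103, D: 104 }

predecessor_pids(graph, pids, :D) # => [102, 103]
```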
Analyses the Task Matrix for all Tasks that started but failed to finish
@param [Array<Jongleur::Task>] my_task_matrix the task matrix to analyse
@return [Array<Jongleur::Task>] the Tasks that started but failed to finish
# File lib/jongleur/api.rb, line 103
def self.hung_tasks(my_task_matrix)
  my_task_matrix.select { |x| x.success_status == nil && x.pid != StatusCodes::PROCESS_NOT_YET_RAN }
end
Analyses the Task Matrix for all Tasks that have not been run
@param [Array<Jongleur::Task>] my_task_matrix the task matrix to analyse
@return [Array<Jongleur::Task>] the Tasks that have not been run
# File lib/jongleur/api.rb, line 92
def self.not_ran_tasks(my_task_matrix)
  my_task_matrix.select { |x| x.success_status == nil && x.exit_status == nil && x.pid == StatusCodes::PROCESS_NOT_YET_RAN }
end
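The status filters in this module (`failed_tasks`, `hung_tasks`, `not_ran_tasks` and `successful_tasks`) all read the same three fields of a Task. The sketch below applies the same predicates to a stand-in struct so they can be compared side by side. `SketchTask`, `classify` and the sentinel value `-1` are assumptions made for illustration; the real sentinel is whatever `StatusCodes::PROCESS_NOT_YET_RAN` holds.

```ruby
# Stand-in for Jongleur::Task holding only the fields the filters read.
SketchTask = Struct.new(:name, :pid, :exit_status, :success_status)

# Assumed sentinel value; the gem defines its own in StatusCodes.
SKETCH_NOT_YET_RAN = -1

# Mirrors the predicates of the four filters, one label per task.
def classify(task)
  return :failed if task.success_status == false
  return :successful if task.success_status == true && task.exit_status == 0
  return :hung if task.success_status.nil? && task.pid != SKETCH_NOT_YET_RAN
  return :not_ran if task.success_status.nil? && task.exit_status.nil?

  :unclassified
end

matrix = [
  SketchTask.new(:A, 4001, 0, true),                  # finished cleanly
  SketchTask.new(:B, 4002, 1, false),                 # finished with an error
  SketchTask.new(:C, 4003, nil, nil),                 # has a pid but no result yet
  SketchTask.new(:D, SKETCH_NOT_YET_RAN, nil, nil)    # never started
]

report = matrix.group_by { |t| classify(t) }.transform_values { |ts| ts.map(&:name) }
# report => { successful: [:A], failed: [:B], hung: [:C], not_ran: [:D] }
```

Note that a task with a real pid and no result matches the `hung_tasks` predicate even while it is still running, so that filter is most meaningful once the workflow has ended.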
Prints the TaskGraph to a PDF file
@param [String] dir the directory name to print the file to
@return [String] the PDF file name
# File lib/jongleur/api.rb, line 37
def self.print_graph(dir="")
  graph = Graphviz::Graph.new
  dir = Dir.pwd if (!dir || dir.empty?)
  file_name = File.expand_path("jongleur_graph_#{Time.now.strftime('%m%d%Y_%H%M%S')}.pdf", dir)
  task_graph.each do |parent_node, child_nodes|
    new_node = unless graph.node_exists?(parent_node)
      graph.add_node( parent_node )
    else
      graph.get_node( parent_node ).first
    end
    child_nodes.each { |child_node| new_node.add_node(child_node) }
  end
  Graphviz::output(graph, path: file_name)
  file_name
end
The main method. It starts the tasks as separate processes according to their precedence, traps and handles signals, and processes messages. On exit it also writes the Task Matrix as a JSON file to the /tmp directory.
@note This method launches the tasks that have no predecessors, traps child process signals and starts further tasks once their predecessors have finished. The method exits its own process when all child processes have finished.
@raise [RuntimeError] if any Task in the Task Graph has no implementation
@return [void]
# File lib/jongleur/api.rb, line 127
def self.run(&block)
  unless Implementation.valid_tasks?(task_graph.keys)
    raise RuntimeError, 'Not all the tasks in the Task Graph are implemented as WorkerTask classes'
  end

  Implementation.process_message 'Starting workflow...'
  trap_quit_signals
  @finished_tasks_queue = []
  start_processes

  trap(:CHLD) do
    begin
      # with WNOHANG flag we make sure Process.wait is not blocking
      while (res = Process.wait2(-1, Process::WNOHANG))
        dead_pid, status = res
        finish_time = Time.now.to_f
        dead_task_name = ''
        Implementation.find_task_by(:pid, dead_pid) do |t|
          t.running = false
          t.exit_status = status.exitstatus
          t.success_status = status.success?
          t.finish_time = finish_time
          dead_task_name = t.name
          @finished_tasks_queue << { name: dead_task_name, done: status.success?, pid: dead_pid}
        end
        msg = "finished task: %s, process: %i, exit_status: %i, success: %s"
        Implementation.process_message msg % [dead_task_name, dead_pid, status.exitstatus, status.success?]
      end
    # it's possible for the last CHLD signal to arrive after our trap
    # handler has already called Process.wait twice and reaped the
    # available status. In such a case we must handle (and ignore)
    # the oncoming exception so we don't get a crash.
    rescue Errno::ECHILD
    end
  end #trap

  loop do
    # run task's descendants as soon as task appears on 'finished' queue
    while task = @finished_tasks_queue.pop
      if task[:done]
        Implementation.run_descendants(task[:name])
      else
        msg = "Task #{task[:name]} with process id #{task[:pid]} was not succesfully completed."
        Implementation.process_message(msg)
      end
    end

    # We exit once all the child processes and their descendants are accounted for
    if Implementation.running_tasks.empty?
      Implementation.process_message 'Workflow finished'
      file_name = File.expand_path("jongleur_task_matrix_#{Time.now.strftime('%m%d%Y_%H%M%S')}.json", '/tmp')
      File.open(file_name, 'w') {|f| f.write(task_matrix.to_json) }
      hollerback_for(block) { |cb| cb.respond_with(:completed , task_matrix) } if block_given?
      exit 0
    end
    sleep 1
  end
end
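The core of `run` is the CHLD handler that reaps finished children without blocking. The sketch below isolates that pattern using only Ruby's standard `Process` and `Signal` APIs. `reap_children` is a hypothetical helper, the exit codes are arbitrary, and unlike the listing above it installs the handler before forking, which is a choice made for this sketch so that no exit notification can be missed.

```ruby
# Hypothetical helper: forks one child per exit code and collects their
# statuses from a CHLD handler, in the style of the handler in `run`.
def reap_children(exit_codes)
  finished = []

  previous = trap(:CHLD) do
    begin
      # WNOHANG makes wait2 return nil instead of blocking when children
      # exist but none has exited yet. Looping covers coalesced signals.
      while (res = Process.wait2(-1, Process::WNOHANG))
        pid, status = res
        finished << { pid: pid, exit_status: status.exitstatus, done: status.success? }
      end
    rescue Errno::ECHILD
      # No children left to reap; nothing to do.
    end
  end

  # exit! skips at_exit hooks inherited from the parent process.
  pids = exit_codes.map { |code| fork { exit!(code) } }

  # Poll until every child is accounted for, with a safety deadline.
  deadline = Time.now + 10
  sleep 0.05 until finished.size == pids.size || Time.now > deadline

  finished
ensure
  trap(:CHLD, previous || 'DEFAULT')
end

results = reap_children([0, 3])
results.each { |r| puts "pid #{r[:pid]} exit_status #{r[:exit_status]} success #{r[:done]}" }
```

The handler only appends to an array and leaves the follow-up work to the polling loop, which keeps the code that runs in signal context short, much as `run` does with its finished-tasks queue.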
Starts all tasks without dependencies as separate processes
@return [void]
# File lib/jongleur/api.rb, line 196
def self.start_processes
  Implementation.tasks_without_predecessors.each do |t|
    t.running = true
    Implementation.process_message "starting task #{t.name}"
    t.pid = fork do
      Jongleur.const_get(t.name).new(predecessors: Implementation.get_predecessors(t.name)).execute
    end
  end
end
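`start_processes` resolves each task name to a class with `const_get` and runs it in a forked child. A self-contained sketch of that dispatch follows. The `SketchTasks` namespace, the `Hello` class and `start_task` are hypothetical, and mapping the return value of `execute` to an exit code is an assumption added here so the parent can observe the outcome.

```ruby
module SketchTasks
  # Hypothetical task class; the gem expects WorkerTask classes instead.
  class Hello
    def initialize(predecessors: [])
      @predecessors = predecessors
    end

    # Succeeds only when it has no predecessors, purely for demonstration.
    def execute
      @predecessors.empty?
    end
  end
end

# Looks up the class named by `task_name` inside `namespace` and runs it
# in a child process. Returns the child's pid.
def start_task(namespace, task_name, predecessors)
  fork do
    ok = namespace.const_get(task_name).new(predecessors: predecessors).execute
    exit!(ok ? 0 : 1)
  end
end

pid = start_task(SketchTasks, :Hello, [])
_, status = Process.wait2(pid)
puts "task Hello finished, success: #{status.success?}"
```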
Analyses the Task Matrix for all Tasks that ran successfully
@param [Array<Jongleur::Task>] my_task_matrix the task matrix to analyse
@return [Array<Jongleur::Task>] the successful Tasks
# File lib/jongleur/api.rb, line 74
def self.successful_tasks(my_task_matrix)
  my_task_matrix.select { |x| x.success_status == true && x.exit_status == 0 }
end
@!attribute task_graph
@return [Hash<Symbol, Array<Symbol>>] where the Hash key is the Task name and the value is an array of dependent Tasks
@example
  a_task_graph = {:A=>[:B, :C], :B=>[:D], :C=>[:D], :D=>[:E], :E=>[]}
# File lib/jongleur/api.rb, line 66
def self.task_graph
  @@task_graph ||= {}
end
@!attribute task_matrix
@return [Array<Jongleur::Task>] a list of Tasks and their current state
@see Jongleur::Task
# File lib/jongleur/api.rb, line 57
def self.task_matrix
  @@task_matrix || []
end
Traps quit signals (INT and QUIT) in the master process and kills all running task processes, so that quitting the gem (Ctrl+C) leaves no task process behind
@return [void]
# File lib/jongleur/api.rb, line 210
def self.trap_quit_signals
  %i[INT QUIT].each do |signal|
    Signal.trap(signal) do
      Implementation.process_message " #{signal} sent to master process!"
      Implementation.running_tasks.each do |t|
        Implementation.process_message "....killing #{t.pid}"
        Process.kill(:KILL, t.pid)
      end
    end
  end
end
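The effect of this handler can be observed with a single sleeping child. The sketch below is an illustration under stated assumptions rather than the gem's code: `forward_quit_signals` and `demo_quit_forwarding` are hypothetical helpers, the list of pids stands in for `Implementation.running_tasks`, and the `Errno::ESRCH` rescue is an addition that covers children that have already exited.

```ruby
# Hypothetical helper: on INT or QUIT, send KILL to every given child pid.
def forward_quit_signals(child_pids)
  %i[INT QUIT].each do |signal|
    Signal.trap(signal) do
      child_pids.each do |pid|
        begin
          Process.kill(:KILL, pid)
        rescue Errno::ESRCH
          # The child has already exited; nothing to kill.
        end
      end
    end
  end
end

# Starts one long-running child, simulates Ctrl+C in the parent and
# returns the child's Process::Status.
def demo_quit_forwarding
  child = fork do
    sleep 30
    exit!(0)
  end
  forward_quit_signals([child])

  Process.kill(:INT, Process.pid) # simulate Ctrl+C on the master process
  _, status = Process.wait2(child)
  status
ensure
  %i[INT QUIT].each { |signal| Signal.trap(signal, 'DEFAULT') }
end

status = demo_quit_forwarding
puts "child killed by signal #{status.termsig}" if status.signaled?
```

Because the children receive KILL rather than the original signal, they cannot run any cleanup code of their own.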