module Jongleur::API

Here be methods to be accessed by the gem's client, i.e. the public API

Public Class Methods

add_task_graph(task_graph_hash) click to toggle source

Accepts a task_graph and does some initialisation, namely the assigning of class variables and creation of the inital task matrix

@param [Hash<Symbol, Array>] task_graph_hash @raise [ArgumentError] if the task_matrix argument is not structured correctly @return [void]

# File lib/jongleur/api.rb, line 18
def self.add_task_graph(task_graph_hash)
  @@task_matrix = Array.new
  raise ArgumentError, 'Value should be Hash {task_name, [descendants]}' unless task_graph_hash.is_a?(Hash)
  # this task_graph will raise the error below , { A: [:B],  B: :C,  C: []}
  task_graph_hash.values.each do |val|
    raise ArgumentError, 'Dependent Tasks should be wrapped in an Array {task_name, [dependents]}' unless val.is_a?(Array)
  end
  # this task_graph will raise the error below , { A: [:B],  B: [:C, :D],  C: []}
  if (task_graph_hash.keys.size - task_graph_hash.values.flatten.uniq.size).negative?
    raise ArgumentError, 'Each dependent Task should also be defined with a separate key entry'
  end
  @@task_graph = task_graph_hash
  @@task_matrix = Implementation.build_task_matrix(task_graph_hash)
end
failed_tasks(my_task_matrix) click to toggle source

Analyses the Task Matrix for all Tasks that failed to finish successfully

@param [Array<Jongleur::Task>] the task matrix to analyse @return [Array<Jongleur::Task>] the failed Tasks

# File lib/jongleur/api.rb, line 84
def self.failed_tasks(my_task_matrix)
  my_task_matrix.select { |x| x.success_status == false }
end
get_predecessor_pids(a_task) click to toggle source
# File lib/jongleur/api.rb, line 109
def self.get_predecessor_pids(a_task)
  pids = Array.new
  Implementation.get_predecessors(a_task).each do |task|
    pids << Implementation.get_process_id(task)
  end
  pids
end
hung_tasks(my_task_matrix) click to toggle source

Analyses the Task Matrix for all Tasks that started but failed to finish

@param [Array<Jongleur::Task>] the task matrix to analyse @return [Array<Jongleur::Task>] the Tasks that started but failed to finish

# File lib/jongleur/api.rb, line 103
def self.hung_tasks(my_task_matrix)
  my_task_matrix.select { |x| x.success_status == nil &&
    x.pid != StatusCodes::PROCESS_NOT_YET_RAN
  }
end
not_ran_tasks(my_task_matrix) click to toggle source

Analyses the Task Matrix for all Tasks that haven't been ran

@param [Array<Jongleur::Task>] the task matrix to analyse @return [Array<Jongleur::Task>] the Tasks that haven't been ran

# File lib/jongleur/api.rb, line 92
def self.not_ran_tasks(my_task_matrix)
  my_task_matrix.select { |x| x.success_status == nil &&
    x.exit_status == nil &&
    x.pid == StatusCodes::PROCESS_NOT_YET_RAN
  }
end
print_graph(dir="") click to toggle source

Prints the TaskGraph to a PDF file

@param [String] the directory name to print the file to @return [String] the PDF file name

run(&block) click to toggle source

The main method. It starts the tasks as separate processes, according to their precedence, traps and handles signals, processes messages. On exit it will also print the Task Matrix in the /tmp directory in JSON format

@note This method launches processes without precedence constraints, traps child process signals and starts new processes when their antecedents have finished. The method will exit its own process when all children processes have finished. @raise [RuntimeError] if there are no implementations for Tasks in the Task Graph @return [void]

# File lib/jongleur/api.rb, line 127
def self.run(&block)
  unless Implementation.valid_tasks?(task_graph.keys)
    raise RuntimeError, 'Not all the tasks in the Task Graph are implemented as WorkerTask classes'
  end

  Implementation.process_message 'Starting workflow...'
  trap_quit_signals
  @finished_tasks_queue = []
  start_processes

  trap(:CHLD) do
    begin
      # with WNOHANG flag we make sure Process.wait is not blocking
      while (res = Process.wait2(-1, Process::WNOHANG))
        dead_pid, status = res
        finish_time = Time.now.to_f
        dead_task_name = ''
        Implementation.find_task_by(:pid, dead_pid) do |t|
          t.running = false
          t.exit_status = status.exitstatus
          t.success_status = status.success?
          t.finish_time = finish_time
          dead_task_name = t.name
          @finished_tasks_queue << { name: dead_task_name, done: status.success?, pid: dead_pid}
        end
        msg = "finished task: %s, process: %i, exit_status: %i, success: %s"
        Implementation.process_message msg % [dead_task_name,
                                              dead_pid,
                                              status.exitstatus,
                                              status.success?]


      end

      # it's possible for the last CHLD signal to arrive after our trap
      # handler has already called Process.wait twice and reaped the
      # available status. In such a case we must handle (and ignore)
      # the oncoming exception so we don't get a crash.
    rescue Errno::ECHILD
    end
  end #trap

  loop do
    # run task's descendants as soon as task appears on 'finished' queue
    while task = @finished_tasks_queue.pop
      if task[:done]
        Implementation.run_descendants(task[:name])
      else
        msg = "Task #{task[:name]} with process id #{task[:pid]} was not succesfully completed."
        Implementation.process_message(msg)
      end
    end

    # We exit once all the child processes and their descendants are accounted for
    if Implementation.running_tasks.empty?
      Implementation.process_message 'Workflow finished'
      file_name = File.expand_path("jongleur_task_matrix_#{Time.now.strftime('%m%d%Y_%H%M%S')}.json", '/tmp')
      File.open(file_name, 'w') {|f| f.write(task_matrix.to_json) }
      hollerback_for(block) { |cb| cb.respond_with(:completed , task_matrix) } if block_given?
      exit 0
    end
    sleep 1
  end
end
start_processes() click to toggle source

Starts all tasks without dependencies as separate processes

@return [void]

# File lib/jongleur/api.rb, line 196
def self.start_processes
  Implementation.tasks_without_predecessors.each do |t|
    t.running = true
    Implementation.process_message "starting task #{t.name}"
    t.pid = fork do
      Jongleur.const_get(t.name).new(predecessors: Implementation.get_predecessors(t.name)).execute
    end
  end
end
successful_tasks(my_task_matrix) click to toggle source

Analyses the Task Matrix for all Tasks that ran successfully

@param [Array<Jongleur::Task>] the task matrix to analyse @return [Array<Jongleur::Task>] the successful Tasks

# File lib/jongleur/api.rb, line 74
def self.successful_tasks(my_task_matrix)
  my_task_matrix.select { |x| x.success_status == true &&
    x.exit_status == 0
  }
end
task_graph() click to toggle source

@!attribute task_graph

@return [Hash<Symbol, Array<Symbol>>] where the Hash key is the Task
    name and the value is an array of dependent Tasks
@example
  a_task_graph = {:A=>[:B, :C], :B=>[:D], :C=>[:D], :D=>[:E], :E=>[]}
# File lib/jongleur/api.rb, line 66
def self.task_graph
  @@task_graph ||= {}
end
task_matrix() click to toggle source

@!attribute task_matrix

@return [Array<Jongleur::Task>] a list of Tasks and their current state
@see Jongleur::Task
# File lib/jongleur/api.rb, line 57
def self.task_matrix
  @@task_matrix || []
end
trap_quit_signals() click to toggle source

Forwards any quit signals to all working processes so that quitting the gem (Ctrl+C) kills all processes

@return [void]

# File lib/jongleur/api.rb, line 210
def self.trap_quit_signals
  %i[INT QUIT].each do |signal|
    Signal.trap(signal) do
      Implementation.process_message " #{signal} sent to master process!"
      Implementation.running_tasks.each do |t|
        Implementation.process_message "....killing #{t.pid}"
        Process.kill(:KILL, t.pid)
      end
    end
  end
end