class OctocatalogDiff::Util::Parallel

Public Class Methods

execute_task(task, logger) click to toggle source

Utility method! Not intended to be called from outside this class.


Process a single task. Called by run_tasks_parallel / run_tasks_serial. This method will report all exceptions in the OctocatalogDiff::Util::Parallel::Result object itself, and not raise them. @param task [OctocatalogDiff::Util::Parallel::Task] Task object @param logger [Logger] Logger @return [OctocatalogDiff::Util::Parallel::Result] Parallel task result

# File lib/octocatalog-diff/util/parallel.rb, line 194
def self.execute_task(task, logger)
  begin
    logger.debug("Begin #{task.description}")
    output = task.execute(logger)
    result = Result.new(output: output, status: true, args: task.args)
  rescue => exc
    logger.debug("Failed #{task.description}: #{exc.class} #{exc.message}")
    # Immediately return without running the validation, since this already failed.
    return Result.new(exception: exc, status: false, args: task.args)
  end

  begin
    if task.validate(output, logger)
      logger.debug("Success #{task.description}")
    else
      # Preferably the validator method raised its own exception. However if it
      # simply returned false, raise our own exception here.
      raise "Failed #{task.description} validation (unspecified error)"
    end
  rescue => exc
    logger.warn("Failed #{task.description} validation: #{exc.class} #{exc.message}")
    result.status = false
    result.exception = exc
  end

  result
end
run_tasks(task_array, logger = nil, parallelized = true, raise_exception = false) click to toggle source

Entry point for parallel processing. By default this will perform parallel processing, but it will also accept an option to do serial processing instead. @param task_array [Array<Parallel::Task>] Tasks to run @param logger [Logger] Optional logger object @param parallelized [Boolean] True for parallel processing, false for serial processing @param raise_exception [Boolean] True to raise exception immediately if one occurs; false to return exception in results @return [Array<Parallel::Result>] Parallel results (same order as tasks)

# File lib/octocatalog-diff/util/parallel.rb, line 72
def self.run_tasks(task_array, logger = nil, parallelized = true, raise_exception = false)
  # Create a throwaway logger object if one is not given
  logger ||= Logger.new(StringIO.new)

  # Validate input - we need an array of OctocatalogDiff::Util::Parallel::Task. If the array is empty then
  # return an empty array right away.
  raise ArgumentError, "run_tasks() argument must be array, not #{task_array.class}" unless task_array.is_a?(Array)
  return [] if task_array.empty?

  invalid_inputs = task_array.reject { |task| task.is_a?(OctocatalogDiff::Util::Parallel::Task) }
  if invalid_inputs.any?
    ele = invalid_inputs.first
    raise ArgumentError, "Element #{ele.inspect} must be a OctocatalogDiff::Util::Parallel::Task, not a #{ele.class}"
  end

  # Initialize the result array. For now all entries in the array indicate that the task was killed.
  # Actual statuses will replace this initial status. If the initial status wasn't replaced, then indeed,
  # the task was killed.
  result = task_array.map { |x| Result.new(exception: IncompleteTask.new('Killed'), args: x.args) }
  logger.debug "Initialized parallel task result array: size=#{result.size}"

  # Execute as per the requested method (serial or parallel) and handle results.
  exception = parallelized ? run_tasks_parallel(result, task_array, logger) : run_tasks_serial(result, task_array, logger)
  raise exception if exception && raise_exception
  result
end
run_tasks_parallel(result, task_array, logger) click to toggle source

Utility method! Not intended to be called from outside this class.


Use a forking strategy to run tasks in parallel. Each task in the array is forked in a child process, and when that task completes it writes its result (OctocatalogDiff::Util::Parallel::Result) into a serialized data file. Once children are forked this method waits for their return, deserializing the output from each data file and updating the `result` array with actual results. @param result [Array<OctocatalogDiff::Util::Parallel::Result>] Parallel task results @param task_array [Array<OctocatalogDiff::Util::Parallel::Task>] Tasks to perform @param logger [Logger] Logger @return [Exception] First exception encountered by a child process; returns nil if no exceptions encountered.

# File lib/octocatalog-diff/util/parallel.rb, line 109
def self.run_tasks_parallel(result, task_array, logger)
  pidmap = {}
  ipc_tempdir = OctocatalogDiff::Util::Util.temp_dir('ocd-ipc-')

  # Child process forking
  task_array.each_with_index do |task, index|
    # simplecov doesn't see this because it's forked
    # :nocov:
    this_pid = fork do
      ENV['OCTOCATALOG_DIFF_TEMPDIR'] ||= ipc_tempdir
      task_result = execute_task(task, logger)
      File.open(File.join(ipc_tempdir, "#{Process.pid}.dat"), 'w') { |f| f.write Marshal.dump(task_result) }
      Kernel.exit! 0 # Kernel.exit! avoids at_exit from parents being triggered by children exiting
    end
    # :nocov:

    pidmap[this_pid] = { index: index, start_time: Time.now }
    logger.debug "Launched pid=#{this_pid} for index=#{index}"
    logger.reopen if logger.respond_to?(:reopen)
  end

  # Waiting for children and handling results
  while pidmap.any?
    pidmap.each do |pid|
      status = Process.waitpid2(pid[0], Process::WNOHANG)
      next if status.nil?
      this_pid, exit_obj = status
      next unless this_pid && pidmap.key?(this_pid)
      index = pidmap[this_pid][:index]
      exitstatus = exit_obj.exitstatus
      raise "PID=#{this_pid} exited abnormally: #{exit_obj.inspect}" if exitstatus.nil?
      raise "PID=#{this_pid} exited with status #{exitstatus}" unless exitstatus.zero?

      input = File.read(File.join(ipc_tempdir, "#{this_pid}.dat"))
      result[index] = Marshal.load(input) # rubocop:disable Security/MarshalLoad
      time_delta = Time.now - pidmap[this_pid][:start_time]
      pidmap.delete(this_pid)

      logger.debug "PID=#{this_pid} completed in #{time_delta} seconds, #{input.length} bytes"

      next if result[index].status
      return result[index].exception
    end
  end

  logger.debug 'All child processes completed with no exceptions raised'

# Cleanup: Kill any child processes that are still running, and clean the temporary directory
# where data files were stored.
ensure
  pidmap.each do |pid, _pid_data|
    begin
      Process.kill('TERM', pid)
    rescue Errno::ESRCH # rubocop:disable Lint/HandleExceptions
      # If the process doesn't exist, that's fine.
    end
  end
end
run_tasks_serial(result, task_array, logger) click to toggle source

Utility method! Not intended to be called from outside this class.


Perform the tasks in serial. @param result [Array<OctocatalogDiff::Util::Parallel::Result>] Parallel task results @param task_array [Array<OctocatalogDiff::Util::Parallel::Task>] Tasks to perform @param logger [Logger] Logger

# File lib/octocatalog-diff/util/parallel.rb, line 174
def self.run_tasks_serial(result, task_array, logger)
  # Perform the tasks 1 by 1 - each successful task will replace an element in the 'result' array,
  # whereas a failed task will replace the current element with an exception, and all later tasks
  # will not be replaced (thereby being populated with the cancellation error).
  task_array.each_with_index do |ele, task_counter|
    result[task_counter] = execute_task(ele, logger)
    next if result[task_counter].status
    return result[task_counter].exception
  end
  nil
end