class SProc::SProc
Execute a command in a subprocess, either synchronously or asynchronously.
Attributes
Public Class Methods
Return the processes in the given array that are no longer running (completed, aborted, or failed to start).
# File lib/sproc/core.rb, line 222
def self.get_finished(running_proc)
  running_proc.select do |p|
    [ExecutionState::COMPLETED,
     ExecutionState::ABORTED,
     ExecutionState::FAILED_TO_START].include?(p.execution_state)
  end
end
Prepare to run a subprocess.
@param type the ShellType used to run the process within
@param stdout_callback a callback that will receive all stdout output from the process as it is running (default nil)
@param stderr_callback a callback that will receive all stderr output from the process as it is running (default nil)
@param env a hash containing key/value pairs of strings that set the environment variable 'key' to 'value'. If value is nil, that env variable is unset
Example callback signature: def my_stdout_cb(line)
# File lib/sproc/core.rb, line 65
def initialize(type: ShellType::NONE, stdout_callback: nil,
               stderr_callback: nil, env: {})
  @run_opts = {
    type: type,
    stdout_callback: stdout_callback,
    stderr_callback: stderr_callback
  }
  @runner = TaskRunner.new(@run_opts)
  @execution_thread = nil
  @env = env
end
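The callback and env parameters described above can be illustrated without SProc itself. The following is a minimal sketch using only Ruby's standard library; `run_with_callback` and the `SPROC_DEMO_*` variable names are hypothetical and not part of this class. It relies on the documented behaviour of Process.spawn (which Open3 builds on): an env entry whose value is nil removes that variable from the child's environment.

```ruby
require 'open3'
require 'rbconfig'

# Hypothetical helper (not part of SProc): run a command, hand every stdout
# line to stdout_callback, and apply env the same way Process.spawn does.
def run_with_callback(env, cmd, *args, stdout_callback: nil)
  Open3.popen2(env, cmd, *args) do |stdin, stdout, wait_thr|
    stdin.close
    stdout.each_line { |line| stdout_callback&.call(line) }
    wait_thr.value # Process::Status of the finished child
  end
end

lines = []
collect = ->(line) { lines << line.chomp } # same shape as my_stdout_cb(line)

# Set in the parent so that unsetting it for the child is observable.
ENV['SPROC_DEMO_UNSET'] = 'visible'
env = { 'SPROC_DEMO_SET' => 'hello', 'SPROC_DEMO_UNSET' => nil }
script = 'puts ENV["SPROC_DEMO_SET"]; puts ENV.key?("SPROC_DEMO_UNSET")'

status = run_with_callback(env, RbConfig.ruby, '-e', script,
                           stdout_callback: collect)
```

Here the child sees `SPROC_DEMO_SET` but not `SPROC_DEMO_UNSET`, and the callback receives one call per output line.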
Block until all processes in the given array have completed or aborted.
The implementation polls the processes once per polling interval (given in ms).
@return true if all processes exited with status 0, false in all other cases
# File lib/sproc/core.rb, line 164
def self.wait_on_all(running_proc, polling_interval = 100, &block)
  until running_proc.empty?
    done = get_finished(running_proc)
    running_proc -= done
    next unless block_given?

    done.each(&block) if block_given?
    sleep polling_interval / 1000
  end
end
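As a rough illustration of interval-based polling, here is a self-contained sketch that uses short-lived threads as stand-ins for subprocesses. `fake_job` and `poll_until_done` are hypothetical names, not part of SProc. One detail worth knowing when converting a millisecond interval to seconds in Ruby: integer division truncates, so `100 / 1000` is `0`, and this sketch divides by a Float instead.

```ruby
# Stand-in for a running subprocess: a thread that finishes after `seconds`.
# (Hypothetical; SProc objects report execution_state rather than alive?.)
def fake_job(seconds)
  Thread.new { sleep seconds }
end

# Poll until every job has finished, yielding each finished job exactly once.
def poll_until_done(jobs, polling_interval_ms = 100)
  pending = jobs.dup
  until pending.empty?
    done = pending.reject(&:alive?)
    pending -= done
    done.each { |job| yield job } if block_given?
    # Divide by a Float: 100 / 1000 would truncate to 0 seconds.
    sleep(polling_interval_ms / 1000.0) unless pending.empty?
  end
end

jobs = [fake_job(0.05), fake_job(0.1)]
finished = []
poll_until_done(jobs, 10) { |job| finished << job }
```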
Wait for processes to complete and give a block an opportunity to start one or more new processes for each completed one. The given block is handed each completed SubProcess. If the block returns one or more SubProcess objects, these will be waited upon as well.
@param running_proc an array of running processes to block on
@param polling_interval how often (in ms) we check the run state of the running processes (default every 100 ms)
@return all finished processes
Example usage:
  # start 3 processes asynchronously
  nof_processes = 3
  p_array = (1..nof_processes).collect do
    SubProcess.new(SubProcess::NONE).exec_async('ping', ['127.0.0.1'])
  end

  # block until a process completes and then immediately start a new process
  # until we've started 10 in total
  p_total = SubProcess.wait_or_back_to_back(p_array) do |p|
    # create new processes until we reach 10
    unless nof_processes >= 10
      np = SubProcess.new.exec_async('echo', "Process #{nof_processes}")
      nof_processes += 1
    end
    np
  end
  … here p_total will contain all 10 finished SubProcess objects
# File lib/sproc/core.rb, line 204
def self.wait_or_back_to_back(running_proc, polling_interval = 100)
  all_proc = running_proc.dup
  until running_proc.empty?
    done = get_finished(running_proc)
    running_proc -= done
    next unless block_given?

    done.each do |p|
      new_proc = Array(yield(p)).select { |r| r.is_a?(SProc) }
      running_proc += new_proc
      all_proc += new_proc
    end
    sleep polling_interval / 1000
  end
  all_proc
end
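The back-to-back pattern (start a replacement as soon as something finishes) can be tried out without spawning real processes. This is a sketch under the assumption that threads stand in for subprocesses. `start_job` and `wait_back_to_back` are hypothetical names, and the block is required here, whereas the method above treats it as optional.

```ruby
# Hypothetical stand-in for a subprocess: a short-lived thread returning its id.
def start_job(id)
  Thread.new do
    sleep 0.01
    id
  end
end

# Wait for the jobs; anything the block returns for a finished job is added
# to the set being waited on. Returns every job that was ever waited on.
def wait_back_to_back(running, polling_interval_ms = 10)
  all = running.dup
  until running.empty?
    done = running.reject(&:alive?)
    running -= done
    done.each do |job|
      # Array() turns nil into [] and a single job into a one-element array.
      started = Array(yield(job)).select { |r| r.is_a?(Thread) }
      running += started
      all += started
    end
    sleep(polling_interval_ms / 1000.0)
  end
  all
end

started_total = 3
initial = (1..started_total).map { |i| start_job(i) }
all_jobs = wait_back_to_back(initial) do |_finished|
  next nil if started_total >= 10 # stop once 10 jobs have been started

  started_total += 1
  start_job(started_total)
end
```

With three initial jobs and a cap of ten, `all_jobs` ends up holding all ten finished jobs, mirroring the `p_total` result described in the usage example.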
Public Instance Methods
Start the process non-blocking. Use one of the wait… methods to later block on the process.
@return this SubProcess instance
# File lib/sproc/core.rb, line 94
def exec_async(cmd, *args, **opts)
  exec(false, @env, cmd, *args, **opts)
end
Start the sub-process and block until it has completed.
@cmd the command to execute
@args an array with all arguments to the cmd
@opts a hash with options that influence the spawned process. The supported options are: chdir, umask, unsetenv_others. See Process.spawn for definitions.
@return this SubProcess instance
# File lib/sproc/core.rb, line 87
def exec_sync(cmd, *args, **opts)
  exec(true, @env, cmd, *args, **opts)
end
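Since the options are defined by Process.spawn, their effect can be checked directly with the standard library. The sketch below does not go through SProc. It uses Open3.capture2, which forwards its trailing options to Process.spawn, to show `chdir` and `umask` changing what the child process observes. The variable names are illustrative only.

```ruby
require 'open3'
require 'rbconfig'
require 'tmpdir'

child_dir = nil
expected_dir = nil

Dir.mktmpdir do |dir|
  # Resolve symlinks so the comparison also holds where the temp dir is a link.
  expected_dir = File.realpath(dir)
  # chdir: the child starts with `dir` as its working directory.
  child_dir, _status = Open3.capture2(RbConfig.ruby, '-e', 'puts Dir.pwd',
                                      chdir: dir)
end

# umask: the child starts with the given file mode creation mask.
child_umask, _status = Open3.capture2(RbConfig.ruby, '-e',
                                      'puts File.umask.to_s(8)',
                                      umask: 0o077)
```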
Return the execution state of this SubProcess. Note that it is not identical to the life cycle of the underlying Process::Status object.
@return current ExecutionState
# File lib/sproc/core.rb, line 119
def execution_state
  return ExecutionState::NOT_STARTED if @execution_thread.nil?

  # Count this SubProcess as running as long as the thread
  # that executes it is alive (this includes book keeping
  # chores within this class as well)
  return ExecutionState::RUNNING if @execution_thread.alive?

  status = task_info[:process_status]

  # an execution thread that has run but not generated a task_info
  # means that we tried to start a process but failed
  return ExecutionState::FAILED_TO_START if status.nil?

  # a process can terminate for different reasons:
  # - its done
  # - an uncaught exception-
  # - an uncaught signal

  # this should take care of uncaught signals
  return ExecutionState::ABORTED if status.signaled?

  # If the process completed (either successfully or not)
  return ExecutionState::COMPLETED if status.exited?

  # We don't currently handle a process that has been stopped...
  raise NotImplementedError("Unhandled process 'stopped' status!") if status.stopped?

  # We should never come here
  raise RuntimeError("Unhandled process status: #{status.inspect}")
end
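The distinction between "completed" and "aborted" rests on two predicates of Ruby's Process::Status: `exited?` and `signaled?`. The following sketch shows both cases with real child processes. `classify` and its symbol return values are hypothetical simplifications of the ExecutionState constants, and the signal part assumes a POSIX system.

```ruby
require 'rbconfig'

# Hypothetical classifier mirroring the order of the status checks above.
def classify(status)
  return :failed_to_start if status.nil?
  return :aborted if status.signaled?
  return :completed if status.exited?

  :unhandled
end

# A child that exits on its own counts as completed, even with a non-zero code.
pid = Process.spawn(RbConfig.ruby, '-e', 'exit 3')
_, exited = Process.wait2(pid)

# A child terminated by a signal it cannot handle counts as aborted.
pid = Process.spawn(RbConfig.ruby, '-e', 'sleep 30')
Process.kill('KILL', pid)
_, killed = Process.wait2(pid)

exited_state = classify(exited)
killed_state = classify(killed)
```

Note that `:completed` says nothing about success. That is why a separate check of the exit status, as exit_zero? does, is still needed.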
Check whether this process has completed with exit code 0 (success). Returns false if the process has not completed.
# File lib/sproc/core.rb, line 100
def exit_zero?
  return false unless execution_state == ExecutionState::COMPLETED

  task_info[:process_status].exitstatus.zero?
end
# File lib/sproc/core.rb, line 47
def logger
  self.class.logger
end
@return the TaskInfo representing this SubProcess, nil if the process has not started
# File lib/sproc/core.rb, line 153
def task_info
  @runner.task_info
end
Block the caller until this subprocess has completed or aborted.
@return the TaskInfo struct of the completed process, nil if the process was never started
# File lib/sproc/core.rb, line 108
def wait_on_completion
  return if @execution_thread.nil?

  @execution_thread.join
  task_info
end
Private Instance Methods
a helper method that supports both synch/async execution depending on the supplied args
# File lib/sproc/core.rb, line 234
def exec(synch, env, cmd, *args, **opts)
  raise 'Subprocess already running!' unless @execution_thread.nil? || !@execution_thread.alive?

  # kick-off a fresh task runner and execution thread
  @runner = TaskRunner.new(@run_opts)
  @execution_thread = Thread.new do
    @runner.execute(env, cmd, *args, **opts)
  end
  @execution_thread.join if synch
  self
end
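The helper's core idea is that the command always runs in a worker thread, and the caller joins that thread only in the synchronous case. Here is a stripped-down sketch of that pattern. `MiniRunner` is a hypothetical class that calls Process.spawn directly in place of the TaskRunner used above, whose internals are not shown in this document.

```ruby
require 'rbconfig'

# Hypothetical miniature of the pattern: one worker thread per execution,
# joined immediately for synchronous runs and later for asynchronous ones.
class MiniRunner
  attr_reader :status

  def run(synch, cmd, *args)
    raise 'Subprocess already running!' if @thread&.alive?

    @thread = Thread.new do
      pid = Process.spawn(cmd, *args)
      _, @status = Process.wait2(pid)
    end
    @thread.join if synch
    self
  end

  # Block until the worker thread is done, then return the Process::Status.
  def wait
    @thread&.join
    status
  end
end

sync_runner = MiniRunner.new.run(true, RbConfig.ruby, '-e', 'exit 0')

async_runner = MiniRunner.new.run(false, RbConfig.ruby, '-e', 'sleep 0.2')
# Starting a second command while the first is still running is rejected.
still_running_error =
  begin
    async_runner.run(false, RbConfig.ruby, '-e', 'exit 0')
    nil
  rescue RuntimeError => e
    e.message
  end
async_status = async_runner.wait
```

Returning `self` from `run` is what allows chained calls such as `MiniRunner.new.run(...)`, the same shape as the `new ... exec_async` calls in the usage example earlier.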