class Process::Group
A group of tasks which can be run asynchronously using fibers. Someone must call Group#wait
to ensure that all fibers eventually resume.
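The following is a minimal sketch of why a call to wait is needed, using only Ruby's Fiber class. MiniGroup, pause and the :done value are hypothetical names for illustration and are not part of Process::Group; no child processes are involved. Each task runs until it suspends itself, and nothing resumes it until the wait loop runs.

```ruby
require "fiber" # Fiber.current needs this on older Rubies; harmless on newer ones.

# Hypothetical stand-in for a group: tasks suspend themselves and only
# continue once #wait resumes them.
class MiniGroup
  def initialize
    @suspended = []
  end

  # Start the block in a new fiber; it runs until it calls #pause.
  def async(&block)
    Fiber.new { block.call(self) }.resume
  end

  # Suspend the calling fiber until #wait resumes it with a value.
  def pause
    @suspended << Fiber.current
    Fiber.yield
  end

  # Resume every suspended fiber in the order it was suspended.
  def wait
    @suspended.shift.resume(:done) until @suspended.empty?
    self
  end
end

log = []
group = MiniGroup.new

group.async do |g|
  log << :started
  log << g.pause # Suspends here; the value arrives from #wait.
end

log << :before_wait
group.wait
```

Without the final call to wait, the fiber would stay suspended at pause and the last entry would never be logged.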
Constants
- VERSION
Attributes
The maximum number of processes to run concurrently, or nil for no limit.
A table of currently running processes.
Public Class Methods
Create a new process group. The optional `limit:` caps the number of concurrent processes; nil (the default) means unlimited.
# File lib/process/group.rb, line 106
def initialize(limit: nil, terminal: Terminal::Device.new?)
  raise ArgumentError.new("Limit must be nil (unlimited) or > 0") unless limit == nil or limit > 0

  @pid = Process.pid
  @terminal = terminal

  @queue = []
  @limit = limit

  @running = {}
  @fiber = nil
  @pgid = nil

  # Whether we can actively schedule tasks or not:
  @waiting = false
end
# File lib/process/group.rb, line 27
def self.wait(**options, &block)
  group = Group.new(**options)

  group.wait(&block)
end
Public Instance Methods
# File lib/process/group.rb, line 156
def async
  Fiber.new do
    yield self
  end.resume
end
Fork a block as a child process.
# File lib/process/group.rb, line 168
def fork(**options, &block)
  append! Fork.new(block, **options)
end
The id of the process group, only valid if processes are currently running.
# File lib/process/group.rb, line 132
def id
  raise RuntimeError.new("No processes in group, no group id available.") if @running.size == 0

  -@pgid
end
Send a signal (:INT by default) to all currently running processes. Does nothing unless running? is true.
# File lib/process/group.rb, line 227
def kill(signal = :INT)
  if running?
    Process.kill(signal, id)
  end
end
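The sketch below shows the mechanism that id and kill appear to rely on, using only the standard library rather than Process::Group itself. It assumes a POSIX system with a `sleep` command on the PATH; the variable names are illustrative. The first child is spawned as the leader of a new process group, the second joins that group, and a negative id passed to Process.kill signals every member at once.

```ruby
# The first child becomes the leader of a new process group,
# so the group id equals its pid.
leader = Process.spawn("sleep", "30", pgroup: true)

# The second child joins the existing group.
member = Process.spawn("sleep", "30", pgroup: leader)

# A negative pid addresses the whole process group.
group_id = -leader
Process.kill(:TERM, group_id)

# Reap both children and collect their exit statuses.
statuses = [leader, member].map { |pid| Process.wait2(pid).last }

statuses.each { |status| puts status.inspect }
```

Both statuses should report termination by SIGTERM, even though only one call to Process.kill was made.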
# File lib/process/group.rb, line 138
def queued?
  @queue.size > 0
end
Run a process in a new fiber. The arguments have the same meaning as for Process.spawn. If a block is given, it receives the exit status once the process has finished.
# File lib/process/group.rb, line 148
def run(*arguments, **options)
  Fiber.new do
    exit_status = self.spawn(*arguments, **options)

    yield exit_status if block_given?
  end.resume
end
Are there processes currently running?
# File lib/process/group.rb, line 143
def running?
  @running.size > 0
end
Run a specific command as a child process.
# File lib/process/group.rb, line 163
def spawn(*arguments, **options)
  append! Spawn.new(arguments, **options)
end
# File lib/process/group.rb, line 233
def to_s
  "#<#{self.class} running=#{@running.size} queued=#{@queue.size} limit=#{@limit} pgid=#{@pgid}>"
end
Wait for all running and queued processes to finish. If you provide a block, it is invoked before waiting, but inside the same signal handling machinery, so an interrupt or error raised in the block still triggers the group's cleanup.
# File lib/process/group.rb, line 187
def wait
  raise ArgumentError.new("Cannot call Process::Group#wait from child process!") unless @pid == Process.pid

  waiting do
    yield(self) if block_given?

    while running?
      process, status = wait_one

      schedule!

      process.resume(status)
    end
  end

  # No processes, process group is no longer valid:
  @pgid = nil

  return self
rescue Interrupt
  # If the user interrupts the wait, interrupt the process group and wait for them to finish:
  self.kill(:INT)

  # If user presses Ctrl-C again (or something else goes wrong), we will come out and kill(:TERM) in the ensure below:
  wait_all

  raise
ensure
  # You'd only get here with running processes if some unexpected error was thrown in user code:
  begin
    self.kill(:TERM)
  rescue Errno::EPERM
    # Sometimes, `kill` code can give EPERM, if any signal couldn't be delivered to a child. This might occur if an exception is thrown in the user code (e.g. within the fiber), and there are other zombie processes which haven't been reaped yet. These should be dealt with below, so it shouldn't be an issue to ignore this condition.
  end

  # Clean up zombie processes - if user presses Ctrl-C or for some reason something else blows up, exception would propagate back to caller:
  wait_all
end
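The cleanup path in the ensure clause can be illustrated with a small standalone sketch. It is not the library's implementation: guarded is a hypothetical helper, rescuing Errno::ESRCH is an addition of this sketch, and a POSIX system with a `sleep` command is assumed. An error raised in user code still results in the group being sent TERM and reaped before the error reaches the caller.

```ruby
# Hypothetical helper: run the block, then always terminate and reap
# every child in the process group identified by pgid.
def guarded(pgid)
  yield
ensure
  begin
    Process.kill(:TERM, -pgid)
  rescue Errno::EPERM, Errno::ESRCH
    # Nothing left that can be signalled; reaping below still applies.
    # (ESRCH is handled here as an assumption of this sketch.)
  end

  begin
    # Reap children of the group until none remain.
    loop { Process.wait2(-pgid) }
  rescue Errno::ECHILD
    # No children remain in the group.
  end
end

pid = Process.spawn("sleep", "30", pgroup: true)
error = nil

begin
  guarded(pid) { raise "failure in user code" }
rescue RuntimeError => e
  # The original error still propagates after the cleanup has run.
  error = e
end
```

The child never gets to sleep for its full 30 seconds, and the caller still sees the original exception.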
Private Instance Methods
Append a process to the queue and schedule it for execution if possible.
# File lib/process/group.rb, line 256
def append!(process)
  @queue << process

  schedule! if waiting?

  Fiber.yield
end
Start queued processes while space is available in the group.
# File lib/process/group.rb, line 265
def schedule!
  while available? and @queue.size > 0
    process = @queue.shift

    if @running.size == 0
      pid = process.call(:pgroup => true)

      # The process group id is the pid of the first process:
      @pgid = pid
    else
      pid = process.call(:pgroup => @pgid)
    end

    if @terminal and process.foreground?
      @terminal.foreground = pid
    end

    @running[pid] = process
  end
end
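The queue-and-limit behaviour can be sketched without any child processes. MiniScheduler, enqueue and finish are hypothetical names, and the body of available? is an assumption, since that method is referenced above but not shown on this page. Tasks move from the queue into the running table only while the limit allows it.

```ruby
# Hypothetical scheduler showing how a limit gates the queue.
class MiniScheduler
  attr_reader :queue, :running

  def initialize(limit: nil)
    @limit = limit
    @queue = []
    @running = {}
    @next_id = 0
  end

  # Assumption: a nil limit means unlimited, otherwise compare against
  # the number of running tasks.
  def available?
    @limit.nil? || @running.size < @limit
  end

  def enqueue(task)
    @queue << task
  end

  # Move tasks from the queue into the running table while space remains.
  def schedule!
    while available? && !@queue.empty?
      task = @queue.shift
      @next_id += 1
      @running[@next_id] = task
    end
  end

  # Mark a task as finished, which frees a slot for the next queued task.
  def finish(id)
    @running.delete(id)
    schedule!
  end
end

scheduler = MiniScheduler.new(limit: 2)
%w[a b c].each { |name| scheduler.enqueue(name) }

scheduler.schedule!
first_batch = scheduler.running.values.dup # Only two tasks fit.

scheduler.finish(1)
second_batch = scheduler.running.values.dup # The third task now runs.
```

This mirrors the pattern in wait, where schedule! is called again each time a process finishes.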
Wait for all children to exit but without resuming any controlling fibers.
# File lib/process/group.rb, line 287
def wait_all
  wait_one while running?

  # Clear any queued tasks:
  @queue.clear
end
Wait for one process. This should only be called when a child process has finished, otherwise it will block.
# File lib/process/group.rb, line 295
def wait_one(flags = 0)
  raise RuntimeError.new("Process group has no running children!") unless running?

  # Wait for processes in this group:
  pid, status = Process.wait2(-@pgid, flags)

  return if flags & Process::WNOHANG and pid == nil

  process = @running.delete(pid)

  # This should never happen unless something very odd has happened:
  raise RuntimeError.new("Process id=#{pid} is not part of group!") unless process

  return process, status
end
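As a rough illustration of waiting on a whole group, the sketch below passes a negative process group id to Process.wait2, which returns whichever child in that group exits next. It uses only the standard library and assumes a POSIX system whose `sleep` accepts fractional seconds; the labels and variable names are illustrative.

```ruby
# Spawn a group leader and a second member of the same process group.
leader = Process.spawn("sleep", "0.5", pgroup: true)
member = Process.spawn("sleep", "0.1", pgroup: leader)

# Table of running children, keyed by pid.
running = { leader => "leader", member => "member" }
finished = []

until running.empty?
  # A negative pid waits for any child in that process group.
  pid, status = Process.wait2(-leader)

  label = running.delete(pid)
  raise "Process id=#{pid} is not part of group!" unless label

  finished << [label, status.success?]
end
```

Each iteration removes exactly one entry from the table, so the loop ends once every child in the group has been reaped.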
Mark the group as waiting, schedule any outstanding tasks, then yield. The waiting flag is cleared when the block exits.
# File lib/process/group.rb, line 240
def waiting
  @waiting = true

  # Schedule any queued tasks:
  schedule!

  yield
ensure
  @waiting = false
end
# File lib/process/group.rb, line 251
def waiting?
  @waiting
end