class Process::Group

A group of tasks which can be run asynchrnously using fibers. Someone must call Group#wait to ensure that all fibers eventually resume.

Constants

VERSION

Attributes

limit[RW]

The maximum number of processes to run concurrently, or zero

running[R]

A table of currently running processes.

Public Class Methods

new(limit: nil, terminal: Terminal::Device.new?) click to toggle source

Create a new process group. Can specify `limit:` which limits the maximum number of concurrent processes.

# File lib/process/group.rb, line 106
def initialize(limit: nil, terminal: Terminal::Device.new?)
        raise ArgumentError.new("Limit must be nil (unlimited) or > 0") unless limit == nil or limit > 0
        
        @pid = Process.pid
        
        @terminal = terminal
        
        @queue = []
        @limit = limit
        
        @running = {}
        @fiber = nil
        
        @pgid = nil
        
        # Whether we can actively schedule tasks or not:
        @waiting = false
end
wait(**options, &block) click to toggle source
# File lib/process/group.rb, line 27
def self.wait(**options, &block)
        group = Group.new(**options)
        
        group.wait(&block)
end

Public Instance Methods

async() { |self| ... } click to toggle source
# File lib/process/group.rb, line 156
def async
        Fiber.new do
                yield self
        end.resume
end
available?() click to toggle source

Whether or not spawn, fork or run can be scheduled immediately.

# File lib/process/group.rb, line 173
def available?
        if @limit
                @running.size < @limit
        else
                true
        end
end
blocking?() click to toggle source

Whether or not calling spawn, fork or run would block the caller fiber (i.e. call Fiber.yield).

# File lib/process/group.rb, line 182
def blocking?
        not available?
end
fork(**options, &block) click to toggle source

Fork a block as a child process.

# File lib/process/group.rb, line 168
def fork(**options, &block)
        append! Fork.new(block, **options)
end
id() click to toggle source

The id of the process group, only valid if processes are currently running.

# File lib/process/group.rb, line 132
def id
        raise RuntimeError.new("No processes in group, no group id available.") if @running.size == 0
        
        -@pgid
end
kill(signal = :INT) click to toggle source

Send a signal to all currently running processes. No-op unless running?

# File lib/process/group.rb, line 227
def kill(signal = :INT)
        if running?
                Process.kill(signal, id)
        end
end
queued?() click to toggle source
# File lib/process/group.rb, line 138
def queued?
        @queue.size > 0
end
run(*arguments, **options) { |exit_status| ... } click to toggle source

Run a process in a new fiber, arguments have same meaning as Process#spawn.

# File lib/process/group.rb, line 148
def run(*arguments, **options)
        Fiber.new do
                exit_status = self.spawn(*arguments, **options)
                
                yield exit_status if block_given?
        end.resume
end
running?() click to toggle source

Are there processes currently running?

# File lib/process/group.rb, line 143
def running?
        @running.size > 0
end
spawn(*arguments, **options) click to toggle source

Run a specific command as a child process.

# File lib/process/group.rb, line 163
def spawn(*arguments, **options)
        append! Spawn.new(arguments, **options)
end
to_s() click to toggle source
# File lib/process/group.rb, line 233
def to_s
        "#<#{self.class} running=#{@running.size} queued=#{@queue.size} limit=#{@limit} pgid=#{@pgid}>"
end
wait() { |self| ... } click to toggle source

Wait for all running and queued processes to finish. If you provide a block, it will be invoked before waiting, but within canonical signal handling machinery.

# File lib/process/group.rb, line 187
def wait
        raise ArgumentError.new("Cannot call Process::Group#wait from child process!") unless @pid == Process.pid
        
        waiting do
                yield(self) if block_given?
                
                while running?
                        process, status = wait_one
                        
                        schedule!
                        
                        process.resume(status)
                end
        end
        
        # No processes, process group is no longer valid:
        @pgid = nil
        
        return self
rescue Interrupt
        # If the user interrupts the wait, interrupt the process group and wait for them to finish:
        self.kill(:INT)
        
        # If user presses Ctrl-C again (or something else goes wrong), we will come out and kill(:TERM) in the ensure below:
        wait_all
        
        raise
ensure
        # You'd only get here with running processes if some unexpected error was thrown in user code:
        begin
                self.kill(:TERM)
        rescue Errno::EPERM
                # Sometimes, `kill` code can give EPERM, if any signal couldn't be delivered to a child. This might occur if an exception is thrown in the user code (e.g. within the fiber), and there are other zombie processes which haven't been reaped yet. These should be dealt with below, so it shouldn't be an issue to ignore this condition.
        end
        
        # Clean up zombie processes - if user presses Ctrl-C or for some reason something else blows up, exception would propagate back to caller:
        wait_all
end

Private Instance Methods

append!(process) click to toggle source

Append a process to the queue and schedule it for execution if possible.

# File lib/process/group.rb, line 256
def append!(process)
        @queue << process
        
        schedule! if waiting?
        
        Fiber.yield
end
schedule!() click to toggle source

Run any processes while space is available in the group.

# File lib/process/group.rb, line 265
def schedule!
        while available? and @queue.size > 0
                process = @queue.shift
                
                if @running.size == 0
                        pid = process.call(:pgroup => true)
                        
                        # The process group id is the pid of the first process:
                        @pgid = pid
                else
                        pid = process.call(:pgroup => @pgid)
                end
                
                if @terminal and process.foreground?
                        @terminal.foreground = pid
                end
                
                @running[pid] = process
        end
end
wait_all() click to toggle source

Wait for all children to exit but without resuming any controlling fibers.

# File lib/process/group.rb, line 287
def wait_all
        wait_one while running?
        
        # Clear any queued tasks:
        @queue.clear
end
wait_one(flags = 0) click to toggle source

Wait for one process, should only be called when a child process has finished, otherwise would block.

# File lib/process/group.rb, line 295
def wait_one(flags = 0)
        raise RuntimeError.new("Process group has no running children!") unless running?
        
        # Wait for processes in this group:
        pid, status = Process.wait2(-@pgid, flags)

        return if flags & Process::WNOHANG and pid == nil

        process = @running.delete(pid)

        # This should never happen unless something very odd has happened:
        raise RuntimeError.new("Process id=#{pid} is not part of group!") unless process
        
        return process, status
end
waiting() { || ... } click to toggle source

The waiting loop, schedule any outstanding tasks:

# File lib/process/group.rb, line 240
def waiting
        @waiting = true
        
        # Schedule any queued tasks:
        schedule!
        
        yield
ensure
        @waiting = false
end
waiting?() click to toggle source
# File lib/process/group.rb, line 251
def waiting?
        @waiting
end