class Async::Container::Group

Manages a group of running processes.

Attributes

running[R]

@attribute [Hash<IO, Fiber>] the running tasks, indexed by IO.

Public Class Methods

new() click to toggle source

Initialize an empty group.

# File lib/async/container/group.rb, line 33
# Initialize an empty group: no running fibers and no active queue.
def initialize
        # Fibers currently waiting on a channel, keyed by that channel's input IO (populated by #wait_for).
        @running = {}
        
        # This queue allows us to wait for processes to complete, without spawning new processes as a result.
        # It is nil while inactive; #suspend replaces it with an array that #yield appends fibers to, and #resume drains it.
        @queue = nil
end

Public Instance Methods

any?() click to toggle source

Whether the group contains any running processes. @returns [Boolean]

# File lib/async/container/group.rb, line 51
# Whether at least one fiber is registered in the running table.
# @returns [Boolean]
def any?
        !@running.empty?
end
empty?() click to toggle source

Whether the group is empty. @returns [Boolean]

# File lib/async/container/group.rb, line 57
# Whether the running table has no entries.
# @returns [Boolean]
def empty?
        @running.size.zero?
end
interrupt() click to toggle source

Interrupt all running processes. This resumes the controlling fiber with an instance of {Interrupt}.

# File lib/async/container/group.rb, line 80
# Interrupt all running processes.
# Every registered fiber is resumed with the {Interrupt} class as its value.
def interrupt
        count = @running.size
        Console.logger.debug(self, "Sending interrupt to #{count} running processes...")
        
        @running.each_value { |fiber| fiber.resume(Interrupt) }
end
running?() click to toggle source

Whether the group contains any running processes. @returns [Boolean]

# File lib/async/container/group.rb, line 45
# Whether the running table holds one or more fibers.
# @returns [Boolean]
def running?
        @running.size.positive?
end
sleep(duration) click to toggle source

Sleep for at most the specified duration until some state change occurs.

# File lib/async/container/group.rb, line 62
# Sleep for at most the specified duration until some state change occurs.
# @parameter duration [Numeric | Nil] Passed through unchanged as the wait timeout.
def sleep(duration)
        # Let anything queued so far run, then start queueing again before we block:
        resume
        suspend
        
        wait_for_children(duration)
end
stop(timeout = 1) click to toggle source

Stop all child processes using {#terminate}. @parameter timeout [Boolean | Numeric | Nil] If specified, invoke a graceful shutdown using {#interrupt} first.

# File lib/async/container/group.rb, line 98
# Stop all child processes using {#terminate}.
# When a timeout is given, {#interrupt} is sent first and the children get until the timeout elapses to exit on their own.
# @parameter timeout [Boolean | Numeric | Nil] If specified, invoke a graceful shutdown using {#interrupt} first. `true` selects the default of 1.
def stop(timeout = 1)
        # Use a default timeout if not specified:
        timeout = 1 if timeout == true
        
        if timeout
                start_time = Async::Clock.now
                
                self.interrupt
                
                while self.any?
                        elapsed = Async::Clock.now - start_time
                        remaining = timeout - elapsed
                        
                        if remaining >= 0
                                # Wait only for what is left of the grace period, so the loop neither spins on a near-zero timeout nor overshoots the deadline:
                                self.wait_for_children(remaining)
                        else
                                # The grace period is over: poll once without blocking, then fall through to terminate.
                                self.wait_for_children(0)
                                break
                        end
                end
        end
        
        # Terminate all children:
        self.terminate
        
        # Wait for all children to exit:
        self.wait
end
terminate() click to toggle source

Terminate all running processes. This resumes the controlling fiber with an instance of {Terminate}.

# File lib/async/container/group.rb, line 89
# Terminate all running processes.
# Every registered fiber is resumed with the {Terminate} class as its value.
def terminate
        count = @running.size
        Console.logger.debug(self, "Sending terminate to #{count} running processes...")
        
        @running.each_value { |fiber| fiber.resume(Terminate) }
end
wait() click to toggle source

Begin any outstanding queued processes and wait for them indefinitely.

# File lib/async/container/group.rb, line 70
# Begin any outstanding queued processes and wait for them indefinitely.
def wait
        resume
        
        # No timeout is passed, so each wait uses the default of #wait_for_children:
        wait_for_children while running?
end
wait_for(channel) { |message| ... } click to toggle source

Wait for a message in the specified {Channel}.

# File lib/async/container/group.rb, line 128
# Wait for a message in the specified {Channel}.
# Registers the current fiber under the channel's input IO, then suspends until it is resumed.
# @parameter channel [Channel] Must respond to `in`, `receive`, `wait`, `interrupt!` and `terminate!`.
# @yields {|message| ...} Each truthy message returned by `channel.receive`.
# @returns The result of `channel.wait` once `channel.receive` yields nothing, or nil if the registration disappears first.
def wait_for(channel)
        io = channel.in
        @running[io] = Fiber.current
        
        while @running.key?(io)
                # Suspend until whoever owns the group resumes us, optionally with a signal:
                signal = Fiber.yield
                
                if signal == Interrupt
                        channel.interrupt!
                        next
                end
                
                if signal == Terminate
                        channel.terminate!
                        next
                end
                
                message = channel.receive
                
                # A falsy receive means there is nothing more to read; hand back the channel's final wait result:
                return channel.wait unless message
                
                yield message
        end
ensure
        # Always unregister, whether we returned, fell out of the loop or raised:
        @running.delete(io)
end

Protected Instance Methods

resume() click to toggle source
# File lib/async/container/group.rb, line 176
# Resume every fiber parked in the queue, if a queue is active.
# The queue is cleared before the fibers run, so #yield calls made while they run do not park again.
# @returns [Array | Nil] The drained queue, or nil when no queue was active.
def resume
        return unless @queue
        
        pending, @queue = @queue, nil
        pending.each { |fiber| fiber.resume }
end
suspend() click to toggle source
# File lib/async/container/group.rb, line 172
# Activate the queue so that fibers calling #yield park until the next #resume.
# An already-active queue is left untouched.
# @returns [Array] The active queue.
def suspend
        @queue = [] unless @queue
        @queue
end
wait_for_children(duration = nil) click to toggle source
# File lib/async/container/group.rb, line 152
# Block until at least one registered IO is readable or the timeout passes, then resume the fiber of each readable IO.
# Does nothing when no fibers are registered.
# @parameter duration [Numeric | Nil] Timeout handed to `IO.select`; nil waits without a timeout.
# @returns [Array | Nil] The readable IOs, or nil if nothing was registered or the select timed out.
def wait_for_children(duration = nil)
        return if @running.empty?
        
        # Maybe consider using a proper event loop here:
        ready = ::IO.select(@running.keys, nil, nil, duration)&.first
        
        ready&.each { |io| @running[io].resume }
end
yield() click to toggle source
# File lib/async/container/group.rb, line 163
# Park the current fiber on the queue until the next #resume, if a queue is active.
# Returns immediately when no queue is active.
# @returns The value the fiber is resumed with, or nil when no queue was active.
def yield
        return unless @queue
        
        @queue << Fiber.current
        Fiber.yield
end