class Async::Container::Group
Manages a group of running processes.
Attributes
@attribute [Hash<IO, Fiber>] the running tasks, indexed by IO.
Public Class Methods
Initialize an empty group.
# File lib/async/container/group.rb, line 33 def initialize @running = {} # This queue allows us to wait for processes to complete, without spawning new processes as a result. @queue = nil end
Public Instance Methods
Whether the group contains any running processes. @returns [Boolean]
# File lib/async/container/group.rb, line 51 def any? @running.any? end
Whether the group is empty. @returns [Boolean]
# File lib/async/container/group.rb, line 57 def empty? @running.empty? end
Interrupt all running processes. This resumes the controlling fiber with an instance of {Interrupt}.
# File lib/async/container/group.rb, line 80 def interrupt Console.logger.debug(self, "Sending interrupt to #{@running.size} running processes...") @running.each_value do |fiber| fiber.resume(Interrupt) end end
Whether the group contains any running processes. @returns [Boolean]
# File lib/async/container/group.rb, line 45 def running? @running.any? end
Sleep for at most the specified duration until some state change occurs.
# File lib/async/container/group.rb, line 62 def sleep(duration) self.resume self.suspend self.wait_for_children(duration) end
Stop all child processes using {#terminate}. @parameter timeout [Boolean | Numeric | Nil] If specified, invoke a graceful shutdown using {#interrupt} first.
# File lib/async/container/group.rb, line 98 def stop(timeout = 1) # Use a default timeout if not specified: timeout = 1 if timeout == true if timeout start_time = Async::Clock.now self.interrupt while self.any? duration = Async::Clock.now - start_time remaining = timeout - duration if remaining >= 0 self.wait_for_children(duration) else self.wait_for_children(0) break end end end # Terminate all children: self.terminate # Wait for all children to exit: self.wait end
Terminate
all running processes. This resumes the controlling fiber with an instance of {Terminate}.
# File lib/async/container/group.rb, line 89 def terminate Console.logger.debug(self, "Sending terminate to #{@running.size} running processes...") @running.each_value do |fiber| fiber.resume(Terminate) end end
Begin any outstanding queued processes and wait for them indefinitely.
# File lib/async/container/group.rb, line 70 def wait self.resume while self.running? self.wait_for_children end end
Wait for a message in the specified {Channel}.
# File lib/async/container/group.rb, line 128 def wait_for(channel) io = channel.in @running[io] = Fiber.current while @running.key?(io) result = Fiber.yield if result == Interrupt channel.interrupt! elsif result == Terminate channel.terminate! elsif message = channel.receive yield message else return channel.wait end end ensure @running.delete(io) end
Protected Instance Methods
# File lib/async/container/group.rb, line 176 def resume if @queue queue = @queue @queue = nil queue.each(&:resume) end end
# File lib/async/container/group.rb, line 172 def suspend @queue ||= [] end
# File lib/async/container/group.rb, line 152 def wait_for_children(duration = nil) if !@running.empty? # Maybe consider using a proper event loop here: readable, _, _ = ::IO.select(@running.keys, nil, nil, duration) readable&.each do |io| @running[io].resume end end end
# File lib/async/container/group.rb, line 163 def yield if @queue fiber = Fiber.current @queue << fiber Fiber.yield end end