class Async::Container::Generic

A base class for implementing containers.

Constants

UNNAMED

Attributes

state[R]
statistics[R]

Statistics relating to the behavior of child instances. @attribute [Statistics]

Public Class Methods

new(**options) click to toggle source
# File lib/async/container/generic.rb, line 59
# Set up an empty container with no children.
#
# @parameter options [Hash] Accepted for interface compatibility; not used by this implementation.
def initialize(**options)
        # Nothing is registered yet: no per-child state and no keyed entries.
        @state = {}
        @keyed = {}

        # The supervising loop in {spawn} only keeps going while this is true.
        @running = true

        @group = Group.new
        @statistics = Statistics.new
end
run(*arguments, **options, &block) click to toggle source
# File lib/async/container/generic.rb, line 53
# Build a new container and run instances in it straight away.
#
# The arguments, options and block are all forwarded to the instance-level {run}; nothing is passed to the constructor.
#
# @returns [Object] Whatever the instance-level {run} returns.
def self.run(*arguments, **options, &block)
        container = new

        container.run(*arguments, **options, &block)
end

Public Instance Methods

[](key) click to toggle source

Look up a child process by key. A key could be a symbol, a file path, or something else which the child instance represents.

# File lib/async/container/generic.rb, line 79
# Look up what is registered under the given key.
#
# @parameter key [Object] A key that was previously passed to {spawn}.
# @returns [Object | Nil] The `value` of the keyed entry, or nil when nothing is registered under the key.
def [](key)
        entry = @keyed[key]

        entry&.value
end
async(**options, &block) click to toggle source

@deprecated Please use {spawn} or {run} instead.

# File lib/async/container/generic.rb, line 209
# Spawn a child which runs the given block inside `Async::Reactor.run`.
#
# @deprecated Please use {spawn} or {run} instead.
# @parameter options [Hash] Forwarded unchanged to {spawn}.
# @returns [Object] Whatever {spawn} returns.
def async(**options, &block)
        # Wraps the caller's block so that it executes within a reactor for the spawned instance.
        within_reactor = proc do |instance|
                Async::Reactor.run(instance, &block)
        end

        spawn(**options, &within_reactor)
end
failed?() click to toggle source

Whether any failures have occurred within the container. @returns [Boolean]

# File lib/async/container/generic.rb, line 89
# Whether any failures have occurred within the container.
#
# The answer comes directly from the container's statistics object.
#
# @returns [Boolean]
def failed?
        @statistics.failed?
end
key?(key) click to toggle source

Whether a child instance exists for the given key.

# File lib/async/container/generic.rb, line 244
# Whether a child instance is registered under the given key.
#
# @parameter key [Object] The key to look up.
# @returns [Boolean | Nil] true or false for a usable key; nil when the key itself is nil or false.
def key?(key)
        return unless key

        @keyed.key?(key)
end
mark?(key) click to toggle source

Mark the container's keyed instance, if one exists for the key, which ensures that it won't be discarded. Returns whether an instance was marked.

# File lib/async/container/generic.rb, line 231
# Mark the entry registered under the key, if there is one.
#
# NOTE(review): marking presumably protects the entry from removal by {reload} — confirm against the keyed entry's `mark!` / `stop?`.
#
# @parameter key [Object] The key to look up; nil or false never matches.
# @returns [Boolean] true if an entry was found and marked, false otherwise.
def mark?(key)
        return false unless key

        entry = @keyed[key]
        return false unless entry

        entry.mark!

        true
end
reload() { || ... } click to toggle source

Reload the container's keyed instances.

# File lib/async/container/generic.rb, line 216
# Reload the container's keyed instances.
#
# Every keyed entry is cleared, then control is yielded to the caller. Afterwards, each entry whose `stop?` is truthy is removed from the container.
#
# @yields Called once, between clearing the entries and removing them.
# @returns [Boolean] true if at least one keyed entry was removed, false otherwise.
def reload
        @keyed.each_value {|entry| entry.clear!}

        yield

        removed_any = false

        @keyed.delete_if do |_key, entry|
                stopped = entry.stop?
                removed_any = true if stopped
                stopped
        end

        removed_any
end
run(count: Container.processor_count, **options, &block) click to toggle source

Run multiple instances of the same block in the container. @parameter count [Integer] The number of instances to start.

# File lib/async/container/generic.rb, line 200
# Run multiple instances of the same block in the container.
#
# @parameter count [Integer] The number of instances to start; defaults to `Container.processor_count`.
# @parameter options [Hash] Forwarded unchanged to each {spawn} call.
# @returns [Generic] The container itself, so that calls can be chained.
def run(count: Container.processor_count, **options, &block)
        count.times {spawn(**options, &block)}

        self
end
running?() click to toggle source

Whether the container has running child instances.

# File lib/async/container/generic.rb, line 94
# Whether the container has running child instances, as reported by the underlying group.
def running?
        @group.running?
end
sleep(duration = nil) click to toggle source

Sleep until some state change occurs. @parameter duration [Numeric] the maximum amount of time to sleep for.

# File lib/async/container/generic.rb, line 100
# Sleep until some state change occurs, by delegating to the underlying group.
#
# @parameter duration [Numeric] The maximum amount of time to sleep for. NOTE(review): the default of nil presumably means no time limit — confirm against the group's `sleep`.
def sleep(duration = nil)
        @group.sleep(duration)
end
spawn(name: nil, restart: false, key: nil, &block) click to toggle source

Spawn a child instance into the container. @parameter name [String] The name of the child instance. @parameter restart [Boolean] Whether to restart the child instance when it exits. @parameter key [Symbol] A key used for reloading child instances.

# File lib/async/container/generic.rb, line 154
# Spawn a child instance into the container.
#
# When a child is already registered under `key`, it is marked and reused and nothing new is started. Otherwise a supervising fiber is created and resumed: it starts the child, waits for it to exit, records the outcome, and starts it again for as long as `restart` is set and the container is running.
#
# @parameter name [String] The name of the child instance; defaults to `UNNAMED`.
# @parameter restart [Boolean] Whether to start the child again after it exits (successfully or not).
# @parameter key [Symbol] A key used for reloading child instances.
# @returns [Boolean] false when an existing keyed child was reused, true otherwise.
def spawn(name: nil, restart: false, key: nil, &block)
        name ||= UNNAMED

        if mark?(key)
                Console.logger.debug(self) {"Reusing existing child for #{key}: #{name}"}
                return false
        end

        @statistics.spawn!

        supervisor = fiber do
                while @running
                        child = start(name, &block)
                        state = insert(key, child)

                        # A begin/ensure expression evaluates to its body, so this is the exit status.
                        # The child is unregistered even if waiting raises.
                        status = begin
                                @group.wait_for(child) {|message| state.update(message)}
                        ensure
                                delete(key, child)
                        end

                        if status.success?
                                Console.logger.info(self) {"#{child} exited with #{status}"}
                        else
                                @statistics.failure!
                                Console.logger.error(self) {status}
                        end

                        break unless restart

                        @statistics.restart!
                end
        end

        supervisor.resume

        true
end
status?(flag) click to toggle source

Returns true if all child instances have the specified status flag set, e.g. `:ready`. This state is updated by the process readiness protocol mechanism. See {Notify::Client} for more details. @returns [Boolean]

# File lib/async/container/generic.rb, line 113
# Whether every registered child has the given status flag set, e.g. `:ready`.
#
# @parameter flag [Symbol] The key to look up in each child's state.
# @returns [Boolean]
def status?(flag)
        # With no children registered (e.g. they have all exited or failed) this is also true.
        @state.each_value.all? {|child_state| child_state[flag]}
end
stop(timeout = true) click to toggle source

Stop the child instances. @parameter timeout [Boolean | Numeric] Whether to stop gracefully, or a specific timeout.

# File lib/async/container/generic.rb, line 139
# Stop the child instances.
#
# The running flag is cleared while the group is being stopped, so the supervising loop in {spawn} does not start children again. It is set back to true afterwards, even if stopping raises.
#
# @parameter timeout [Boolean | Numeric] Whether to stop gracefully, or a specific timeout; passed through to the group.
def stop(timeout = true)
        @running = false

        begin
                @group.stop(timeout)

                Console.logger.warn(self) {"Group is still running after stopping it!"} if @group.running?
        ensure
                @running = true
        end
end
to_s() click to toggle source

A human readable representation of the container. @returns [String]

# File lib/async/container/generic.rb, line 73
# A human readable representation of the container, built from its class name and its spawn and failure counts.
#
# @returns [String]
def to_s
        spawns = @statistics.spawns
        failures = @statistics.failures

        "#{self.class} with #{spawns} spawns and #{failures} failures."
end
wait() click to toggle source

Wait until all spawned tasks are completed.

# File lib/async/container/generic.rb, line 105
# Wait until all spawned tasks are completed, by delegating to the underlying group.
def wait
        @group.wait
end
wait_until_ready() click to toggle source

Wait until all the child instances have indicated that they are ready. @returns [Boolean] The children all became ready.

# File lib/async/container/generic.rb, line 120
# Wait until all the child instances have indicated that they are ready.
#
# Each pass logs the current state of every child at debug level, sleeps until something changes, and then checks the `:ready` flag. The check happens only after sleeping, so at least one sleep always takes place.
#
# @returns [Boolean] Always true, once every child reports ready.
def wait_until_ready
        ready = false

        until ready
                Console.logger.debug(self) do |buffer|
                        buffer.puts "Waiting for ready:"
                        @state.each_pair do |child, state|
                                buffer.puts "\t#{child.class}: #{state.inspect}"
                        end
                end

                self.sleep

                ready = self.status?(:ready)
        end

        return true
end

Protected Instance Methods

delete(key, child) click to toggle source

Unregister the child (value) so that it is no longer tracked as running.

# File lib/async/container/generic.rb, line 266
# Unregister a child so that it is no longer tracked as running.
#
# @parameter key [Object | Nil] The key the child was registered under, if any; its keyed entry is removed.
# @parameter child [Object] The child whose state should be dropped.
# @returns [Hash | Nil] The state that was stored for the child, or nil if there was none.
def delete(key, child)
        @keyed.delete(key) if key

        @state.delete(child)
end
insert(key, child) click to toggle source

Register the child (value) as running.

# File lib/async/container/generic.rb, line 253
# Register a child as running.
#
# @parameter key [Object | Nil] When given, the child is also recorded as a keyed entry under this key.
# @parameter child [Object] The child to track.
# @returns [Hash] The new, empty state hash that was stored for the child.
def insert(key, child)
        @keyed[key] = Keyed.new(key, child) if key

        # The assignment evaluates to the fresh hash, which is both stored and returned.
        @state[child] = {}
end

Private Instance Methods

fiber(&block) click to toggle source
# File lib/async/container/generic.rb, line 277
# Create (but do not resume) a new fiber which will run the given block.
#
# The fiber is constructed with `blocking: true`.
#
# @returns [Fiber]
def fiber(&block)
        Fiber.new(blocking: true, &block)
end