class Async::Container::Generic
A base class for implementing containers.
Constants
- UNNAMED
Attributes
Statistics
relating to the behavior of children instances. @attribute [Statistics]
Public Class Methods
# File lib/async/container/generic.rb, line 59 def initialize(**options) @group = Group.new @running = true @state = {} @statistics = Statistics.new @keyed = {} end
# File lib/async/container/generic.rb, line 53 def self.run(*arguments, **options, &block) self.new.run(*arguments, **options, &block) end
Public Instance Methods
Look up a child process by key. A key could be a symbol, a file path, or something else which the child instance represents.
# File lib/async/container/generic.rb, line 79 def [] key @keyed[key]&.value end
@deprecated Please use {spawn} or {run} instead.
# File lib/async/container/generic.rb, line 209 def async(**options, &block) spawn(**options) do |instance| Async::Reactor.run(instance, &block) end end
Whether any failures have occurred within the container. @returns [Boolean]
# File lib/async/container/generic.rb, line 89 def failed? @statistics.failed? end
Whether a child instance exists for the given key.
# File lib/async/container/generic.rb, line 244 def key?(key) if key @keyed.key?(key) end end
Mark the container's keyed instance which ensures that it won't be discarded.
# File lib/async/container/generic.rb, line 231 def mark?(key) if key if value = @keyed[key] value.mark! return true end end return false end
Reload the container's keyed instances.
# File lib/async/container/generic.rb, line 216 def reload @keyed.each_value(&:clear!) yield dirty = false @keyed.delete_if do |key, value| value.stop? && (dirty = true) end return dirty end
Run multiple instances of the same block in the container. @parameter count [Integer] The number of instances to start.
# File lib/async/container/generic.rb, line 200 def run(count: Container.processor_count, **options, &block) count.times do spawn(**options, &block) end return self end
Whether the container has running children instances.
# File lib/async/container/generic.rb, line 94 def running? @group.running? end
Sleep until some state change occurs. @parameter duration [Numeric] the maximum amount of time to sleep for.
# File lib/async/container/generic.rb, line 100 def sleep(duration = nil) @group.sleep(duration) end
Spawn a child instance into the container. @parameter name [String] The name of the child instance. @parameter restart [Boolean] Whether to restart the child instance if it fails. @parameter key [Symbol] A key used for reloading child instances.
# File lib/async/container/generic.rb, line 154 def spawn(name: nil, restart: false, key: nil, &block) name ||= UNNAMED if mark?(key) Console.logger.debug(self) {"Reusing existing child for #{key}: #{name}"} return false end @statistics.spawn! fiber do while @running child = self.start(name, &block) state = insert(key, child) begin status = @group.wait_for(child) do |message| state.update(message) end ensure delete(key, child) end if status.success? Console.logger.info(self) {"#{child} exited with #{status}"} else @statistics.failure! Console.logger.error(self) {status} end if restart @statistics.restart! else break end end # ensure # Console.logger.error(self) {$!} if $! end.resume return true end
Returns true if all children instances have the specified status flag set. e.g. `:ready`. This state is updated by the process readiness protocol mechanism. See {Notify::Client} for more details. @returns [Boolean]
# File lib/async/container/generic.rb, line 113 def status?(flag) # This also returns true if all processes have exited/failed: @state.all?{|_, state| state[flag]} end
Stop the children instances. @parameter timeout [Boolean | Numeric] Whether to stop gracefully, or a specific timeout.
# File lib/async/container/generic.rb, line 139 def stop(timeout = true) @running = false @group.stop(timeout) if @group.running? Console.logger.warn(self) {"Group is still running after stopping it!"} end ensure @running = true end
A human readable representation of the container. @returns [String]
# File lib/async/container/generic.rb, line 73 def to_s "#{self.class} with #{@statistics.spawns} spawns and #{@statistics.failures} failures." end
Wait until all spawned tasks are completed.
# File lib/async/container/generic.rb, line 105 def wait @group.wait end
Wait until all the children instances have indicated that they are ready. @returns [Boolean] The children all became ready.
# File lib/async/container/generic.rb, line 120 def wait_until_ready while true Console.logger.debug(self) do |buffer| buffer.puts "Waiting for ready:" @state.each do |child, state| buffer.puts "\t#{child.class}: #{state.inspect}" end end self.sleep if self.status?(:ready) return true end end end
Protected Instance Methods
Clear the child (value) as running.
# File lib/async/container/generic.rb, line 266 def delete(key, child) if key @keyed.delete(key) end @state.delete(child) end
Register the child (value) as running.
# File lib/async/container/generic.rb, line 253 def insert(key, child) if key @keyed[key] = Keyed.new(key, child) end state = {} @state[child] = state return state end
Private Instance Methods
# File lib/async/container/generic.rb, line 277 def fiber(&block) Fiber.new(blocking: true, &block) end