class MultiProcess::Group
Store and run a group of processes.
Attributes
Partition size.
Return list of processes.
Receiver
all processes in group should use.
If changed only affect new added processes.
Public Class Methods
Create new process group.
@param opts [ Hash ] Options @option otps [ Receiver
] :receiver Receiver
to use for new added
processes. Defaults to `MultiProcess::Logger.global`.
# File lib/multi_process/group.rb, line 25 def initialize(receiver: nil, partition: nil) @processes = [] @receiver = receiver ? receiver : MultiProcess::Logger.global @partition = partition ? partition.to_i : 0 @mutex = Mutex.new end
Public Instance Methods
Add new process or list of processes.
If group was already started added processes will also be started.
@param process [Process, Array<Process>] New process or processes.
# File lib/multi_process/group.rb, line 38 def <<(procs) Array(procs).flatten.each do |process| processes << process process.receiver = receiver start process if started? end end
Check if group is alive e.g. if at least on process is alive.
@return [ Boolean ] True if group is alive.
# File lib/multi_process/group.rb, line 119 def alive? processes.any?(&:alive?) end
Wait until group is available. This implies waiting until all processes in group are available.
Processes will not be stopped if timeout occurs.
@param opts [ Hash ] Options. @option opts [ Integer ] :timeout Timeout in seconds to wait for processes
to become available. Defaults to {MultiProcess::DEFAULT_TIMEOUT}.
# File lib/multi_process/group.rb, line 139 def available!(timeout: MultiProcess::DEFAULT_TIMEOUT) Timeout.timeout timeout do processes.each(&:available!) end end
Check if group is available. The group is available if all processes are available.
# File lib/multi_process/group.rb, line 126 def available? processes.all?(:available?) end
Start all process and wait for them to terminate.
Given options will be passed to {#start} and {#wait}. {#start} will only be called if partition is zero.
If timeout is given process will be terminated using {#stop} when timeout error is raised.
# File lib/multi_process/group.rb, line 98 def run(delay: nil, timeout: nil) if partition > 0 partition.times.map do Thread.new do while (process = next_process) process.run end end end.each(&:join) else start delay: delay wait timeout: timeout end ensure stop end
Start all process in group.
Call blocks until all processes are started.
@option delay [Integer] Delay in seconds between starting processes.
# File lib/multi_process/group.rb, line 53 def start(delay: nil) processes.each do |process| next if process.started? process.start sleep delay if delay end end
Check if group was already started.
@return [ Boolean ] True if group was already started.
# File lib/multi_process/group.rb, line 66 def started? processes.any?(&:started?) end
Stop all processes.
# File lib/multi_process/group.rb, line 72 def stop processes.each(&:stop) end
Wait until all process terminated.
@param opts [ Hash ] Options. @option opts [ Integer ] :timeout Timeout in seconds to wait before
raising {Timeout::Error}.
# File lib/multi_process/group.rb, line 82 def wait(timeout: nil) if timeout ::Timeout.timeout(timeout) { wait } else processes.each(&:wait) end end
Private Instance Methods
# File lib/multi_process/group.rb, line 147 def next_process @mutex.synchronize do @index ||= 0 @index += 1 processes[@index - 1] end end