class MultiProcess::Group

Store and run a group of processes.

Attributes

partition[R]

Partition size.

processes[R]

Return list of processes.

receiver[RW]

Receiver that all processes in the group should use.

If changed, only newly added processes are affected.

Public Class Methods

new(receiver: nil, partition: nil) click to toggle source

Create new process group.

@param opts [ Hash ] Options. @option opts [ Receiver ] :receiver Receiver to use for newly added

processes. Defaults to `MultiProcess::Logger.global`.
# File lib/multi_process/group.rb, line 25
# Build an empty process group.
#
# @param receiver [Object, nil] Receiver stored for the group; when not
#   given, `MultiProcess::Logger.global` is used.
# @param partition [#to_i, nil] Partition size; converted with `to_i`,
#   `0` when not given.
def initialize(receiver: nil, partition: nil)
  @processes = []
  @receiver  = receiver || MultiProcess::Logger.global
  @partition = if partition
                 partition.to_i
               else
                 0
               end
  @mutex     = Mutex.new
end

Public Instance Methods

<<(procs) click to toggle source

Add new process or list of processes.

If the group was already started, added processes will also be started.

@param procs [Process, Array<Process>] New process or processes.

# File lib/multi_process/group.rb, line 38
# Add a new process or a list of processes to the group.
#
# Every added process is assigned the group's current receiver. If the
# group is already started, a not yet started process is started as well.
#
# @param procs [Process, Array<Process>] New process or processes;
#   nested arrays are flattened.
# @return [Array] The flattened list of processes that were added.
def <<(procs)
  Array(procs).flatten.each do |process|
    processes << process
    process.receiver = receiver

    # `start` only accepts the `delay:` keyword, so the process has to be
    # started directly rather than being passed to `start`. Processes that
    # are already running are skipped, mirroring what `start` does.
    process.start if started? && !process.started?
  end
end
alive?() click to toggle source

Check if group is alive, i.e. if at least one process is alive.

@return [ Boolean ] True if group is alive.

# File lib/multi_process/group.rb, line 119
# Tell whether the group is alive.
#
# @return [Boolean] True if at least one process reports `alive?`.
def alive?
  processes.any? { |process| process.alive? }
end
available!(timeout: MultiProcess::DEFAULT_TIMEOUT) click to toggle source

Wait until group is available. This implies waiting until all processes in group are available.

Processes will not be stopped if timeout occurs.

@param opts [ Hash ] Options. @option opts [ Integer ] :timeout Timeout in seconds to wait for processes

to become available. Defaults to {MultiProcess::DEFAULT_TIMEOUT}.
# File lib/multi_process/group.rb, line 139
# Wait until every process in the group is available.
#
# Calls `available!` on each process in order, with the whole sequence
# wrapped in a single `Timeout.timeout`.
#
# @param timeout [Numeric] Seconds to wait for all processes together.
#   Defaults to `MultiProcess::DEFAULT_TIMEOUT`.
# @return [Array] The list of processes.
def available!(timeout: MultiProcess::DEFAULT_TIMEOUT)
  Timeout.timeout(timeout) do
    processes.each { |process| process.available! }
  end
end
available?() click to toggle source

Check if group is available. The group is available if all processes are available.

# File lib/multi_process/group.rb, line 126
# Check if the group is available.
#
# @return [Boolean] True if every process reports `available?`
#   (also true for an empty group).
def available?
  # The predicate must be passed as a block (`&:available?`). A bare symbol
  # argument is treated by `all?` as a `===` pattern, which never matches
  # a process object.
  processes.all?(&:available?)
end
run(delay: nil, timeout: nil) click to toggle source

Start all processes and wait for them to terminate.

Given options will be passed to {#start} and {#wait}. {#start} will only be called if partition is zero.

If timeout is given, processes will be terminated using {#stop} when the timeout error is raised.

# File lib/multi_process/group.rb, line 98
# Run the whole group and stop it afterwards.
#
# With a partition of zero, all processes are started via `start` and then
# awaited via `wait`. With a positive partition, that many threads are
# spawned; each thread repeatedly takes the next process and calls `run` on
# it until none is left. In that mode `delay` and `timeout` are not used.
#
# `stop` is always invoked when the method exits, including on errors.
#
# @param delay [Numeric, nil] Forwarded to `start` (partition zero only).
# @param timeout [Numeric, nil] Forwarded to `wait` (partition zero only).
def run(delay: nil, timeout: nil)
  unless partition.positive?
    start delay: delay
    return wait(timeout: timeout)
  end

  workers = Array.new(partition) do
    Thread.new do
      while (process = next_process)
        process.run
      end
    end
  end
  # Joining re-raises an exception from a failed worker thread.
  workers.each(&:join)
ensure
  stop
end
start(delay: nil) click to toggle source

Start all processes in group.

Call blocks until all processes are started.

@option delay [Integer] Delay in seconds between starting processes.

# File lib/multi_process/group.rb, line 53
# Start every process in the group that is not started yet.
#
# Processes are started one after another, so the call returns once the
# last `process.start` has returned.
#
# @param delay [Numeric, nil] Seconds to sleep after each started process;
#   no sleep when nil.
# @return [Array] The list of processes.
def start(delay: nil)
  processes.each do |process|
    unless process.started?
      process.start
      sleep(delay) if delay
    end
  end
end
started?() click to toggle source

Check if group was already started.

@return [ Boolean ] True if group was already started.

# File lib/multi_process/group.rb, line 66
# Tell whether the group was already started.
#
# @return [Boolean] True if at least one process reports `started?`.
def started?
  processes.any? { |process| process.started? }
end
stop() click to toggle source

Stop all processes.

# File lib/multi_process/group.rb, line 72
# Stop all processes by calling `stop` on each of them in order.
#
# @return [Array] The list of processes.
def stop
  processes.each { |process| process.stop }
end
wait(timeout: nil) click to toggle source

Wait until all processes have terminated.

@param opts [ Hash ] Options. @option opts [ Integer ] :timeout Timeout in seconds to wait before

raising {Timeout::Error}.
# File lib/multi_process/group.rb, line 82
# Wait for every process in the group by calling `wait` on each in order.
#
# @param timeout [Numeric, nil] When given, the whole wait is wrapped in
#   `::Timeout.timeout`; when nil, waits without a limit.
# @return [Array] The list of processes.
def wait(timeout: nil)
  return processes.each { |process| process.wait } unless timeout

  # Re-enter without a timeout inside the timed section.
  ::Timeout.timeout(timeout) { wait }
end

Private Instance Methods

next_process() click to toggle source
# File lib/multi_process/group.rb, line 147
# Hand out the next process that has not been handed out yet.
#
# The cursor is kept in `@index` and advanced under `@mutex`, so each call
# yields a distinct position in `processes`. The cursor is never reset here.
#
# @return [Object, nil] The process at the current position, or nil once
#   the position is past the end of the list.
def next_process
  @mutex.synchronize do
    position = @index || 0
    @index = position + 1
    processes[position]
  end
end