class Chantier::ProcessPool

Lets you run tasks in a pool of subprocesses that never grows beyond a fixed number of slots. You can enqueue tasks to be executed within that pool. When all slots are full, the caller is blocked until a slot becomes available.

manager = ProcessPool.new(slots = 4)
jobs_hose.each_job do | job |
  # this call will block until a slot becomes available
  manager.fork_task do # this block runs in a subprocess
    Churner.new(job).churn
  end
  manager.still_running? # => most likely "true"
end

manager.block_until_complete! #=> Will block until all the subprocesses have terminated

If you have a finite Enumerable at hand you can also launch it into the ProcessPool, like so:

manager = ProcessPool.new(slots = 4)

manager.map_fork(job_tickets) do | job_ticket |
  # this block will run in a forked subprocess
  Churner.new(job_ticket).churn
  ...
end

The pool could be rewritten on top of Threads if running on the JVM or Rubinius ever becomes feasible.

Constants

SCHEDULER_SLEEP_SECONDS

The manager uses loops in a few places. A short sleep() in each of those loops yields control back to the OS, which keeps the CPU usage of the managing process low. A bare loop {} would make MRI saturate a whole core and not let go of it until the loop returns.
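
The effect of sleeping inside a polling loop can be sketched in isolation. The names below (POLL_INTERVAL, wait_until) are hypothetical and not part of Chantier, and the interval is an arbitrary value chosen for illustration, not the actual value of SCHEDULER_SLEEP_SECONDS.

```ruby
# Hypothetical polling interval, chosen only for this illustration.
POLL_INTERVAL = 0.05

# Polls the given block until it returns a truthy value, sleeping between
# checks so the calling thread yields the CPU instead of busy-waiting.
# Returns the number of checks that were needed.
def wait_until(interval: POLL_INTERVAL)
  checks = 0
  loop do
    checks += 1
    return checks if yield
    sleep interval # Breathing room
  end
end

# Wait for a deadline roughly 0.2 seconds in the future.
deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + 0.2
checks = wait_until { Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline }
puts "Condition became true after #{checks} checks"
```

Without the sleep the same loop would evaluate the condition as fast as the interpreter allows; with it, only a handful of checks happen during the wait.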

Public Class Methods

new(num_procs, failure_policy: Chantier::FailurePolicies::None.new)

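Initializes a new ProcessPool with the given number of workers (slots). If a failure_policy is given, it is notified every time a spawned process terminates, and the fork_task method will raise an exception once the policy reports that its limit of processes terminated abnormally or with a non-0 exit status has been reached.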

# File lib/process_pool.rb, line 39
def initialize(num_procs, failure_policy: Chantier::FailurePolicies::None.new)
  raise "Need at least 1 slot, given #{num_procs.to_i}" unless num_procs.to_i > 0
  @pids = [nil] * num_procs.to_i
  @semaphore = Mutex.new
  
  @failure_policy = Chantier::FailurePolicies::MutexWrapper.new(failure_policy)
  @failure_policy.arm!
end
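
Judging only from the calls made in initialize and fork_task, a failure policy appears to need four methods: arm!, success!, failure! and limit_reached?. The class below is a hypothetical sketch of such an object; MaxFailuresPolicy is not part of Chantier, and the policies shipped with the library may work differently.

```ruby
# Hypothetical policy (not part of Chantier): the limit is reached once
# more than max_failures tasks have failed since the last arm! call.
class MaxFailuresPolicy
  def initialize(max_failures)
    @max_failures = max_failures
    @failures = 0
  end

  # Resets the failure counter.
  def arm!
    @failures = 0
  end

  # Successful tasks do not affect this policy.
  def success!
  end

  # Records one failed task.
  def failure!
    @failures += 1
  end

  # True once the number of failures exceeds the allowed maximum.
  def limit_reached?
    @failures > @max_failures
  end
end

policy = MaxFailuresPolicy.new(2)
policy.arm!
3.times { policy.failure! }
puts policy.limit_reached? # prints "true", since 3 failures exceed the limit of 2
```

This sketch is not thread-safe on its own. In the source above the pool wraps whatever policy it is given in a MutexWrapper, presumably so that watcher threads can report results concurrently.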

Public Instance Methods

block_until_complete!()

Analogous to Process.wait or Process.waitall: blocks until all of the process slots have been freed.

# File lib/process_pool.rb, line 123
def block_until_complete!
  loop do
    return unless still_running?
    sleep SCHEDULER_SLEEP_SECONDS # Breathing room
  end
end

fork_task(&blk)

Run the given block in a forked subprocess. This method will block the thread it is called from until a slot in the process table becomes free. Once that happens, the given block will be forked off and the method will return.

# File lib/process_pool.rb, line 74
def fork_task(&blk)
  if @failure_policy.limit_reached?
    raise "Reached error limit of processes quitting with non-0 status"
  end
  
  destination_slot_idx = nil
  
  # Try to find a slot in the process table where this job can go
  catch :_found do
    loop do
      @semaphore.synchronize do
        if destination_slot_idx = @pids.index(nil)
          @pids[destination_slot_idx] = true # occupy it
          throw :_found
        end
      end
      sleep SCHEDULER_SLEEP_SECONDS # Breathing room
    end
  end
  
  task_pid = fork(&blk)
  
  # No need to lock this because we already reserved that slot
  @pids[destination_slot_idx] = task_pid
  
  puts("Spun off a task process #{task_pid} into slot #{destination_slot_idx}") if $VERBOSE
  
  # Dispatch the watcher thread that will record that the process has quit into the
  # process table
  Thread.new do
    Process.wait(task_pid) # This call will block until that process quits
    terminated_normally = $?.exited? && $?.exitstatus.zero?
    @semaphore.synchronize do
      # Now we can remove that process from the process table
      @pids[destination_slot_idx] = nil
    end
    terminated_normally ? @failure_policy.success! : @failure_policy.failure!
  end
  
  # Make sure to return the PID afterwards
  task_pid
end
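
The watcher thread treats a task as successful only if the process exited on its own with status 0. The sketch below applies the same check to three short-lived children using core Ruby only; the helper names terminated_normally? and run_and_classify are made up for this illustration.

```ruby
# Same check as in the watcher thread: the process must have exited
# (not been killed by a signal) and its exit status must be 0.
def terminated_normally?(status)
  status.exited? && status.exitstatus.zero?
end

# Forks the given block, waits for the child and classifies its status.
def run_and_classify(&child_body)
  pid = fork(&child_body)
  _, status = Process.wait2(pid)
  terminated_normally?(status)
end

clean  = run_and_classify { exit!(0) } # normal exit, status 0
failed = run_and_classify { exit!(1) } # normal exit, non-0 status
killed = run_and_classify do           # terminated by a signal
  Process.kill("KILL", Process.pid)
  sleep
end

puts "clean=#{clean} failed=#{failed} killed=#{killed}"
```

A child killed by a signal never "exited" in the sense of Process::Status#exited?, so it is counted as a failure just like a non-0 exit status.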

fork_task_in_all_slots(&blk)

Launches as many copies of the given task as this Pool has slots. If some slots are still occupied, the call blocks until enough of them become free.

# File lib/process_pool.rb, line 66
def fork_task_in_all_slots(&blk)
  @pids.length.times { fork_task(&blk) }
end

map_fork(arguments_per_job) { |single_block_argument| ... }

Distributes the elements in the given Enumerable to parallel workers, N workers at a time. The method will return once all the workers for all the elements of the Enumerable have terminated.

pool = ProcessPool.new(5)
pool.map_fork(array_of_urls) do | single_url |
  Faraday.get(single_url).response ...
  ...
  ...
end

# File lib/process_pool.rb, line 58
def map_fork(arguments_per_job, &blk)
  arguments_per_job.each do | single_block_argument |
    fork_task { yield(single_block_argument) }
  end
  block_until_complete!
end
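
Despite its name, map_fork as shown here does not collect the values of the block: each block runs in a forked subprocess, so objects changed inside it are not visible to the parent. If results are needed, they have to travel over some form of inter-process channel. The sketch below shows one possible approach with a pipe, using core Ruby only; fork_and_read is a hypothetical helper and not part of Chantier.

```ruby
# Runs the given block in a forked child and returns, as a String,
# whatever value the block produced there.
def fork_and_read
  reader, writer = IO.pipe
  pid = fork do
    reader.close
    writer.write(yield.to_s) # send the block's value to the parent
    writer.close
    exit!(0)
  end
  writer.close # the parent must close its copy so that read sees EOF
  output = reader.read
  reader.close
  Process.wait(pid)
  output
end

counter = 0
from_child = fork_and_read { counter += 1; counter * 21 }

puts from_child # the value computed in the child, as a String
puts counter    # unchanged in the parent
```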

still_running?()

Tells whether any subprocesses are still running, that is, whether any slot in the process table is occupied.

# File lib/process_pool.rb, line 118
def still_running?
  @pids.any?{|e| e }
end
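
The process table behind this check is a plain Array in which nil marks a free slot. The following sketch replays the life cycle of one slot outside of the class; the PID is a made-up number and the variable names are chosen for this illustration.

```ruby
slots = [nil] * 3                         # three free slots, as in initialize

free_idx = slots.index(nil)               # first free slot
slots[free_idx] = true                    # reserve it before forking
busy_after_reserve = slots.any? { |e| e }

slots[free_idx] = 12_345                  # hypothetical PID of the forked task
slots[free_idx] = nil                     # the watcher thread frees the slot
busy_after_release = slots.any? { |e| e }

puts "slot #{free_idx}: busy=#{busy_after_reserve}, then busy=#{busy_after_release}"
```

A reserved slot already counts as running, so still_running? can return true slightly before the subprocess has actually been forked.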