class MultiForkr

Attributes

child_defs[R]

Child process definitions. @return [Hash<Object, Integer>]

child_sets[R]

Child process PIDs, grouped by worker object. @return [Hash<Object, Array<Integer>>]

inbound[R]
master_pid[R]

The PID of the Forkr master. @return [Fixnum]

outbound[R]

Public Class Methods

new(forklets) click to toggle source

@param forklets [Hash<Object, Integer>] the worker objects, each mapped to the number of worker processes to keep running for it

# File lib/multi_forkr.rb, line 16
# Records the worker definitions and the pid of the creating process.
# Nothing is forked here; workers are started by #run.
#
# @param forklets [Hash<Object, Integer>] worker objects mapped to counts
def initialize(forklets)
  @master_pid  = Process.pid
  @child_defs  = forklets
  # Unknown keys start out as (and are stored as) their own empty pid list.
  @child_sets  = Hash.new { |sets, forklet| sets[forklet] = [] }
  @in_shutdown = false
end

Public Instance Methods

run() click to toggle source

Start the master and spawn the workers. @return [nil]

# File lib/multi_forkr.rb, line 25
# Opens the wake-up pipe, installs the signal handlers and enters the
# master loop.
#
# Each handler only forwards to a method that queues a one-byte notice on
# the pipe; the actual work happens in #master_loop.
#
# @return [Object] whatever #master_loop returns
def run
  @inbound, @outbound = IO.pipe
  {
    'CHLD' => :dead_child,
    'INT'  => :interrupt,
    'TERM' => :shutdown,
    'QUIT' => :core_dump_quit
  }.each do |signal_name, handler|
    Signal.trap(signal_name) { send(handler) }
  end
  master_loop
end

Protected Instance Methods

child_dead?(pid) click to toggle source
# File lib/multi_forkr.rb, line 156
# Checks, without blocking, whether the given child process has exited,
# reaping it if it has.
#
# @param pid [Integer] pid of a child process
# @return [Boolean] true if the child has exited or can no longer be
#   waited on, false while it is still running
def child_dead?(pid)
  # With WNOHANG, waitpid2 yields nil while the child is still running and
  # [pid, Process::Status] once it has exited, so the real exit status can
  # be logged rather than the pid a second time.
  _reaped_pid, status = Process.waitpid2(pid, Process::WNOHANG)
  return false if status.nil?

  $stderr.puts "Process #{pid} dead: #{status}"
  true
rescue Errno::ECHILD
  # The pid is not (or is no longer) a child of this process, e.g. it was
  # already reaped; there is nothing left to supervise.
  true
end
children() click to toggle source
# File lib/multi_forkr.rb, line 123
# Collects every tracked worker pid into a single flat list.
#
# @return [Array] the pids from all entries of @child_sets, in hash order
def children
  @child_sets.values.flatten(1)
end
core_dump_quit() click to toggle source
# File lib/multi_forkr.rb, line 44
# Installed by #run as the QUIT handler: queues a "Q" wake notice, which
# #master_loop turns into a QUIT-based shutdown.
#
# @return [Object] whatever #send_wake_notice returns
def core_dump_quit
  send_wake_notice('Q')
end
dead_child() click to toggle source
# File lib/multi_forkr.rb, line 56
# Installed by #run as the CHLD handler: queues a "D" wake notice.
# #master_loop has no specific branch for "D"; the notice just wakes it so
# that it prunes and respawns workers.
#
# @return [Object] whatever #send_wake_notice returns
def dead_child
  send_wake_notice('D')
end
ensure_right_worker_count() click to toggle source
# File lib/multi_forkr.rb, line 107
# Brings the number of tracked workers for every forklet in line with the
# count requested in @child_defs: spawns the missing ones, or sends TERM to
# the first surplus ones.
#
# @return [Hash] @child_defs
def ensure_right_worker_count
  @child_defs.each_pair do |forklet, wanted|
    pids = @child_sets[forklet]
    surplus = pids.length - wanted
    if surplus.negative?
      (-surplus).times { spawn_worker(forklet) }
    elsif surplus.positive?
      # Surplus workers are only signalled here; they leave the tracked
      # list once #prune_workers sees that they have exited.
      pids.first(surplus).each { |pid| signal_worker(pid, :TERM) }
    end
  end
end
interrupt() click to toggle source
# File lib/multi_forkr.rb, line 48
# Installed by #run as the INT handler: queues an "I" wake notice, which
# #master_loop turns into an INT-based shutdown.
#
# @return [Object] whatever #send_wake_notice returns
def interrupt
  send_wake_notice('I')
end
master_loop() click to toggle source
# File lib/multi_forkr.rb, line 75
# The master's supervision loop.
#
# Spawns the initial workers, then repeatedly waits (up to 2 seconds) for a
# one-byte notice on the wake-up pipe. "I", "T" and "Q" start a shutdown
# with the matching signal; any other byte, or a timeout, just falls
# through to pruning dead workers and topping the counts back up.
#
# #shutdown_using raises StopIteration, which ends the `loop`; the master
# then reaps its workers and closes the pipe. A freshly forked worker leaves
# this method by throwing :bail_because_im_a_worker from #worker_loop.
#
# @return [nil]
def master_loop
  catch(:bail_because_im_a_worker) do
    ensure_right_worker_count
    loop do
      ready = IO.select([@inbound], nil, nil, 2)
      if ready
        case ready.first.first.read(1)
        when "I" then shutdown_using(:INT)
        when "T" then shutdown_using(:TERM)
        when "Q" then shutdown_using(:QUIT)
        end
      end
      prune_workers
      ensure_right_worker_count
    end
    reap_all_workers
    @outbound.close
    @inbound.close
  end
end
prune_workers() click to toggle source
# File lib/multi_forkr.rb, line 138
# Drops every pid whose process has exited from the tracked worker sets.
#
# @return [Hash] the rebuilt @child_sets
def prune_workers
  # Rebuild with the same auto-vivifying default the constructor uses, so a
  # forklet that has no entry yet still reads as an empty list, not nil.
  surviving = Hash.new { |sets, forklet| sets[forklet] = [] }
  @child_sets.each_pair do |forklet, pids|
    # child_dead? also reaps the process when it has exited.
    surviving[forklet] = pids.reject { |pid| child_dead?(pid) }
  end
  @child_sets = surviving
end
reap_all_workers() click to toggle source
# File lib/multi_forkr.rb, line 99
# Waits for every remaining child process to exit and reaps it.
#
# @return [nil]
def reap_all_workers
  # A blocking wait sleeps until the next child exits; polling with WNOHANG
  # here would spin at full CPU for as long as any worker is still running.
  loop { Process.waitpid2(-1) }
rescue Errno::ECHILD
  # No children left to wait for: everything has been reaped.
  nil
end
send_wake_notice(notice) click to toggle source
# File lib/multi_forkr.rb, line 38
# Writes a notice to the wake-up pipe so that #master_loop notices it.
#
# Does nothing in a process other than the master, or once a shutdown has
# started.
#
# @param notice [String] the notice to write (callers here pass one letter)
# @return [Integer, nil] the result of IO#write, or nil when skipped
def send_wake_notice(notice)
  in_master = Process.pid == master_pid
  return nil unless in_master
  return nil if @in_shutdown

  @outbound.write(notice)
end
shutdown() click to toggle source
# File lib/multi_forkr.rb, line 52
# Installed by #run as the TERM handler: queues a "T" wake notice, which
# #master_loop turns into a TERM-based shutdown.
#
# @return [Object] whatever #send_wake_notice returns
def shutdown
  send_wake_notice('T')
end
shutdown_using(sig) click to toggle source
# File lib/multi_forkr.rb, line 69
# Starts a shutdown: marks it as in progress (so further wake notices are
# dropped), forwards the signal to every worker, then ends the master's
# `loop` by raising StopIteration.
#
# @param sig [Symbol] signal to send to the workers
# @raise [StopIteration] always
def shutdown_using(sig)
  @in_shutdown = true
  signal_all_workers(sig)
  raise StopIteration
end
signal_all_workers(sig) click to toggle source
# File lib/multi_forkr.rb, line 127
# Sends the given signal to every tracked worker.
#
# @param sig [Symbol] signal to send
# @return [Array] the list returned by #children
def signal_all_workers(sig)
  children.each do |pid|
    signal_worker(pid, sig)
  end
end
signal_worker(wpid, signal) click to toggle source
# File lib/multi_forkr.rb, line 131
# Sends a signal to one worker, ignoring the case where the process no
# longer exists.
#
# @param wpid [Integer] pid of the worker
# @param signal [Symbol, String, Integer] signal to send
# @return [Integer, nil] the result of Process.kill, or nil when the
#   process was already gone
def signal_worker(wpid, signal)
  Process.kill(signal, wpid)
rescue Errno::ESRCH
  nil
end
spawn_worker(forklet) click to toggle source
# File lib/multi_forkr.rb, line 60
# Forks one worker for the given forklet.
#
# In the parent the new pid is appended to the forklet's tracked pid list;
# in the child control passes to #worker_loop.
#
# @param forklet [Object] the worker object to run in the child
# @return [Array] in the parent, the forklet's updated pid list
def spawn_worker(forklet)
  child_pid = fork
  # fork returns nil inside the child process.
  return worker_loop(forklet) unless child_pid

  @child_sets[forklet] = @child_sets[forklet] + [child_pid]
end
worker_loop(forklet) click to toggle source
# File lib/multi_forkr.rb, line 147
# Body of a freshly forked worker process.
#
# Calls the forklet's optional after_fork hook, closes this process's
# copies of both ends of the wake-up pipe, logs the worker pid, runs the
# forklet, and finally unwinds out of #master_loop by throwing
# :bail_because_im_a_worker.
#
# @param forklet [Object] must respond to #run; #after_fork is optional
def worker_loop(forklet)
  forklet.after_fork if forklet.respond_to?(:after_fork)
  [@inbound, @outbound].each(&:close)
  $stderr.puts "Worker spawned as #{Process.pid}!"
  forklet.run
  throw :bail_because_im_a_worker
end