class MultiForkr
Attributes
child_defs[R]
Child Process Definitions @return Hash<Object, Integer>
child_sets[R]
Child process pids. @return Hash<Object, [Pids]>
inbound[R]
master_pid[R]
The PID of the Forkr
master @return [Fixnum]
outbound[R]
Public Class Methods
new(forklets)
click to toggle source
@param forklets Hash<Object, Integer> the worker objects, with counts
# File lib/multi_forkr.rb, line 16 def initialize(forklets) @child_defs = forklets @master_pid = $$ @child_sets = Hash.new { |h, k| h[k] = Array.new } @in_shutdown = false end
Public Instance Methods
run()
click to toggle source
Start the master, and spawn workers @return [nil]
# File lib/multi_forkr.rb, line 25 def run @inbound, @outbound = IO.pipe Signal.trap('CHLD') { dead_child } Signal.trap('INT') { interrupt } Signal.trap('TERM') { shutdown } Signal.trap('QUIT') { core_dump_quit } master_loop end
Protected Instance Methods
child_dead?(pid)
click to toggle source
# File lib/multi_forkr.rb, line 156 def child_dead?(pid) status = Process.waitpid(pid, Process::WNOHANG) unless status.nil? $stderr.puts "Process #{pid} dead: #{status}" end !status.nil? end
children()
click to toggle source
# File lib/multi_forkr.rb, line 123 def children @child_sets.values.flat_map { |v| v } end
core_dump_quit()
click to toggle source
# File lib/multi_forkr.rb, line 44 def core_dump_quit send_wake_notice("Q") end
dead_child()
click to toggle source
# File lib/multi_forkr.rb, line 56 def dead_child send_wake_notice("D") end
ensure_right_worker_count()
click to toggle source
# File lib/multi_forkr.rb, line 107 def ensure_right_worker_count @child_defs.each_pair do |k, v| existing_workers = @child_sets[k] off_by = v - existing_workers.length if off_by > 0 off_by.times do spawn_worker(k) end elsif off_by < 0 existing_workers.take(off_by.abs).each do |kid| signal_worker(kid, :TERM) end end end end
interrupt()
click to toggle source
# File lib/multi_forkr.rb, line 48 def interrupt send_wake_notice("I") end
master_loop()
click to toggle source
# File lib/multi_forkr.rb, line 75 def master_loop catch(:bail_because_im_a_worker) do ensure_right_worker_count loop do fds = IO.select([@inbound],nil,nil,2) unless fds.nil? data_read = fds.first.first.read(1) if data_read == "I" shutdown_using(:INT) elsif data_read == "T" shutdown_using(:TERM) elsif data_read == "Q" shutdown_using(:QUIT) end end prune_workers ensure_right_worker_count end reap_all_workers @outbound.close @inbound.close end end
prune_workers()
click to toggle source
# File lib/multi_forkr.rb, line 138 def prune_workers new_sets = {} @child_sets.each_pair do |k, v| living_children = v.reject { |pid| child_dead?(pid) } new_sets[k] = living_children end @child_sets = new_sets end
reap_all_workers()
click to toggle source
# File lib/multi_forkr.rb, line 99 def reap_all_workers begin wpid, status = Process.waitpid2(-1, Process::WNOHANG) rescue Errno::ECHILD break end while true end
send_wake_notice(notice)
click to toggle source
# File lib/multi_forkr.rb, line 38 def send_wake_notice(notice) return(nil) if $$ != master_pid return(nil) if @in_shutdown @outbound.write(notice) end
shutdown()
click to toggle source
# File lib/multi_forkr.rb, line 52 def shutdown send_wake_notice("T") end
shutdown_using(sig)
click to toggle source
# File lib/multi_forkr.rb, line 69 def shutdown_using(sig) @in_shutdown = true signal_all_workers(sig) raise StopIteration.new end
signal_all_workers(sig)
click to toggle source
# File lib/multi_forkr.rb, line 127 def signal_all_workers(sig) children.each { |c| signal_worker(c, sig) } end
signal_worker(wpid, signal)
click to toggle source
# File lib/multi_forkr.rb, line 131 def signal_worker(wpid, signal) begin Process.kill(signal, wpid) rescue Errno::ESRCH end end
spawn_worker(forklet)
click to toggle source
# File lib/multi_forkr.rb, line 60 def spawn_worker(forklet) if new_pid = fork existing_worker_pids = @child_sets[forklet] @child_sets[forklet] = existing_worker_pids + [new_pid] else worker_loop(forklet) end end
worker_loop(forklet)
click to toggle source
# File lib/multi_forkr.rb, line 147 def worker_loop(forklet) forklet.after_fork if forklet.respond_to?(:after_fork) @inbound.close @outbound.close $stderr.puts "Worker spawned as #{$$}!" forklet.run throw(:bail_because_im_a_worker) end