class Forkworker::Leader
Public Class Methods
new(worker_num, pidfile: nil, setup_block: nil, prefork_block: nil, fork_block: nil, reporting_block: nil)
click to toggle source
# File lib/forkworker/leader.rb, line 5 def initialize(worker_num, pidfile: nil, setup_block: nil, prefork_block: nil, fork_block: nil, reporting_block: nil) @wanted_number_of_workers = worker_num @running = true @worker_pids = [] @signals_received = [] @pidfile = pidfile @setup_block = setup_block @prefork_block = prefork_block @fork_block = fork_block @reporting_block = reporting_block write_pid if @pidfile end
Public Instance Methods
start!()
click to toggle source
# File lib/forkworker/leader.rb, line 20 def start! traps update_leader_title @setup_block.call if @setup_block spawn_missing_workers gameloop = 1 until @worker_pids.dup.length == 0 && @running == false sleep 0.25 # Handle actions while(signal = @signals_received.shift) case signal when 'CHLD' @worker_pids.dup.each do |wpid| begin wpid, _status = Process.waitpid(wpid, Process::WNOHANG) @worker_pids.delete(wpid) rescue Errno::ECHILD end end when 'TTIN' debug "-- handled #{signal}: wanted number of workers are now: #{@wanted_number_of_workers}" @wanted_number_of_workers += 1 when 'TTOU' unless @wanted_number_of_workers == 0 @wanted_number_of_workers -= 1 end debug "-- handled #{signal}: wanted number of workers are now: #{@wanted_number_of_workers}" when 'TERM' debug "-- handled #{signal}" @running = false shutdown_each_worker(:TERM) when 'QUIT' debug "-- handled #{signal}" @running = false shutdown_each_worker(:QUIT) end end # Spawn missing workers if we are not getting shut down if @running spawn_missing_workers end if gameloop % 20 == 0 && @reporting_block @reporting_block.call end update_leader_title gameloop += 1 end end
Private Instance Methods
shutdown_each_worker(signal)
click to toggle source
# File lib/forkworker/leader.rb, line 141 def shutdown_each_worker(signal) @worker_pids.dup.each { |wpid| shutdown_worker(signal, wpid) } end
shutdown_worker(signal, wpid)
click to toggle source
# File lib/forkworker/leader.rb, line 134 def shutdown_worker(signal, wpid) begin Process.kill(signal, wpid) rescue Errno::ESRCH end end
spawn_missing_workers()
click to toggle source
# File lib/forkworker/leader.rb, line 82 def spawn_missing_workers begin while (@worker_pids.length + 1) <= @wanted_number_of_workers worker_data = if @prefork_block @prefork_block.call else nil end if(pid = fork) @worker_pids << pid update_leader_title else Worker.new.work!(worker_data, &@fork_block) end end rescue Forkworker::NoMoreWork debug "-- No more work, so we're just finishing up running processes" @running = false end end
traps()
click to toggle source
# File lib/forkworker/leader.rb, line 104 def traps # By trapping the :CHLD signal our process will be notified by the kernel when one of its children exits. trap(:CHLD) do @signals_received << 'CHLD' end trap(:TERM) do @signals_received << 'TERM' end trap(:TTIN) do @signals_received << 'TTIN' end trap(:TTOU) do @signals_received << 'TTOU' end [:QUIT, :INT].each do |signal| trap(signal) do @signals_received << 'QUIT' end end end
update_leader_title()
click to toggle source
# File lib/forkworker/leader.rb, line 129 def update_leader_title run_state = @running ? 'running' : 'shutting down' $PROGRAM_NAME = "Leader ##{Process.pid} | #{run_state} | Workers=#{@worker_pids.length}/#{@wanted_number_of_workers}" end
write_pid()
click to toggle source
# File lib/forkworker/leader.rb, line 145 def write_pid if File.exist?(@pidfile) && (pid = File.read(@pidfile)) begin Process.getpgid(pid.to_i) # throws Errno::ESRCH if process with pid exists debug "Program is already running on pid #{pid} specified in #{@pidfile}" exit 1 rescue Errno::ESRCH false end end File.open(@pidfile, 'w') do |f| f.write Process.pid end end