class Kinetic::Master
Constants
- EXIT_SIGS
- QUEUE_SIGS
- SELF_PIPE
- SIG_QUEUE
- WORKERS
- WORKER_QUEUE_SIGS
Attributes
app[R]
reexecpid[R]
Public Class Methods
new(app)
click to toggle source
# File lib/kinetic/master.rb, line 14 def initialize(app) @app = app end
Public Instance Methods
run()
click to toggle source
Starts and initializes the master process.
# File lib/kinetic/master.rb, line 30 def run begin daemonize if config[:daemonize] enable_logging! logger.debug 'Configuration:' logger.ap app.config.to_hash proc_name 'master' initialize_self_pipe! initialize_signal_traps! spawn_missing_workers join rescue => e logger.fatal e logger.fatal 'Unable to start worker!' ensure call_on_exit_callbacks exit! end end
start()
click to toggle source
Starts and daemonizes the master process
# File lib/kinetic/master.rb, line 19 def start if File.exists?(config.pid) puts "Unable to start master process, a process with the pidfile #{config.pid} already exists." + 'If you wish to run another instance of this application please pass --pid to set a new pid file.' exit(1) end config.daemonize = true run end
stop()
click to toggle source
# File lib/kinetic/master.rb, line 50 def stop unless File.exists?(File.join(config.root, config.pid)) puts "Unable to stop process, a process with the pidfile #{config.pid} could not be found" + 'If you have started this app with another pid file location pass --pid to set the pid file.' exit(1) end pid = File.open(File.join(config.root, config.pid), 'r') { |f| f.read }.to_i Process.kill(:QUIT, pid) end
Private Instance Methods
awaken_master()
click to toggle source
# File lib/kinetic/master.rb, line 104 def awaken_master SELF_PIPE[1].kgio_trywrite('.') end
call_on_exit_callbacks()
click to toggle source
# File lib/kinetic/master.rb, line 114 def call_on_exit_callbacks ensure FileUtils.rm(config[:pid]) if config[:pid] && config[:daemonize] end
daemonize()
click to toggle source
# File lib/kinetic/master.rb, line 67 def daemonize logger.info 'Daemonizing process' Process.daemon true, false write_pidfile! end
enable_logging!()
click to toggle source
# File lib/kinetic/master.rb, line 62 def enable_logging! config.logging = true app.class.instance_variable_set(:@logger, nil) end
init_worker_process(worker)
click to toggle source
gets rid of stuff the worker has no business keeping track of to free some resources and drops all sig handlers. traps for USR1, USR2, and HUP may be set in the after_fork Proc by the user.
# File lib/kinetic/master.rb, line 155 def init_worker_process(worker) worker.atfork_child # we'll re-trap :QUIT later for graceful shutdown iff we accept clients EXIT_SIGS.each { |sig| trap(sig) { exit!(0) } } exit!(0) if (SIG_QUEUE & EXIT_SIGS)[0] WORKER_QUEUE_SIGS.each { |sig| trap(sig, nil) } trap(:CHLD, 'DEFAULT') SIG_QUEUE.clear WORKERS.clear after_fork.call(self, worker) end
initialize_self_pipe!()
click to toggle source
Based on Unicorn's self-pipe
# File lib/kinetic/master.rb, line 93 def initialize_self_pipe! logger.debug 'Initializing self pipe' SELF_PIPE.replace Kgio::Pipe.new.each { |io| io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) } end
initialize_signal_traps!()
click to toggle source
# File lib/kinetic/master.rb, line 98 def initialize_signal_traps! logger.debug 'Initializing signal traps' QUEUE_SIGS.each { |sig| trap(sig) {SIG_QUEUE << sig; awaken_master } } trap(:CHLD) { awaken_master } end
join()
click to toggle source
# File lib/kinetic/master.rb, line 78 def join logger.debug 'Joining thread' case SIG_QUEUE.shift when :QUIT stop_all(true) break when :TERM, :INT stop_all(false) break else master_sleep(0.5) end while true end
kill_each_worker(signal)
click to toggle source
# File lib/kinetic/master.rb, line 184 def kill_each_worker(signal) WORKERS.keys.each { |wpid| kill_worker(signal, wpid) } end
kill_worker(signal, wpid)
click to toggle source
delivers a signal to a worker and fails gracefully if the worker is no longer running.
# File lib/kinetic/master.rb, line 190 def kill_worker(signal, wpid) Process.kill(signal, wpid) rescue Errno::ESRCH worker = WORKERS.delete(wpid) and worker.close rescue nil end
master_sleep(time)
click to toggle source
# File lib/kinetic/master.rb, line 73 def master_sleep(time) IO.select([ SELF_PIPE[0] ], nil, nil, time) or return SELF_PIPE[0].kgio_tryread(11) end
proc_name(tag)
click to toggle source
# File lib/kinetic/master.rb, line 222 def proc_name(tag) $0 = "#{config.name} [#{tag}]" end
reap_all_workers()
click to toggle source
reaps all unreaped workers
# File lib/kinetic/master.rb, line 202 def reap_all_workers begin wpid, status = Process.waitpid2(-1, Process::WNOHANG) wpid or return if reexec_pid == wpid logger.error "reaped #{status.inspect} exec()-ed" self.reexec_pid = 0 self.pid = pid.chomp('.oldbin') if pid proc_name 'master' else worker = WORKERS.delete(wpid) and worker.close rescue nil m = "reaped #{status.inspect} worker=#{worker.nr rescue 'unknown'}" status.success? ? logger.info(m) : logger.error(m) end rescue Errno::ECHILD break end while true end
soft_kill_each_worker(signal)
click to toggle source
# File lib/kinetic/master.rb, line 196 def soft_kill_each_worker(signal) WORKERS.each_value { |worker| worker.soft_kill(signal) } end
spawn_missing_workers()
click to toggle source
# File lib/kinetic/master.rb, line 119 def spawn_missing_workers logger.debug 'Spawning missing workers' worker_nr = -1 until (worker_nr += 1) == config.workers WORKERS.value?(worker_nr) and next #noinspection RubyArgCount worker = Worker.new(worker_nr, @app) logger.debug 'Forking worker' parent = Process.pid pid = fork do begin proc_name "worker-#{worker_nr}" logger.debug 'Calling after fork procs' app.send(:after_fork_procs).each { |p| p.call } #noinspection RubyArgCount worker.run rescue => e logger.fatal e logger.fatal 'Unable to start workers' Process.kill(:TERM, parent) end end logger.debug "Worker #{worker_nr} started on #{pid}" WORKERS[pid] = worker end rescue => e logger.error(e) end
stop_all(graceful = true)
click to toggle source
Terminates all workers, but does not exit master process
# File lib/kinetic/master.rb, line 169 def stop_all(graceful = true) logger.warn graceful ? 'Process is shutting down gracefully' : 'Process is shutting down immediately!' limit = Time.now + config.timeout until WORKERS.empty? || Time.now > limit if graceful soft_kill_each_worker(:QUIT) else kill_each_worker(:TERM) end sleep(0.1) reap_all_workers end kill_each_worker(:KILL) end
write_pidfile!()
click to toggle source
# File lib/kinetic/master.rb, line 108 def write_pidfile! set(:pid, File.join(config.root, "#{config.name}.pid")) logger.info "Writing PID file to #{config.pid}" File.open(config.pid, 'w') { |f| f.write(Process.pid) } end