class Server::Starter
Constants
- VERSION
Public Class Methods
new()
click to toggle source
# File lib/server/starter.rb, line 10
#
# Sets up an idle starter: no worker running, nothing queued for shutdown,
# and no signals pending.
def initialize
  @current_worker    = nil # pid of the active worker; nil until one is started
  @old_workers       = {}  # pid => generation of workers that are being retired
  @signals_received  = []  # names of signals queued by the trap handlers
  @last_restart_time = []  # placeholder; start_server stores Time.now here once a worker is up
end
Public Instance Methods
_reload_env()
click to toggle source
# File lib/server/starter.rb, line 377
#
# Reads environment overrides from the directory named by ENV['ENVDIR'].
#
# Every regular file in that directory whose name does not start with a dot
# contributes one entry: the file name is the variable name and the first line
# of the file (without its line terminator) is the value.
#
# Returns a Hash of name => value. The Hash is empty when ENVDIR is unset or
# does not exist. Empty files are skipped instead of raising, and entries that
# are not regular files (sub-directories, FIFOs, ...) are ignored.
def _reload_env
  dn = ENV['ENVDIR']
  return {} if dn.nil? || !File.exist?(dn)

  env = {}
  Dir.foreach(dn) do |name|
    # skips ".", ".." and hidden files
    next if name.start_with?('.')

    path = File.join(dn, name)
    next unless File.file?(path)

    # gets returns nil for an empty file, so chomp only when a line was read
    first_line = File.open(path) { |fh| fh.gets }
    env[name] = first_line.chomp if first_line
  end
  env
end
restart_server(opts)
click to toggle source
# File lib/server/starter.rb, line 324
#
# Asks an already running starter process to replace its worker and blocks
# until the replacement is the only generation listed in the status file.
#
# opts - Hash; both of these keys are required:
#        :pid_file    - file whose first line holds the starter's pid
#        :status_file - file of "generation:pid" lines maintained by the starter
#
# Calls `die` when an option is missing, when either file cannot be read, when
# the pid file does not hold a positive pid, when the status file lists no
# generation, or when the signal cannot be delivered.
def restart_server(opts)
  unless opts[:pid_file] && opts[:status_file]
    die "--restart option requires --pid-file and --status-file to be set as well"
  end

  # get first pid
  pid_line =
    begin
      File.open(opts[:pid_file]) { |fd| fd.gets.chomp }
    rescue
      die "failed to open file:#{opts[:pid_file]}"
    end

  # Process.kill treats pid 0 (and negative pids) as "a whole process group",
  # so garbage in the pid file must never be turned into a kill target.
  target_pid = pid_line.to_i
  die "invalid pid in file:#{opts[:pid_file]}" unless target_pid > 0

  # function that returns a list of active generations in sorted order
  get_generations = lambda {
    begin
      File.readlines(opts[:status_file])
          .map { |line| line[/^(\d+):/, 1] }
          .compact
          .map(&:to_i)
          .sort
          .uniq
    rescue
      die "failed to open file:#{opts[:status_file]}"
    end
  }

  # wait for this generation
  gens = get_generations.call
  die "no active process found in the status file" if gens.empty?
  wait_for = gens.last + 1

  # send HUP
  begin
    Process.kill('HUP', target_pid)
  rescue
    die "failed to send SIGHUP to the server process"
  end

  # wait for the generation: poll once a second until the new generation is
  # the only one left in the status file
  loop do
    break if get_generations.call == [wait_for]
    sleep 1
  end
end
server_ports()
click to toggle source
# File lib/server/starter.rb, line 370
#
# Parses ENV['SERVER_STARTER_PORT'] ("name=fd;name=fd;...") into a Hash that
# maps each listen spec to its file descriptor. Both keys and values are
# Strings; each entry is split on its first "=" only.
#
# Calls `die` when the environment variable is not set.
def server_ports
  spec = ENV['SERVER_STARTER_PORT']
  unless spec
    die "no environment variable SERVER_STARTER_PORT. Did you start the process using server_starter?"
  end

  pairs = spec.split(';').map { |entry| entry.split('=', 2) }
  pairs.to_h
end
start_server(opts)
click to toggle source
# File lib/server/starter.rb, line 17
#
# Opens the listening sockets, spawns the worker command and then supervises
# it: restarts it when it dies, replaces it on SIGHUP (or on the auto-restart
# timer) and shuts every worker down on SIGINT/SIGTERM.
#
# opts - Hash (String or Symbol keys). Keys read by this method:
#        :exec                  - Array, command line of the worker (mandatory)
#        :port                  - "port" or "host:port" spec(s) to listen on
#        :path                  - unix socket path(s) to listen on
#                                 (at least one of :port / :path is mandatory)
#        :interval              - seconds to wait before checking a new worker (default 1)
#        :signal_on_hup         - signal sent to old workers after a restart (default TERM)
#        :signal_on_term        - signal sent to workers when TERM is received (default TERM)
#        :backlog               - listen backlog (default Socket::SOMAXCONN)
#        :dir                   - directory the worker chdirs into before exec
#        :envdir, :enable_auto_restart, :kill_old_delay, :auto_restart_interval
#                               - exported as ENVDIR / ENABLE_AUTO_RESTART /
#                                 KILL_OLD_DELAY / AUTO_RESTART_INTERVAL
#        :pid_file              - file that receives this process' pid
#        :log_file              - file STDOUT/STDERR are reopened to (append)
#        :status_file           - file kept up to date with "generation:pid" lines
#
# Returns once a terminating signal has been handled and all workers are gone.
# Calls `croak` on invalid options and `die` on runtime failures.
def start_server(opts)
  # symbolize keys
  opts = opts.map {|k, v| [k.to_sym, v] }.to_h
  opts[:interval] ||= 1
  opts[:signal_on_hup]  ||= 'TERM'
  opts[:signal_on_term] ||= 'TERM'
  opts[:backlog] ||= Socket::SOMAXCONN
  [:signal_on_hup, :signal_on_term].each do |key|
    # normalize to the one that can be passed to kill.
    # Non-destructive on purpose: the caller's strings (and frozen literals)
    # must not be modified in place.
    opts[key] = opts[key].tr("a-z", "A-Z").sub(/^SIG/i, "")
  end

  # prepare args
  ports = Array(opts[:port])
  paths = Array(opts[:path])
  # at least one listen target is required; giving both is fine
  if ports.empty? && paths.empty?
    croak "either of ``port'' or ``path'' option is mandatory"
  end
  unless opts[:exec] && opts[:exec].is_a?(Array)
    croak "mandatory option ``exec'' is missing or not an array"
  end

  # set envs
  ENV['ENVDIR'] = opts[:envdir] if opts[:envdir]
  ENV['ENABLE_AUTO_RESTART'] = opts[:enable_auto_restart] ? '1' : nil # nil deletes the variable
  ENV['KILL_OLD_DELAY'] = opts[:kill_old_delay].to_s if opts[:kill_old_delay]
  ENV['AUTO_RESTART_INTERVAL'] = opts[:auto_restart_interval].to_s if opts[:auto_restart_interval]

  # open pid file
  if opts[:pid_file]
    File.open(opts[:pid_file], "w") do |fh|
      fh.puts $$
    end rescue die "failed to open file:#{opts[:pid_file]}"
    at_exit { File.unlink opts[:pid_file] rescue nil }
  end

  # open log file
  if opts[:log_file]
    File.open(opts[:log_file], "a") do |fh|
      $stdout.flush
      $stderr.flush
      $stdout.reopen(fh) rescue die "failed to reopen STDOUT to file"
      $stderr.reopen(fh) rescue die "failed to reopen STDERR to file"
    end
  end

  # create guard that removes the status file
  if opts[:status_file]
    at_exit { File.unlink opts[:status_file] rescue nil }
  end

  $stderr.puts "start_server (pid:#{$$}) starting now..."

  # start listening, setup envvar
  socks = []
  sockenvs = []
  ports.each do |port|
    sock = nil
    begin
      if port =~ /^\s*(\d+)\s*$/
        # sock = Socket.new(:INET, :STREAM)
        # addr = Socket.pack_sockaddr_in(port, '0.0.0.0')
        # sock.setsockopt(:SOCKET, :REUSEADDR, true)
        # sock.bind(addr)
        # sock.listen(opts[:backlog])
        sock = TCPServer.new("0.0.0.0", port)
        sock.setsockopt(:SOCKET, :REUSEADDR, true)
        sock.listen(opts[:backlog])
      elsif port =~ /^\s*(.*)\s*:\s*(\d+)\s*$/
        _bind, _port = $1, $2
        sock = TCPServer.new(_bind, _port)
        sock.setsockopt(:SOCKET, :REUSEADDR, true)
        sock.listen(opts[:backlog])
      else
        croak "invalid ``port'' value:#{port}"
      end
    rescue
      die "failed to listen to #{port}"
    end
    # clear close-on-exec so the descriptor survives into the worker
    sock.fcntl(Fcntl::F_SETFD, 0) rescue die "fcntl(F_SETFD, 0) failed"
    sockenvs.push "#{port}=#{sock.fileno}"
    socks.push sock
  end

  # remove the unix socket files when this process exits
  at_exit {
    paths.each do |path|
      begin
        File.unlink(path) if File.socket?(path)
      rescue
        nil
      end
    end
  }
  paths.each do |path|
    if File.socket?(path)
      warn "removing existing socket file:#{path}"
      File.unlink(path) rescue die "failed to remove existing socket file:#{path}"
    end
    File.unlink(path) rescue nil
    # create the socket with a wide-open mode, then put the umask back so
    # that neither this process nor the workers keep running with umask 0
    saved_umask = File.umask(0)
    begin
      sock = UNIXServer.new(path)
      sock.listen(opts[:backlog])
    rescue
      die "failed to listen to file #{path}"
    ensure
      File.umask(saved_umask)
    end
    sock.fcntl(Fcntl::F_SETFD, 0) rescue die "fcntl(F_SETFD, 0) failed"
    sockenvs.push "#{path}=#{sock.fileno}"
    socks.push sock
  end
  ENV['SERVER_STARTER_PORT'] = sockenvs.join(";")
  ENV['SERVER_STARTER_GENERATION'] = "0"

  # setup signal handlers
  %w(INT TERM HUP ALRM).each do |signal|
    Signal.trap(signal) {
      @signals_received.push(signal)
      # wake up the main loop, which is blocked joining the wait thread
      @signal_wait_thread.kill if @signal_wait_thread
    }
  end
  # NOTE(review): this installs a handler block (whose return value is unused)
  # rather than the 'IGNORE' disposition -- confirm that is intended.
  Signal.trap('PIPE') { 'IGNORE' }

  # setup status monitor
  update_status =
    if opts[:status_file]
      Proc.new {
        # write to a temporary file and rename it over the status file
        tmpfn = "#{opts[:status_file]}.#{$$}"
        File.open(tmpfn, "w") do |tmpfh|
          gen_pids = @current_worker ? {ENV['SERVER_STARTER_GENERATION'] => @current_worker} : {}
          @old_workers.each {|pid, gen| gen_pids[gen] = pid }
          gen_pids.keys.map(&:to_i).sort.each {|gen| tmpfh.puts "#{gen}:#{gen_pids[gen.to_s]}" }
        end rescue die "failed to create temporary file:#{tmpfn}"
        begin
          File.rename(tmpfn, opts[:status_file])
        rescue
          die "failed to rename #{tmpfn} to #{opts[:status_file]}"
        end
      }
    else
      Proc.new {}
    end

  # setup the start_worker function
  start_worker = Proc.new {
    pid = nil
    while true
      ENV['SERVER_STARTER_GENERATION'] = (ENV['SERVER_STARTER_GENERATION'].to_i + 1).to_s
      begin
        pid = fork
      rescue
        die "fork(2) failed"
      end
      if pid.nil? # child process
        args = Array(opts[:exec]).dup
        if opts[:dir]
          Dir.chdir opts[:dir] rescue die "failed to chdir"
        end
        begin
          bundler_with_clean_env do
            args << {:close_others => false}
            exec(*args)
          end
        rescue
          $stderr.puts "failed to exec #{args[0]}:#{$!.class} #{$!.message}"
          $stderr.flush
          # exit! skips the at_exit hooks inherited from the parent, which
          # would otherwise delete the parent's pid/status/socket files
          exit!(255)
        end
      end
      $stderr.puts "starting new worker #{pid}"
      sleep opts[:interval]
      # signals are queued as Strings; stop probing only for non-HUP signals
      break if (@signals_received - ['HUP']).size > 0
      # nil from a non-blocking waitpid means the worker is still running
      break if Process.waitpid(pid, Process::WNOHANG).nil?
      $stderr.puts "new worker #{pid} seems to have failed to start, exit status:#{$?.exitstatus}"
    end
    # ready, update the environment
    @current_worker = pid
    @last_restart_time = Time.now
    update_status.call
  }

  # setup the wait function
  wait = Proc.new {
    flags = @signals_received.empty? ? 0 : Process::WNOHANG
    r = nil
    # waitpid can not get EINTR on receiving signal, so create a thread,
    # and kill the thread on receiving signal to exit blocking
    #
    # there is another way to use wait3 which raises EINTR on receiving signal,
    # but proc-wait3 gem requires gcc, etc to compile its C codes.
    #
    #     require 'proc/wait3'
    #     begin
    #       rusage = Process.wait3(flags)
    #       r = [rusage.pid, rusage.status] if rusage
    #     rescue Errno::EINTR
    #       sleep 0.1 # need to wait until Signal.trap finishes its operation, terrible
    #       nil
    #     end
    @signal_wait_thread = Thread.start do
      if flags == 0 && ENV['ENABLE_AUTO_RESTART']
        # A blocking wait must be bounded when auto-restart is enabled,
        # otherwise the main loop never gets to check the restart timer.
        begin
          Timeout.timeout(1) do
            pid = Process.waitpid(-1, flags)
            r = [pid, $?.exitstatus] if pid
          end
        rescue Timeout::Error
          # Process.kill('ALRM', Process.pid)
          Thread.exit
        end
      else
        pid = Process.waitpid(-1, flags)
        r = [pid, $?.exitstatus] if pid
      end
    end
    @signal_wait_thread.join
    @signal_wait_thread = nil
    r
  }

  # setup the cleanup function
  cleanup = Proc.new {|sig|
    term_signal = sig == 'TERM' ? opts[:signal_on_term] : 'TERM'
    @old_workers[@current_worker] = ENV['SERVER_STARTER_GENERATION']
    @current_worker = nil
    $stderr.print "received #{sig}, sending #{term_signal} to all workers:",
      @old_workers.keys.sort.join(','), "\n"
    @old_workers.keys.sort.each {|pid| Process.kill(term_signal, pid) }
    while true
      # block until a worker exits instead of spinning on WNOHANG
      died_worker = Process.waitpid(-1)
      if died_worker
        $stderr.puts "worker #{died_worker} died, status:#{$?.exitstatus}"
        @old_workers.delete(died_worker)
        update_status.call
        break if @old_workers.empty?
      end
    end
    $stderr.puts "exiting"
  }

  # the main loop
  start_worker.call
  while true
    # wait for next signal (or when auto-restart becomes necessary)
    r = wait.call
    # reload env if necessary
    loaded_env = _reload_env
    ENV['AUTO_RESTART_INTERVAL'] ||= "360" if ENV['ENABLE_AUTO_RESTART']
    with_local_env(loaded_env) do
      # restart if worker died
      if r
        died_worker, status = r
        if died_worker == @current_worker
          $stderr.puts "worker #{died_worker} died unexpectedly with status:#{status}, restarting"
          start_worker.call
        else
          $stderr.puts "old worker #{died_worker} died, status:#{status}"
          @old_workers.delete(died_worker)
          update_status.call
        end
      end
      # handle signals
      restart = nil
      while !@signals_received.empty?
        sig = @signals_received.shift
        if sig == 'HUP'
          $stderr.puts "received HUP, spawning a new worker"
          restart = true
          break
        elsif sig == 'ALRM'
          # skip
        else
          return cleanup.call(sig)
        end
      end
      if !restart && ENV['ENABLE_AUTO_RESTART']
        auto_restart_interval = ENV['AUTO_RESTART_INTERVAL'].to_i
        elapsed_since_restart = Time.now - @last_restart_time
        if elapsed_since_restart >= auto_restart_interval && @old_workers.empty?
          $stderr.puts "autorestart triggered (interval=#{auto_restart_interval})"
          restart = true
        elsif elapsed_since_restart >= auto_restart_interval * 2
          $stderr.puts "autorestart triggered (forced, interval=#{auto_restart_interval})"
          restart = true
        end
      end
      # restart if requested
      if restart
        @old_workers[@current_worker] = ENV['SERVER_STARTER_GENERATION']
        start_worker.call
        $stderr.print "new worker is now running, sending #{opts[:signal_on_hup]} to old workers:"
        if !@old_workers.empty?
          $stderr.puts @old_workers.keys.sort.join(',')
        else
          $stderr.puts "none"
        end
        kill_old_delay = ENV['KILL_OLD_DELAY'] ? ENV['KILL_OLD_DELAY'].to_i : ENV['ENABLE_AUTO_RESTART'] ? 5 : 0
        if kill_old_delay != 0
          $stderr.puts "sleeping #{kill_old_delay} secs before killing old workers"
          sleep kill_old_delay
        end
        $stderr.puts "killing old workers"
        @old_workers.keys.sort.each {|pid| Process.kill(opts[:signal_on_hup], pid) }
      end
    end
  end

  die "unreachable"
end