class Unicorn::Server
This is the process manager of Unicorn. It manages worker processes, which in turn handle the I/O and run the application. Listener sockets are started in the master process and shared with forked worker children.
Constants
- IO_PURGATORY
holds IO objects so they are not garbage-collected
- LISTENERS
all bound listener sockets
- QUEUE_SIGS
list of signals we care about and trap in master.
- SELF_PIPE
We use SELF_PIPE differently in the master and worker processes:

- The master process never closes or reinitializes this once
  initialized. Signal handlers in the master process write to it to
  wake the master up from IO.select, in exactly the same manner djb
  describes in cr.yp.to/docs/selfpipe.html
- The workers immediately close the pipe they inherit from the
  master and replace it with a new pipe after forking. This new pipe
  is also used to wake up from IO.select inside (worker) signal
  handlers. However, workers close the pipe descriptors in the signal
  handlers to raise EBADF in IO.select instead of writing to the pipe
  as we do in the master. We cannot easily use the reader set for
  IO.select because LISTENERS is already that set, and it is extra
  work (and cycles) to distinguish the pipe FD from the reader set
  once IO.select returns. So we are lazy and just close the pipe when
  a (rare) signal arrives in the worker and reinitialize the pipe
  later.
- SIG_QUEUE
signal queue used for self-piping
- START_CTX
We populate this at startup so we can figure out how to reexecute and upgrade the currently running instance of Unicorn.

This Hash is considered a stable interface; changing its contents allows you to switch between different installations of Unicorn, or even different installations of the same application, without downtime. Keys of this constant Hash are described as follows:

- 0 - the path to the unicorn/unicorn_rails executable
- :argv - a deep copy of the ARGV array the executable originally saw
- :cwd - the working directory of the application, this is where you originally started Unicorn

The following example may be used in your Unicorn config file to change your working directory during a config reload (HUP) without upgrading or restarting:

    Dir.chdir(Unicorn::Server::START_CTX[:cwd] = path)

To change your unicorn executable to a different path without downtime, you can set the following in your Unicorn config file, HUP and then continue with the traditional USR2 + QUIT upgrade steps:

    Unicorn::Server::START_CTX[0] = "/home/bofh/1.9.2/bin/unicorn"
- WORKERS
This hash maps PIDs to Workers
Public Class Methods
Creates a working server on host:port (strange things happen if port isn't a Number). Use Server#start to start the server and Server#join to join the thread that's processing incoming requests on the socket.
# File lib/unicorn.rb, line 147
def initialize(options = {})
  self.app = options.delete(:app)
  self.reexec_pid = 0
  self.ready_pipe = options.delete(:ready_pipe)
  self.init_listeners = options[:listeners] ? options[:listeners].dup : []
  self.config = Configurator.new(options.merge(:use_defaults => true))
  self.listener_opts = {}

  # we try inheriting listeners first, so we bind them later.
  # we don't write the pid file until we've bound listeners in case
  # unicorn was started twice by mistake.  Even though our #pid= method
  # checks for stale/existing pid files, race conditions are still
  # possible (and difficult/non-portable to avoid) and can be likely
  # to clobber the pid if the second start was in quick succession
  # after the first, so we rely on the listener binding to fail in
  # that case.  Some tests (in and outside of this source tree) and
  # monitoring tools may also rely on pid files existing before we
  # attempt to connect to the listener(s)
  config.commit!(self, :skip => [:listeners, :pid])
  self.orig_app = app
end
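As a hedged usage sketch (the app object and listener address below are illustrative assumptions, not part of this API):

  # a placeholder app object; #process_client below calls app.call(client)
  # with the accepted socket, so this simply writes a response and closes
  app = lambda do |client|
    client.write("HTTP/1.1 200 OK\r\nContent-Length: 3\r\n\r\nhi\n")
    client.close
  end

  # :app and :listeners are consumed by #initialize; #start binds listeners
  # and forks workers, #join blocks in the master signal loop until shutdown
  Unicorn::Server.new(:app => app, :listeners => %w(127.0.0.1:8080)).start.join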
Public Instance Methods
Monitors children and receives signals forever (or until a termination signal is sent). This handles signals one at a time, and we'll happily drop signals if somebody is signalling us too often.
# File lib/unicorn.rb, line 316
def join
  respawn = true
  last_check = Time.now

  proc_name 'master'
  logger.info "master process ready" # test_exec.rb relies on this message
  if ready_pipe
    ready_pipe.syswrite($$.to_s)
    ready_pipe.close rescue nil
    self.ready_pipe = nil
  end
  begin
    loop do
      reap_all_workers
      case SIG_QUEUE.shift
      when nil
        # avoid murdering workers after our master process (or the
        # machine) comes out of suspend/hibernation
        if (last_check + timeout) >= (last_check = Time.now)
          murder_lazy_workers
        end
        maintain_worker_count if respawn
        master_sleep
      when :QUIT # graceful shutdown
        break
      when :TERM, :INT # immediate shutdown
        stop(false)
        break
      when :USR1 # rotate logs
        logger.info "master reopening logs..."
        Unicorn::Util.reopen_logs
        logger.info "master done reopening logs"
        kill_each_worker(:USR1)
      when :USR2 # exec binary, stay alive in case something went wrong
        reexec
      when :WINCH
        if Process.ppid == 1 || Process.getpgrp != $$
          respawn = false
          logger.info "gracefully stopping all workers"
          kill_each_worker(:QUIT)
        else
          logger.info "SIGWINCH ignored because we're not daemonized"
        end
      when :TTIN
        self.worker_processes += 1
      when :TTOU
        self.worker_processes -= 1 if self.worker_processes > 0
      when :HUP
        respawn = true
        if config.config_file
          load_config!
          redo # immediate reaping since we may have QUIT workers
        else # exec binary and exit if there's no config file
          logger.info "config_file not present, reexecuting binary"
          reexec
          break
        end
      end
    end
  rescue Errno::EINTR
    retry
  rescue => e
    logger.error "Unhandled master loop exception #{e.inspect}."
    logger.error e.backtrace.join("\n")
    retry
  end
  stop # gracefully shutdown all workers on our way out
  logger.info "master complete"
  unlink_pid_safe(pid) if pid
end
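The signals handled above can also be delivered from another Ruby process; a minimal sketch, assuming the pid file path is known (the path below is an illustrative assumption):

  master_pid = File.read("/path/to/unicorn.pid").to_i

  Process.kill(:TTIN, master_pid)   # ask the master to spawn one more worker
  Process.kill(:TTOU, master_pid)   # ask the master to retire one worker
  Process.kill(:USR1, master_pid)   # reopen logs in master and workers
  Process.kill(:QUIT, master_pid)   # graceful shutdown of the whole tree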
Adds a given address to the listeners set, idempotently. Allows workers to add a private, per-process listener via the after_fork hook. Very useful for debugging and testing.

:tries may be specified as an option for the number of times to retry, and :delay may be specified as the time in seconds to delay between retries. A negative value for :tries indicates the listen will be retried indefinitely; this is useful when workers belonging to different masters are spawned during a transparent upgrade.
# File lib/unicorn.rb, line 283
def listen(address, opt = {}.merge(listener_opts[address] || {}))
  address = config.expand_addr(address)
  return if String === address && listener_names.include?(address)

  delay = opt[:delay] || 0.5
  tries = opt[:tries] || 5
  begin
    io = bind_listen(address, opt)
    unless TCPServer === io || UNIXServer === io
      IO_PURGATORY << io
      io = server_cast(io)
    end
    logger.info "listening on addr=#{sock_name(io)} fd=#{io.fileno}"
    LISTENERS << io
    io
  rescue Errno::EADDRINUSE => err
    logger.error "adding listener failed addr=#{address} (in use)"
    raise err if tries == 0
    tries -= 1
    logger.error "retrying in #{delay} seconds " \
                 "(#{tries < 0 ? 'infinite' : tries} tries left)"
    sleep(delay)
    retry
  rescue => err
    logger.fatal "error adding listener addr=#{address}"
    raise err
  end
end
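For example, a Unicorn config file could add a private per-worker listener from the after_fork hook; a hedged sketch (the base port 9293 is an arbitrary choice):

  after_fork do |server, worker|
    # :tries => -1 retries indefinitely, which matters during a transparent
    # upgrade when workers belonging to the old master may still hold the port
    addr = "127.0.0.1:#{9293 + worker.nr}"
    server.listen(addr, :tries => -1, :delay => 5)
  end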
Replaces the current listener set with listeners. Any socket that will not exist in the new listener set is closed.
# File lib/unicorn.rb, line 218
def listeners=(listeners)
  cur_names, dead_names = [], []
  listener_names.each do |name|
    if ?/ == name[0]
      # mark unlinked sockets as dead so we can rebind them
      (File.socket?(name) ? cur_names : dead_names) << name
    else
      cur_names << name
    end
  end
  set_names = listener_names(listeners)
  dead_names.concat(cur_names - set_names).uniq!

  LISTENERS.delete_if do |io|
    if dead_names.include?(sock_name(io))
      IO_PURGATORY.delete_if do |pio|
        pio.fileno == io.fileno && (pio.close rescue nil).nil? # true
      end
      (io.close rescue nil).nil? # true
    else
      set_server_sockopt(io, listener_opts[sock_name(io)])
      false
    end
  end

  (set_names - cur_names).each { |addr| listen(addr) }
end
sets the path for the PID file of the master process
# File lib/unicorn.rb, line 250
def pid=(path)
  if path
    if x = valid_pid?(path)
      return path if pid && path == pid && x == $$
      raise ArgumentError, "Already running on PID:#{x} " \
                           "(or pid=#{path} is stale)"
    end
  end
  unlink_pid_safe(pid) if pid

  if path
    fp = begin
      tmp = "#{File.dirname(path)}/#{rand}.#$$"
      File.open(tmp, File::RDWR|File::CREAT|File::EXCL, 0644)
    rescue Errno::EEXIST
      retry
    end
    fp.syswrite("#$$\n")
    File.rename(fp.path, path)
    fp.close
  end
  super(path)
end
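The temporary-file-plus-rename dance above is what makes the pid file update atomic; the same pattern in isolation (file names here are illustrative):

  # readers of "unicorn.pid" see either the old or the complete new
  # contents, never a partial write, because rename(2) is atomic
  tmp = "#{File.dirname("unicorn.pid")}/#{rand}.#$$"
  File.open(tmp, File::RDWR|File::CREAT|File::EXCL, 0644) do |fp|
    fp.syswrite("#$$\n")
  end
  File.rename(tmp, "unicorn.pid")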
Runs the thing. Returns self so you can run join on it
# File lib/unicorn.rb, line 170
def start
  BasicSocket.do_not_reverse_lookup = true

  # inherit sockets from parents, they need to be plain Socket objects
  # before they become UNIXServer or TCPServer
  inherited = ENV['UNICORN_FD'].to_s.split(/,/).map do |fd|
    io = Socket.for_fd(fd.to_i)
    set_server_sockopt(io, listener_opts[sock_name(io)])
    IO_PURGATORY << io
    logger.info "inherited addr=#{sock_name(io)} fd=#{fd}"
    server_cast(io)
  end

  config_listeners = config[:listeners].dup
  LISTENERS.replace(inherited)

  # we start out with generic Socket objects that get cast to either
  # TCPServer or UNIXServer objects; but since the Socket objects
  # share the same OS-level file descriptor as the higher-level *Server
  # objects; we need to prevent Socket objects from being garbage-collected
  config_listeners -= listener_names
  if config_listeners.empty? && LISTENERS.empty?
    config_listeners << Unicorn::Const::DEFAULT_LISTEN
    init_listeners << Unicorn::Const::DEFAULT_LISTEN
    START_CTX[:argv] << "-l#{Unicorn::Const::DEFAULT_LISTEN}"
  end
  config_listeners.each { |addr| listen(addr) }
  raise ArgumentError, "no listeners" if LISTENERS.empty?

  # this pipe is used to wake us up from select(2) in #join when signals
  # are trapped.  See trap_deferred.
  init_self_pipe!

  # setup signal handlers before writing pid file in case people get
  # trigger happy and send signals as soon as the pid file exists.
  # Note that signals don't actually get handled until the #join method
  QUEUE_SIGS.each { |sig| trap_deferred(sig) }
  trap(:CHLD) { |_| awaken_master }
  self.pid = config[:pid]

  self.master_pid = $$
  build_app! if preload_app
  maintain_worker_count
  self
end
# File lib/unicorn.rb, line 247
def stderr_path=(path); redirect_io($stderr, path); end
# File lib/unicorn.rb, line 246
def stdout_path=(path); redirect_io($stdout, path); end
Terminates all workers, but does not exit the master process.
# File lib/unicorn.rb, line 388
def stop(graceful = true)
  self.listeners = []
  limit = Time.now + timeout
  until WORKERS.empty? || Time.now > limit
    kill_each_worker(graceful ? :QUIT : :TERM)
    sleep(0.1)
    reap_all_workers
  end
  kill_each_worker(:KILL)
end
Private Instance Methods
# File lib/unicorn.rb, line 428
def awaken_master
  begin
    SELF_PIPE.last.write_nonblock('.') # wakeup master process from select
  rescue Errno::EAGAIN, Errno::EINTR
    # pipe is full, master should wake up anyways
    retry
  end
end
# File lib/unicorn.rb, line 709
def build_app!
  if app.respond_to?(:arity) && app.arity == 0
    # exploit COW in case of preload_app.  Also avoids race
    # conditions in Rainbows! since load/require are not thread-safe
    Unicorn.constants.each { |x| Unicorn.const_get(x) }

    if defined?(Gem) && Gem.respond_to?(:refresh)
      logger.info "Refreshing Gem list"
      Gem.refresh
    end
    self.app = app.call
  end
end
# File lib/unicorn.rb, line 733
def init_self_pipe!
  SELF_PIPE.each { |io| io.close rescue nil }
  SELF_PIPE.replace(IO.pipe)
  SELF_PIPE.each { |io| io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) }
end
Gets rid of stuff the worker has no business keeping track of, to free some resources and drop all signal handlers. Traps for USR1, USR2, and HUP may be set in the after_fork Proc by the user.
# File lib/unicorn.rb, line 558
def init_worker_process(worker)
  QUEUE_SIGS.each { |sig| trap(sig, nil) }
  trap(:CHLD, 'DEFAULT')
  SIG_QUEUE.clear
  proc_name "work[#{worker.nr}]"
  START_CTX.clear
  init_self_pipe!
  WORKERS.values.each { |other| other.tmp.close rescue nil }
  WORKERS.clear
  LISTENERS.each { |sock| sock.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) }
  worker.tmp.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
  after_fork.call(self, worker) # can drop perms
  worker.user(*user) if user.kind_of?(Array) && ! worker.switched
  self.timeout /= 2.0 # halve it for select()
  build_app! unless preload_app
end
delivers a signal to each worker
# File lib/unicorn.rb, line 659
def kill_each_worker(signal)
  WORKERS.keys.each { |wpid| kill_worker(signal, wpid) }
end
delivers a signal to a worker and fails gracefully if the worker is no longer running.
# File lib/unicorn.rb, line 650
def kill_worker(signal, wpid)
  begin
    Process.kill(signal, wpid)
  rescue Errno::ESRCH
    worker = WORKERS.delete(wpid) and worker.tmp.close rescue nil
  end
end
returns an array of string names for the given listener array
# File lib/unicorn.rb, line 705
def listener_names(listeners = LISTENERS)
  listeners.map { |io| sock_name(io) }
end
# File lib/unicorn.rb, line 685
def load_config!
  loaded_app = app
  begin
    logger.info "reloading config_file=#{config.config_file}"
    config[:listeners].replace(init_listeners)
    config.reload
    config.commit!(self)
    kill_each_worker(:QUIT)
    Unicorn::Util.reopen_logs
    self.app = orig_app
    build_app! if preload_app
    logger.info "done reloading config_file=#{config.config_file}"
  rescue StandardError, LoadError, SyntaxError => e
    logger.error "error reloading config_file=#{config.config_file}: " \
                 "#{e.class} #{e.message}"
    self.app = loaded_app
  end
end
# File lib/unicorn.rb, line 539
def maintain_worker_count
  (off = WORKERS.size - worker_processes) == 0 and return
  off < 0 and return spawn_missing_workers
  WORKERS.dup.each_pair { |wpid,w|
    w.nr >= worker_processes and kill_worker(:QUIT, wpid) rescue nil
  }
end
Waits for a signal handler to wake us up and then consumes the pipe. Wakes up every second anyway to run murder_lazy_workers.
# File lib/unicorn.rb, line 419
def master_sleep
  begin
    ready = IO.select([SELF_PIPE.first], nil, nil, 1) or return
    ready.first && ready.first.first or return
    loop { SELF_PIPE.first.read_nonblock(Const::CHUNK_SIZE) }
  rescue Errno::EAGAIN, Errno::EINTR
  end
end
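awaken_master and master_sleep together implement the self-pipe trick described under SELF_PIPE; a minimal standalone sketch of the idea, independent of Unicorn's internals:

  reader, writer = IO.pipe
  queue = []

  # the handler only records the signal and writes one byte; all real
  # work happens in the main loop, outside signal-handler context
  trap(:USR1) do
    queue << :USR1
    writer.write_nonblock('.') rescue nil   # EAGAIN just means "already woken"
  end

  loop do
    next unless IO.select([reader], nil, nil, 1)  # sleep until woken (or 1s)
    reader.read_nonblock(64) rescue nil           # drain the wakeup byte(s)
    while sig = queue.shift
      puts "handling #{sig}"
    end
  end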
Forcibly terminates all workers that have not checked in within timeout seconds. The timeout is implemented using an unlinked File shared between the parent process and each worker. The worker runs File#chmod to modify the ctime of the File. If the ctime is stale for more than timeout seconds, we kill the corresponding worker.
# File lib/unicorn.rb, line 514
def murder_lazy_workers
  WORKERS.dup.each_pair do |wpid, worker|
    stat = worker.tmp.stat
    # skip workers that disable fchmod or have never fchmod-ed
    stat.mode == 0100600 and next
    (diff = (Time.now - stat.ctime)) <= timeout and next
    logger.error "worker=#{worker.nr} PID:#{wpid} timeout " \
                 "(#{diff}s > #{timeout}s), killing"
    kill_worker(:KILL, wpid) # take no prisoners for timeout violations
  end
end
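The heartbeat contract between each worker and the master can be sketched in isolation (a hedged illustration; the real lifeline is the Worker's tmp handle created at fork time):

  require 'tempfile'

  lifeline = Tempfile.new('heartbeat')
  lifeline.unlink   # unlinked: disappears automatically with the processes

  # worker side: flip the mode bit around each request, which updates the
  # file's ctime as a cheap "I'm still making progress" signal
  m = 0
  lifeline.chmod(m = (m == 0 ? 1 : 0))

  # master side: a ctime older than the timeout means the worker is stuck
  timeout = 60
  stale = (Time.now - lifeline.stat.ctime) > timeout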
# File lib/unicorn.rb, line 723
def proc_name(tag)
  $0 = ([ File.basename(START_CTX[0]), tag
        ]).concat(START_CTX[:argv]).join(' ')
end
Once a client is accepted, it is processed in its entirety here in 3 easy steps: read request, call app, write app response.
# File lib/unicorn.rb, line 549
def process_client(client)
  client.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
  app.call(client)
end
reaps all unreaped workers
# File lib/unicorn.rb, line 438
def reap_all_workers
  begin
    loop do
      wpid, status = Process.waitpid2(-1, Process::WNOHANG)
      wpid or break
      if reexec_pid == wpid
        logger.error "reaped #{status.inspect} exec()-ed"
        self.reexec_pid = 0
        self.pid = pid.chomp('.oldbin') if pid
        proc_name 'master'
      else
        worker = WORKERS.delete(wpid) and worker.tmp.close rescue nil
        logger.info "reaped #{status.inspect} " \
                    "worker=#{worker.nr rescue 'unknown'}"
      end
    end
  rescue Errno::ECHILD
  end
end
# File lib/unicorn.rb, line 728
def redirect_io(io, path)
  File.open(path, 'ab') { |fp| io.reopen(fp) } if path
  io.sync = true
end
Reexecutes the START_CTX with a new binary.
# File lib/unicorn.rb, line 459
def reexec
  if reexec_pid > 0
    begin
      Process.kill(0, reexec_pid)
      logger.error "reexec-ed child already running PID:#{reexec_pid}"
      return
    rescue Errno::ESRCH
      self.reexec_pid = 0
    end
  end

  if pid
    old_pid = "#{pid}.oldbin"
    prev_pid = pid.dup
    begin
      self.pid = old_pid  # clear the path for a new pid file
    rescue ArgumentError
      logger.error "old PID:#{valid_pid?(old_pid)} running with " \
                   "existing pid=#{old_pid}, refusing rexec"
      return
    rescue => e
      logger.error "error writing pid=#{old_pid} #{e.class} #{e.message}"
      return
    end
  end

  self.reexec_pid = fork do
    listener_fds = LISTENERS.map { |sock| sock.fileno }
    ENV['UNICORN_FD'] = listener_fds.join(',')
    Dir.chdir(START_CTX[:cwd])
    cmd = [ START_CTX[0] ].concat(START_CTX[:argv])

    # avoid leaking FDs we don't know about, but let before_exec
    # unset FD_CLOEXEC, if anything else in the app eventually
    # relies on FD inheritence.
    (3..1024).each do |io|
      next if listener_fds.include?(io)
      io = IO.for_fd(io) rescue nil
      io or next
      IO_PURGATORY << io
      io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
    end
    logger.info "executing #{cmd.inspect} (in #{Dir.pwd})"
    before_exec.call(self)
    exec(*cmd)
  end
  proc_name 'master (old)'
end
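The USR2 + QUIT upgrade mentioned under START_CTX drives this method from the outside; a hedged sketch of the sequence (the pid file path and the sleep are illustrative):

  old_master = File.read("/path/to/unicorn.pid").to_i

  # USR2 makes the running master call reexec: it forks, re-execs the
  # binary recorded in START_CTX, and renames its own pid file to *.oldbin
  Process.kill(:USR2, old_master)

  sleep 5   # naive wait; real scripts should poll for the new pid file
  Process.kill(:QUIT, old_master)   # gracefully retire the old master tree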
# File lib/unicorn.rb, line 575
def reopen_worker_logs(worker_nr)
  logger.info "worker=#{worker_nr} reopening logs..."
  Unicorn::Util.reopen_logs
  logger.info "worker=#{worker_nr} done reopening logs"
  init_self_pipe!
end
# File lib/unicorn.rb, line 526
def spawn_missing_workers
  (0...worker_processes).each do |worker_nr|
    WORKERS.values.include?(worker_nr) and next
    worker = Worker.new(worker_nr, Unicorn::Util.tmpio)
    before_fork.call(self, worker)
    WORKERS[fork {
      ready_pipe.close if ready_pipe
      self.ready_pipe = nil
      worker_loop(worker)
    }] = worker
  end
end
Defers a signal for later processing in #join (master process).
# File lib/unicorn.rb, line 406
def trap_deferred(signal)
  trap(signal) do |sig_nr|
    if SIG_QUEUE.size < 5
      SIG_QUEUE << signal
      awaken_master
    else
      logger.error "ignoring SIG#{signal}, queue=#{SIG_QUEUE.inspect}"
    end
  end
end
Unlinks a PID file at the given path if it contains the current PID. This is still potentially racy without locking the directory (which is non-portable and may interact badly with other programs), but the window for hitting the race condition is small.
# File lib/unicorn.rb, line 667
def unlink_pid_safe(path)
  (File.read(path).to_i == $$ and File.unlink(path)) rescue nil
end
returns a PID if a given path contains a non-stale PID file, nil otherwise.
# File lib/unicorn.rb, line 673
def valid_pid?(path)
  wpid = File.read(path).to_i
  wpid <= 0 and return nil
  begin
    Process.kill(0, wpid)
    wpid
  rescue Errno::ESRCH
    # don't unlink stale pid files, racy without non-portable locking...
  end
rescue Errno::ENOENT
end
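The Process.kill(0, pid) call above is the standard liveness probe: signal 0 delivers nothing, it only checks that the PID exists and that we may signal it. In isolation (12345 is an arbitrary example PID):

  begin
    Process.kill(0, 12345)
    running = true
  rescue Errno::ESRCH   # no such process
    running = false
  rescue Errno::EPERM   # exists, but owned by another user
    running = true
  end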
Runs inside each forked worker. This sits around and waits for connections and doesn't die until the parent dies (or is given an INT, QUIT, or TERM signal).
# File lib/unicorn.rb, line 585
def worker_loop(worker)
  ppid = master_pid
  init_worker_process(worker)
  nr = 0 # this becomes negative if we need to reopen logs
  alive = worker.tmp # tmp is our lifeline to the master process
  ready = LISTENERS

  # closing anything we IO.select on will raise EBADF
  trap(:USR1) { nr = -65536; SELF_PIPE.first.close rescue nil }
  trap(:QUIT) { alive = nil; LISTENERS.each { |s| s.close rescue nil } }
  [:TERM, :INT].each { |sig| trap(sig) { exit!(0) } } # instant shutdown
  logger.info "worker=#{worker.nr} ready"
  m = 0

  begin
    nr < 0 and reopen_worker_logs(worker.nr)
    nr = 0

    # we're a goner in timeout seconds anyways if alive.chmod
    # breaks, so don't trap the exception.  Using fchmod() since
    # futimes() is not available in base Ruby and I very strongly
    # prefer temporary files to be unlinked for security,
    # performance and reliability reasons, so utime is out.  No-op
    # changes with chmod doesn't update ctime on all filesystems; so
    # we change our counter each and every time (after process_client
    # and before IO.select).
    alive.chmod(m = 0 == m ? 1 : 0)

    ready.each do |sock|
      begin
        process_client(sock.accept_nonblock)
        nr += 1
        alive.chmod(m = 0 == m ? 1 : 0)
      rescue Errno::EAGAIN, Errno::ECONNABORTED
      end
      break if nr < 0
    end

    # make the following bet: if we accepted clients this round,
    # we're probably reasonably busy, so avoid calling select()
    # and do a speculative accept_nonblock on ready listeners
    # before we sleep again in select().
    redo unless nr == 0 # (nr < 0) => reopen logs

    ppid == Process.ppid or return
    alive.chmod(m = 0 == m ? 1 : 0)
    begin
      # timeout used so we can detect parent death:
      ret = IO.select(LISTENERS, nil, SELF_PIPE, timeout) or redo
      ready = ret.first
    rescue Errno::EINTR
      ready = LISTENERS
    rescue Errno::EBADF
      nr < 0 or return
    end
  rescue => e
    if alive
      logger.error "Unhandled listen loop exception #{e.inspect}."
      logger.error e.backtrace.join("\n")
    end
  end while alive
end