class PhusionPassenger::RequestHandler
Constants
Attributes
A password with which clients must authenticate. Default is unauthenticated.
A hash containing all server sockets that this request handler listens on. The hash is in the form of:
{ name1 => [socket_address1, socket_type1, socket1], name2 => [socket_address2, socket_type2, socket2], ... }
name
is a Symbol. socket_addressx
is the address
of the socket, socket_typex
is the socket's type (either
'unix' or 'tcp') and socketx
is the actual
socket IO object. There's guaranteed to be at
least one server socket, namely one with the name :main
.
If a soft termination signal was received, then the main loop will quit the given amount of seconds after the last time a connection was accepted. Defaults to 3 seconds.
Public Class Methods
Create a new RequestHandler with the
given owner pipe. owner_pipe
must be the readable part of a
pipe IO object.
Additionally, the following options may be given:
-
detach_key
-
pool_account_username
-
pool_account_password_base64
# File lib/phusion_passenger/request_handler.rb, line 87
# Builds a request handler bound to +owner_pipe+.
#
# owner_pipe:: stored in @owner_pipe; the main loop watches it to learn when
#              to terminate.
# options::    string-keyed option hash. "app_group_name" is passed to
#              require_option (presumably mandatory -- confirm against that
#              helper). "thread_handler" overrides the ThreadHandler class and
#              "pool_account_password_base64" is base64-decoded into
#              @pool_account_password.
#
# Two server sockets are created and registered in @server_sockets: :main
# (Unix or TCP, depending on should_use_unix_sockets?) and :http (always TCP).
def initialize(owner_pipe, options = {})
  require_option(options, "app_group_name")
  install_options_as_ivars(self, options,
    "app",
    "app_group_name",
    "connect_password",
    "detach_key",
    "analytics_logger",
    "pool_account_username"
  )

  @thread_handler = options["thread_handler"] || ThreadHandler
  @concurrency = 1
  encoded_password = options["pool_account_password_base64"]
  if encoded_password
    # unpack('m') decodes base64.
    @pool_account_password = encoded_password.unpack('m').first
  end

  #############
  #############

  @server_sockets = {}

  @main_socket_address, @main_socket =
    if should_use_unix_sockets?
      create_unix_socket_on_filesystem
    else
      create_tcp_socket
    end
  @server_sockets[:main] = {
    :address     => @main_socket_address,
    :socket      => @main_socket,
    :protocol    => :session,
    :concurrency => @concurrency
  }

  @http_socket_address, @http_socket = create_tcp_socket
  @server_sockets[:http] = {
    :address     => @http_socket_address,
    :socket      => @http_socket,
    :protocol    => :http,
    :concurrency => 1
  }

  @owner_pipe = owner_pipe
  @options = options

  # Signal handling state.
  @previous_signal_handlers = {}

  # Main loop state.
  @main_loop_generation = 0
  @main_loop_thread_lock = Mutex.new
  @main_loop_thread_cond = ConditionVariable.new
  @main_loop_running = false

  # Worker thread bookkeeping.
  @threads = []
  @threads_mutex = Mutex.new

  @soft_termination_linger_time = 3

  #############
end
Public Instance Methods
Clean up temporary stuff created by the request handler.
If the main loop was started by main_loop, then this method may only be called after the main loop has exited.
If the main loop was started by start_main_loop_thread, then this method may be called at any time, and it will stop the main loop thread.
# File lib/phusion_passenger/request_handler.rb, line 149
# Releases the resources held by this request handler.
#
# If a main loop thread exists, the write end of the graceful termination
# pipe is closed first and the thread is joined. Afterwards every registered
# server socket is closed, the socket file of each "unix:" address is
# removed, and the owner pipe is closed.
#
# All steps are best effort: I/O failures during cleanup are ignored.
def cleanup
  if @main_loop_thread
    @main_loop_thread_lock.synchronize do
      @graceful_termination_pipe[1].close rescue nil
    end
    @main_loop_thread.join
  end

  @server_sockets.each_value do |info|
    # Each entry is a Hash with :address and :socket keys. Destructuring it
    # like an array yields nil for the socket, so nothing would be closed.
    socket  = info[:socket]
    address = info[:address].to_s

    begin
      socket.close if socket && !socket.closed?
    rescue SystemCallError, IOError
      # Best effort: the socket may already be unusable.
    end

    if address =~ /\Aunix:/
      begin
        File.unlink(address.sub(/\Aunix:/, ''))
      rescue SystemCallError
        # Best effort: the socket file may already be gone.
      end
    end
  end

  @owner_pipe.close rescue nil
end
Enter the request handler's main loop.
# File lib/phusion_passenger/request_handler.rb, line 174
# Runs the request handler's main loop in the calling thread.
#
# Sequence: reset signal handlers, create the graceful termination pipe,
# mark the loop as running (bumping @main_loop_generation and waking anyone
# waiting on @main_loop_thread_cond), install signal handlers, start the
# worker threads, block until termination is requested, wait for the workers
# to become idle and finally terminate them.
#
# EOFError and Interrupt end the loop quietly. A SignalException is swallowed
# only when its message is the hard or soft termination signal; any other
# exception is logged and re-raised. The ensure clause always restores the
# previous signal handlers, closes the pipe and marks the loop as stopped.
def main_loop
  debug("Entering request handler main loop")
  reset_signal_handlers
  begin
    @graceful_termination_pipe = IO.pipe
    @graceful_termination_pipe.each { |io| io.close_on_exec! }

    @main_loop_thread_lock.synchronize do
      @main_loop_generation += 1
      @main_loop_running = true
      @main_loop_thread_cond.broadcast

      @select_timeout = nil

      @selectable_sockets = []
      # NOTE(review): @server_sockets values are Hashes (see initialize), so
      # value[2] looks like it always yields nil and no server socket ends up
      # in this list -- confirm whether value[:socket] was intended.
      @server_sockets.each_value do |value|
        candidate = value[2]
        @selectable_sockets << candidate if candidate
      end
      @selectable_sockets << @owner_pipe
      @selectable_sockets << @graceful_termination_pipe[0]
    end

    install_useful_signal_handlers
    start_threads
    wait_until_termination_requested
    wait_until_all_threads_are_idle
    terminate_threads
    debug("Request handler main loop exited normally")

  rescue EOFError
    # Exit main loop.
    trace(2, "Request handler main loop interrupted by EOFError exception")
  rescue Interrupt
    # Exit main loop.
    trace(2, "Request handler main loop interrupted by Interrupt exception")
  rescue SignalException => signal
    trace(2, "Request handler main loop interrupted by SignalException")
    # Only the two termination signals are treated as a normal exit.
    termination_signals = [HARD_TERMINATION_SIGNAL, SOFT_TERMINATION_SIGNAL]
    raise unless termination_signals.include?(signal.message)
  rescue Exception => e
    trace(2, "Request handler main loop interrupted by #{e.class} exception")
    raise
  ensure
    debug("Exiting request handler main loop")
    revert_signal_handlers
    @main_loop_thread_lock.synchronize do
      # Write end first, then read end; failures are ignored.
      [1, 0].each do |index|
        @graceful_termination_pipe[index].close rescue nil
      end
      @selectable_sockets = []
      @main_loop_generation += 1
      @main_loop_running = false
      @main_loop_thread_cond.broadcast
    end
  end
end
Check whether the main loop is currently running.
# File lib/phusion_passenger/request_handler.rb, line 167
# Returns the value of @main_loop_running, read while holding
# @main_loop_thread_lock.
def main_loop_running?
  @main_loop_thread_lock.synchronize { @main_loop_running }
end
Remove this request handler from the application pool so that no new connections will come in. Then make the main loop quit a few seconds after the last time a connection came in. All of this is to ensure that no connections come in while we're shutting down.
May only be called while the main loop is running. May be called from any thread.
# File lib/phusion_passenger/request_handler.rb, line 258
# Starts the soft termination sequence in a background thread, at most once.
#
# The thread optionally detaches this process through a MessageClient (only
# when a detach key, pool account username and password are all set), waits
# until all worker threads are idle, sleeps for
# @soft_termination_linger_time seconds and then closes the write end of the
# graceful termination pipe.
#
# Returns the linger thread; repeated calls return the same thread.
def soft_shutdown
  return @soft_termination_linger_thread if @soft_termination_linger_thread

  @soft_termination_linger_thread = Thread.new do
    debug("Soft termination initiated")

    detach_credentials = [@detach_key, @pool_account_username, @pool_account_password]
    if detach_credentials.all?
      client = MessageClient.new(@pool_account_username, @pool_account_password)
      begin
        client.pool_detach_process_by_key(@detach_key)
      ensure
        client.close
      end
    end

    wait_until_all_threads_are_idle
    debug("Soft terminating in #{@soft_termination_linger_time} seconds")
    sleep @soft_termination_linger_time
    # Closing the write end is best effort; errors are ignored.
    @graceful_termination_pipe[1].close rescue nil
  end
end
Start the main loop in a new thread. This thread will be stopped by cleanup.
# File lib/phusion_passenger/request_handler.rb, line 235
# Runs main_loop in a new thread (stored in @main_loop_thread) and blocks
# until @main_loop_generation has changed from the value it had on entry.
#
# Any exception escaping main_loop inside the thread is passed to
# print_exception instead of being propagated.
def start_main_loop_thread
  generation_at_start = @main_loop_generation

  @main_loop_thread = Thread.new do
    begin
      main_loop
    rescue Exception => e
      print_exception(self.class, e)
    end
  end

  @main_loop_thread_lock.synchronize do
    until @main_loop_generation != generation_at_start
      @main_loop_thread_cond.wait(@main_loop_thread_lock)
    end
  end
end
Private Instance Methods
# File lib/phusion_passenger/request_handler.rb, line 334
# Opens a listening TCP server socket on 127.0.0.1 with port 0.
#
# Returns [address, socket], where address has the form
# "tcp://127.0.0.1:<port>".
def create_tcp_socket
  # Binding to the literal "127.0.0.1" forces TCPv4 instead of TCPv6.
  server = TCPServer.new('127.0.0.1', 0)
  server.listen(BACKLOG_SIZE)
  server.close_on_exec!
  port = server.addr[1]
  ["tcp://127.0.0.1:#{port}", server]
end
# File lib/phusion_passenger/request_handler.rb, line 313
# Creates a listening Unix domain server socket at a randomly named path
# under "#{passenger_tmpdir}/backends" and restricts the socket file to
# mode 0600.
#
# The path is truncated to (UNIX_PATH_MAX - 10) characters, using
# NativeSupport::UNIX_PATH_MAX when NativeSupport is defined and 100
# otherwise. On Errno::EADDRINUSE the whole attempt is repeated with a new
# random name.
#
# Returns ["unix:<path>", socket].
def create_unix_socket_on_filesystem
  begin
    path_limit = defined?(NativeSupport) ? NativeSupport::UNIX_PATH_MAX : 100
    candidate = "#{passenger_tmpdir}/backends/ruby.#{generate_random_id(:base64)}"
    path = candidate.slice(0, path_limit - 10)

    server = UNIXServer.new(path)
    server.listen(BACKLOG_SIZE)
    server.close_on_exec!
    File.chmod(0600, path)
    return ["unix:#{path}", server]
  rescue Errno::EADDRINUSE
    # Name already taken; try again with another name.
    retry
  end
end
# File lib/phusion_passenger/request_handler.rb, line 362
# Installs the handlers used while the main loop runs. A handler is only
# installed when its signal appears in Signal.list_trappable:
#
# * SOFT_TERMINATION_SIGNAL calls soft_shutdown; a StandardError raised by it
#   is reported through print_exception.
# * ABRT and QUIT call print_status_report.
def install_useful_signal_handlers
  trappable = Signal.list_trappable

  # The lookup table is keyed without the "SIG" prefix.
  soft_signal_key = SOFT_TERMINATION_SIGNAL.sub(/^SIG/, '')
  if trappable.has_key?(soft_signal_key)
    trap(SOFT_TERMINATION_SIGNAL) do
      begin
        soft_shutdown
      rescue => e
        print_exception("Passenger RequestHandler soft shutdown routine", e)
      end
    end
  end

  if trappable.has_key?('ABRT')
    trap('ABRT') { print_status_report }
  end

  if trappable.has_key?('QUIT')
    trap('QUIT') { print_status_report }
  end
end
# File lib/phusion_passenger/request_handler.rb, line 387
# Emits two warnings: the report returned by Utils.global_backtrace_report,
# followed by the inspected contents of @threads.
def print_status_report
  backtrace_report = Utils.global_backtrace_report
  warn(backtrace_report)

  thread_summary = "Threads: #{@threads.inspect}"
  warn(thread_summary)
end
Reset signal handlers to their default handler, and install some special handlers for a few signals. The previous signal handlers will be put back by calling revert_signal_handlers.
# File lib/phusion_passenger/request_handler.rb, line 347
# Sets every signal listed by Signal.list_trappable to DEFAULT, remembering
# each handler that was not already DEFAULT in @previous_signal_handlers so
# that revert_signal_handlers can restore it. Afterwards HUP is set to
# IGNORE and the :after_installing_signal_handlers event is fired.
def reset_signal_handlers
  Signal.list_trappable.each_key do |signal_name|
    begin
      replaced = trap(signal_name, DEFAULT)
      @previous_signal_handlers[signal_name] = replaced unless replaced == DEFAULT
    rescue ArgumentError
      # Signal cannot be trapped; ignore it.
    end
  end

  trap('HUP', IGNORE)
  PhusionPassenger.call_event(:after_installing_signal_handlers)
end
# File lib/phusion_passenger/request_handler.rb, line 381
# Re-installs every handler recorded in @previous_signal_handlers
# (signal name => handler). Returns that hash.
def revert_signal_handlers
  @previous_signal_handlers.each { |signal_name, saved_handler| trap(signal_name, saved_handler) }
end
# File lib/phusion_passenger/request_handler.rb, line 277
# Decides whether the main server socket should be a Unix domain socket.
# Returns false on JRuby and true on every other Ruby engine.
def should_use_unix_sockets?
  # Historical note:
  # MacOS X Leopard appeared to have a bug with Unix server socket file
  # descriptors that are passed to another process: clients connecting to
  # such a socket could wrongly conclude that the client socket is closed,
  # i.e. recv()/read() could return 0 before EOF was actually reached.
  #
  # The ApplicationPool infrastructure used to connect to a backend
  # process's Unix socket in the helper server process and then pass the
  # connection file descriptor to the web server, which triggered that
  # kernel bug. The workaround back then was to use TCP sockets, which
  # showed the fake-EOF problem far less often.
  #
  # That problem no longer applies: the web server passes all I/O through
  # the HelperAgent. This method is kept so that the workaround can be
  # reactivated easily, or in case TCP sockets are needed for another reason.
  #return RUBY_PLATFORM !~ /darwin/

  engine = defined?(RUBY_ENGINE) ? RUBY_ENGINE : "ruby"
  # Unix domain sockets on JRuby are still buggy as of version 1.7.0: they
  # can cause inexplicable freezes when combined with threading.
  engine != "jruby"
end
# File lib/phusion_passenger/request_handler.rb, line 392
# Starts @concurrency worker threads on the main socket plus one thread on
# the HTTP socket, registers them in @threads, and blocks until each of
# them has reported an initialization state.
#
# Every thread instantiates @thread_handler with this request handler and
# the socket options, calls #install and then #main_loop, handing the latter
# a callback that records the thread as initialized.
def start_threads
  shared_options = {
    :app              => @app,
    :app_group_name   => @app_group_name,
    :connect_password => @connect_password,
    :analytics_logger => @analytics_logger
  }
  main_socket_options = shared_options.merge(
    :server_socket => @main_socket,
    :socket_name   => "main socket",
    :protocol      => :session
  )
  http_socket_options = shared_options.merge(
    :server_socket => @http_socket,
    :socket_name   => "HTTP socket",
    :protocol      => :http
  )

  # Tracks which threads have reported in. A thread without an entry is
  # still initializing. The callback handed to the handler's main loop
  # records true; the ensure clause below records false when the thread
  # leaves its handler code, for whatever reason.
  state_mutex = Mutex.new
  state_cond = ConditionVariable.new
  state_by_thread = {}
  record_state = lambda do |value|
    state_mutex.synchronize do
      state_by_thread[Thread.current] = value
      state_cond.signal
    end
  end
  record_initialized = lambda { record_state.call(true) }

  handler_class = @thread_handler

  # Spawns one handler thread with the given display name and options.
  spawn_worker = lambda do |thread_name, handler_options|
    Thread.new(thread_name, handler_options) do |name, socket_options|
      Thread.current.abort_on_exception = true
      begin
        Thread.current[:name] = name
        handler = handler_class.new(self, socket_options)
        handler.install
        handler.main_loop(record_initialized)
      ensure
        record_state.call(false)
        unregister_current_thread
      end
    end
  end

  # Actually start all the threads.
  expected_nthreads = 0
  @threads_mutex.synchronize do
    @concurrency.times do |index|
      @threads << spawn_worker.call("Worker #{index + 1}", main_socket_options)
      expected_nthreads += 1
    end

    @threads << spawn_worker.call("HTTP helper worker", http_socket_options)
    expected_nthreads += 1
  end

  # Wait until all threads have finished starting.
  state_mutex.synchronize do
    until state_by_thread.size == expected_nthreads
      state_cond.wait(state_mutex)
    end
  end
end
# File lib/phusion_passenger/request_handler.rb, line 560
# Raises ThreadHandler::Interrupted in every thread registered in @threads
# and then joins each of them. The thread list is copied under
# @threads_mutex first, so the raise/join phase runs without the lock.
def terminate_threads
  debug("Stopping all threads")
  snapshot = @threads_mutex.synchronize { @threads.dup }
  snapshot.each { |worker| worker.raise(ThreadHandler::Interrupted.new) }
  snapshot.each { |worker| worker.join }
  debug("All threads stopped")
end
# File lib/phusion_passenger/request_handler.rb, line 474
# Removes the calling thread from @threads while holding @threads_mutex.
# Returns the removed thread, or nil when it was not registered.
def unregister_current_thread
  @threads_mutex.synchronize { @threads.delete(Thread.current) }
end
# File lib/phusion_passenger/request_handler.rb, line 574
# Blocks until every registered worker thread has been idle for a while.
#
# "Idle" means that, for 100 ms in a row, every thread handler reports
# itself as interruptable and none of their iteration counters changes.
# The handlers are polled every 10 ms. If the number of registered threads
# changes while waiting, the algorithm restarts with a fresh snapshot.
# Returns immediately when no threads are registered.
def wait_until_all_threads_are_idle
  debug("Waiting until all threads have become idle...")

  done = false

  while !done
    handlers = @threads_mutex.synchronize do
      @threads.map { |thr| thr[:passenger_thread_handler] }
    end
    debug("There are currently #{handlers.size} threads")
    if handlers.empty?
      # There are no threads, so we're done.
      done = true
      break
    end

    # Record initial state.
    handlers.each { |h| h.stats_mutex.lock }
    iterations = handlers.map { |h| h.iteration }
    handlers.each { |h| h.stats_mutex.unlock }
    start_time = Time.now

    sleep 0.01
    while true
      if handlers.size != @threads_mutex.synchronize { @threads.size }
        debug("The number of threads changed. Restarting waiting algorithm")
        break
      end

      # Record current state.
      handlers.each { |h| h.stats_mutex.lock }
      all_interruptable = handlers.all? { |h| h.interruptable }
      new_iterations = handlers.map { |h| h.iteration }
      handlers.each { |h| h.stats_mutex.unlock }

      # Are all threads interruptable and has there been no activity
      # since last time we checked?
      if all_interruptable && new_iterations == iterations
        # Yes. If enough time has passed then we're done.
        if Time.now >= start_time + 0.1
          done = true
          break
        end
      else
        # No. We reset the timer and check again later.
        iterations = new_iterations
        start_time = Time.now
      end

      # Sleep on both paths. Without a pause in the "idle but not long
      # enough" case, this loop would spin and keep taking every stats
      # mutex until the 100 ms window had elapsed.
      sleep 0.01
    end
  end

  debug("All threads are now idle")
end
# File lib/phusion_passenger/request_handler.rb, line 480
# Blocks until either the owner pipe or the read end of the graceful
# termination pipe becomes readable, then traces which one it was.
#
# On JRuby the owner pipe is not selected on directly; a helper thread reads
# from it and signals through an internal pipe instead. That thread and the
# internal pipe are always cleaned up before returning.
def wait_until_termination_requested
  engine = defined?(RUBY_ENGINE) ? RUBY_ENGINE : "ruby"

  if engine == "jruby"
    # On JRuby, selecting on an input TTY always returns, so
    # we use threads to do the job.
    notify_reader, notify_writer = IO.pipe
    watcher = Thread.new do
      Thread.current.abort_on_exception = true
      Thread.current[:name] = "Owner pipe waiter"
      begin
        @owner_pipe.read(1)
      ensure
        notify_writer.write('x')
      end
    end

    begin
      readable = select([notify_reader, @graceful_termination_pipe[0]]).first
      reason = readable.include?(notify_reader) ? "Owner pipe closed" : "Graceful termination pipe closed"
      trace(2, reason)
    ensure
      watcher.kill
      watcher.join
      notify_reader.close unless notify_reader.closed?
      notify_writer.close unless notify_writer.closed?
    end
  else
    readable = select([@owner_pipe, @graceful_termination_pipe[0]]).first
    reason = readable.include?(@owner_pipe) ? "Owner pipe closed" : "Graceful termination pipe closed"
    trace(2, reason)
  end
end
# File lib/phusion_passenger/request_handler.rb, line 518
# Wakes up the worker threads by connecting to the sockets they listen on.
#
# For the main socket, one connection is made per worker (@concurrency
# times). If the main socket is a Unix domain socket whose file no longer
# exists, connecting is impossible and a RuntimeError is raised in every
# registered thread instead. The HTTP socket is always woken up with a
# single connection.
#
# Returns the array of helper threads that perform the connections, so the
# caller can join them.
def wakeup_all_threads
  threads = []
  main_address = @server_sockets[:main][:address]

  if get_socket_address_type(main_address) == :unix &&
     !File.exist?(main_address.sub(/^unix:/, ''))
    # It looks like someone deleted the Unix domain socket we listen on.
    # This makes it impossible to wake up the worker threads gracefully,
    # so we hard kill them.
    warn("Unix domain socket gone; force aborting all threads")
    @threads_mutex.synchronize do
      @threads.each do |thread|
        thread.raise(RuntimeError.new("Force abort"))
      end
    end
  else
    @concurrency.times do
      waker = Thread.new(main_address) do |address|
        # Use the per-thread flag, like every other thread in this class.
        # Assigning Thread.abort_on_exception would change the setting for
        # the whole process.
        Thread.current.abort_on_exception = true
        begin
          debug("Shutting down worker thread by connecting to #{address}")
          connect_to_server(address).close
        rescue Errno::ECONNREFUSED
          debug("Worker thread listening on #{address} already exited")
        rescue SystemCallError, IOError => e
          debug("Error shutting down worker thread (#{address}): #{e} (#{e.class})")
        end
      end
      threads << waker
    end
  end

  http_waker = Thread.new(@server_sockets[:http][:address]) do |address|
    Thread.current.abort_on_exception = true
    begin
      debug("Shutting down HTTP thread by connecting to #{address}")
      connect_to_server(address).close
    rescue Errno::ECONNREFUSED
      debug("Worker thread listening on #{address} already exited")
    rescue SystemCallError, IOError => e
      debug("Error shutting down HTTP thread (#{address}): #{e} (#{e.class})")
    end
  end
  threads << http_waker

  return threads
end