class ParallelServer::Prefork
Constants
- DEFAULT_MAX_IDLE
- DEFAULT_MAX_PROCESSES
- DEFAULT_MAX_THREADS
- DEFAULT_MAX_USE
- DEFAULT_MIN_PROCESSES
- DEFAULT_STANDBY_THREADS
- DEFAULT_WATCHDOG_SIGNAL
- DEFAULT_WATCHDOG_TIMER
Attributes
Public Class Methods
@overload initialize(host=nil, port, opts={})
@param host [String] hostname or IP address @param port [Integer / String] port number / service name @macro args
@overload initialize(socket, opts={})
@param socket [Socket] listening socket @macro args
@overload initialize(sockets, opts={})
@param sockets [Array<Socket>] listening sockets @macro args
# File lib/parallel_server/prefork.rb, line 44
# Prepares the master-side bookkeeping for a prefork server. The arguments are
# only parsed and stored here; sockets are created and children are forked
# later, from #start.
#
# @overload initialize(host=nil, port, opts={})
#   @param host [String] hostname or IP address
#   @param port [Integer / String] port number / service name
# @overload initialize(socket, opts={})
#   @param socket [Socket] listening socket
# @overload initialize(sockets, opts={})
#   @param sockets [Array<Socket>] listening sockets
def initialize(*args)
  # Per-child tables, all keyed by the read end of the child's status pipe.
  @from_child = {}    # IO(r) => pid
  @to_child = {}      # IO(r) => IO(w)
  @child_status = {}  # IO(r) => Hash
  @children = []      # pid

  @loop = true
  # Becomes true only when #start / #do_reload open the sockets themselves.
  @sockets_created = false

  @sockets, @host, @port, @opts = parse_args(*args)
  set_variables_from_opts
end
Public Instance Methods
@return [void]
# File lib/parallel_server/prefork.rb, line 107
# Sends a `detach: true` message to every child, then keeps polling the
# children until none of them reports status :run any more or about five
# seconds have passed, whichever comes first.
#
# @return [void]
def detach_children
  deadline = Time.now + 5
  talk_to_children(detach: true)
  until Time.now >= deadline || @child_status.values.none? { |st| st[:status] == :run }
    watch_children
  end
end
@overload reload(host=nil, port, opts={})
@param host [String] hostname or IP address @param port [Integer / String] port number / service name @macro args
@overload reload(socket, opts={})
@param socket [Socket] listening socket @macro args
@overload reload(sockets, opts={})
@param sockets [Array<Socket>] listening sockets @macro args
@return [void]
# File lib/parallel_server/prefork.rb, line 91
# Requests a reconfiguration. The arguments (same forms as #initialize) are
# parsed immediately and parked in @reload_args; the loop in #start applies
# them on its next pass by calling #do_reload.
#
# @overload reload(host=nil, port, opts={})
# @overload reload(socket, opts={})
# @overload reload(sockets, opts={})
# @return [void]
def reload(*args)
  parsed = parse_args(*args)
  @reload_args = parsed
end
@return [void] @yield [sock, addr, child] @yieldparam sock [Socket] @yieldparam addr [Addrinfo] @yieldparam child [ParallelServer::Prefork::Child]
# File lib/parallel_server/prefork.rb, line 60
# Runs the master loop until @loop becomes false (see #stop and #stop!).
# Each pass applies a pending reload, polls the children and adjusts the
# number of child processes. The given block is stored and handed to every
# child started afterwards.
#
# @yield [sock, addr, child]
# @yieldparam sock [Socket]
# @yieldparam addr [Addrinfo]
# @yieldparam child [ParallelServer::Prefork::Child]
# @raise [RuntimeError] when no block is given
# @return [void]
def start(&block)
  raise 'block required' unless block

  @block = block
  if !@sockets
    # No socket was supplied by the caller, so open (and later close) our own.
    @sockets = create_server_socket(@host, @port, @listen_backlog)
    @sockets_created = true
  end

  @reload_args = nil
  while @loop
    do_reload if @reload_args
    watch_children
    adjust_children
  end
ensure
  # Only sockets this object opened itself are closed here.
  if @sockets && @sockets_created
    @sockets.each { |sock| sock.close rescue nil }
  end
  @to_child.values.each { |writer| writer.close rescue nil }
  @to_child.clear
  begin
    # Give the children one second to exit; past that, keep reaping them in a
    # background thread so the caller is not blocked.
    Timeout.timeout(1) { wait_all_children }
  rescue StandardError
    Thread.new { wait_all_children }
  end
end
@return [void]
# File lib/parallel_server/prefork.rb, line 96
# Asks the loop in #start to finish: it exits once it next checks @loop.
# No signal is sent to the children here (compare #stop!).
#
# @return [void]
def stop
  @loop = false
end
@return [void]
# File lib/parallel_server/prefork.rb, line 101
# Sends TERM to every known child and asks the loop in #start to finish.
#
# Each pid is signalled on its own. Process.kill with several pids raises at
# the first pid it cannot signal (for example one that has already exited),
# which would leave the remaining children without a TERM.
#
# @return [void]
def stop!
  Array(@children).each do |pid|
    begin
      Process.kill 'TERM', pid
    rescue StandardError
      # Best effort: a child that is already gone must not block the others.
      nil
    end
  end
  @loop = false
end
Private Instance Methods
@return [void]
# File lib/parallel_server/prefork.rb, line 251
# Starts child processes in two steps:
# 1. enough to reach @min_processes running children;
# 2. enough to serve the current connections plus @standby_threads spare
#    ones, never exceeding @max_processes running children.
# Nothing is started when the computed count is zero or negative.
#
# @return [void]
def adjust_children
  shortfall = @min_processes - available_children
  shortfall.times { start_child }

  capacity, connections = current_capacity_and_connections
  wanted_connections = connections + @standby_threads
  # Missing capacity divided by @max_threads, rounded up (for Integer operands).
  extra_processes = (wanted_connections - capacity + @max_threads - 1) / @max_threads
  headroom = @max_processes - available_children
  [extra_processes, headroom].min.times { start_child }
end
@return [Integer]
# File lib/parallel_server/prefork.rb, line 273
# Counts the children whose last known status is :run.
#
# @return [Integer]
def available_children
  @child_status.count { |_reader, st| st[:status] == :run }
end
@param host [String] hostname or IP address @param port [Integer / String] port number / service name @param backlog [Integer / nil] listen backlog @return [Array<Socket>] listening sockets
# File lib/parallel_server/prefork.rb, line 145
# Opens listening TCP sockets for host/port. While the address is still in
# use the attempt is repeated every 0.1 s; once five seconds have passed the
# Errno::EADDRINUSE is re-raised.
#
# @param host [String] hostname or IP address
# @param port [Integer / String] port number / service name
# @param backlog [Integer / nil] listen backlog; when nil, listen is not called again
# @return [Array<Socket>] listening sockets
# @raise [Errno::EADDRINUSE] when the address stays busy past the deadline
def create_server_socket(host, port, backlog)
  deadline = Time.now + 5
  begin
    listeners = Socket.tcp_server_sockets(host, port)
  rescue Errno::EADDRINUSE
    raise if Time.now > deadline
    sleep 0.1
    retry
  end

  if backlog
    listeners.each { |sock| sock.listen(backlog) }
  end
  listeners
end
Returns the current capacity and the current number of connections. @return [Array<Integer, Integer>] capacity, connections
# File lib/parallel_server/prefork.rb, line 265
# Computes two figures from the last status reported by each child:
# capacity (children in status :run times @max_threads) and the total number
# of entries in the children's :connections collections.
#
# @return [Array<Integer, Integer>] capacity, connections
def current_capacity_and_connections
  statuses = @child_status.values
  running = statuses.count { |st| st[:status] == :run }
  connections = statuses.inject(0) { |total, st| total + st[:connections].count }
  [running * @max_threads, connections]
end
@return [void]
# File lib/parallel_server/prefork.rb, line 118
# Applies the arguments parked by #reload.
#
# * Options are replaced and the derived instance variables refreshed.
# * When the listening endpoint changed (host/port if this object opened the
#   sockets itself, otherwise the socket list), the current children are
#   detached and the new sockets are adopted or created.
# * When only the listen backlog changed, it is re-applied to sockets this
#   object opened itself.
# * Finally the new options are sent to the children.
#
# @return [void]
def do_reload
  new_sockets, new_host, new_port, @opts = @reload_args
  @reload_args = nil
  previous_backlog = @listen_backlog
  set_variables_from_opts

  endpoint_changed =
    if @sockets_created
      @host != new_host || @port != new_port
    else
      @sockets != new_sockets
    end

  if endpoint_changed
    @sockets.each { |sock| sock.close rescue nil } if @sockets_created
    detach_children
    @sockets = new_sockets
    @host = new_host
    @port = new_port
    if @sockets
      # Sockets supplied by the caller are not ours to close.
      @sockets_created = false
    else
      @sockets = create_server_socket(@host, @port, @listen_backlog)
      @sockets_created = true
    end
  elsif @listen_backlog != previous_backlog
    if @listen_backlog && @sockets_created
      @sockets.each { |sock| sock.listen(@listen_backlog) }
    end
  end

  reload_children
end
@param values [Array] @param timeout [Numeric] @yield [obj] @yieldparam obj [Object] one of values
# File lib/parallel_server/prefork.rb, line 178
# Yields each value from a worker thread so that one slow block call cannot
# hold up the rest. The caller waits at most `timeout` seconds for a worker;
# after that the worker is flagged to stop once its current call returns and
# a new worker takes over the values that are still pending. The caller's
# array is not modified (a copy is consumed). A nil or false value ends the
# current worker's run.
#
# @param values [Array]
# @param timeout [Numeric] seconds to wait for each worker thread
# @yield [obj]
# @yieldparam obj [Object] one of values
def each_nonblock(values, timeout)
  pending = values.dup
  until pending.empty?
    worker = Thread.new do
      while !pending.empty? && !Thread.current[:exit]
        item = pending.shift
        break unless item
        yield item
      end
    end
    worker.join(timeout)
    # Tell a worker that outlived the timeout not to pick up further values.
    worker[:exit] = true
  end
end
@return [void]
# File lib/parallel_server/prefork.rb, line 278
# Watchdog for children that stopped reporting. A child whose last report is
# older than @watchdog_timer seconds is sent @watchdog_signal once; a child
# silent for 60 seconds beyond that is sent KILL. Errors raised while
# signalling are ignored.
#
# NOTE(review): :signal_sent is only ever set here, never cleared in this
# method — confirm whether a child that recovers should be signalled again.
#
# @return [void]
def kill_frozen_children
  now = Time.now
  @child_status.each do |reader, st|
    soft_limit = st[:time] + @watchdog_timer
    if now > soft_limit + 60
      Process.kill 'KILL', @from_child[reader] rescue nil
    elsif now > soft_limit && !st[:signal_sent]
      Process.kill @watchdog_signal, @from_child[reader] rescue nil
      st[:signal_sent] = true
    end
  end
end
@overload parse_args(host=nil, port, opts={})
@param host [String] hostname or IP address @param port [Integer / String] port number / service name @macro args
@overload parse_args(socket, opts={})
@param socket [Socket] listening socket @macro args
@overload parse_args(sockets, opts={})
@param sockets [Array<Socket>] listening sockets @macro args
@return [Array<(Array<Socket>, String, Integer / String, Hash)>] sockets, hostname, port, options.
Either sockets or port (with an optional hostname) is set; the others are nil.
# File lib/parallel_server/prefork.rb, line 205
# Normalizes the argument forms accepted by #initialize and #reload.
# A trailing Hash is taken as the options. Of what remains, a single Integer
# or String is a port, any other single value is a socket or a list of
# sockets, and two values are host and port.
#
# @overload parse_args(host=nil, port, opts={})
# @overload parse_args(socket, opts={})
# @overload parse_args(sockets, opts={})
# @return [Array] sockets, host, port, opts — the parts that do not apply are nil
# @raise [ArgumentError] when the number of arguments fits none of the forms
def parse_args(*args)
  given = args.size
  opts = args.last.is_a?(Hash) ? args.pop : {}
  sockets = host = port = nil

  case args.size
  when 1
    target = args.first
    case target
    when Integer, String
      port = target
    else
      # Accept both a single socket and an array of sockets.
      sockets = [target].flatten
    end
  when 2
    host, port = args
  else
    raise ArgumentError, "wrong number of arguments (#{given} for 1..3)"
  end

  [sockets, host, port, opts]
end
@return [void]
# File lib/parallel_server/prefork.rb, line 159
# Sends the current options to every child. Option values that Marshal
# cannot dump are left out of the message.
#
# @return [void]
def reload_children
  dumpable = @opts.select do |_key, value|
    begin
      Marshal.dump(value)
    rescue StandardError
      nil
    end
  end
  talk_to_children(options: dumpable)
end
# File lib/parallel_server/prefork.rb, line 329
# Copies the settings in @opts into instance variables. Process, thread and
# watchdog settings fall back to the DEFAULT_* constants when the option is
# missing (or nil/false); :listen_backlog and the three callbacks have no
# default and stay nil when absent.
#
# @return [void]
def set_variables_from_opts
  opts = @opts

  # Pool sizing
  @min_processes   = opts[:min_processes]   || DEFAULT_MIN_PROCESSES
  @max_processes   = opts[:max_processes]   || DEFAULT_MAX_PROCESSES
  @max_threads     = opts[:max_threads]     || DEFAULT_MAX_THREADS
  @standby_threads = opts[:standby_threads] || DEFAULT_STANDBY_THREADS

  # Listening socket
  @listen_backlog = opts[:listen_backlog]

  # Watchdog
  @watchdog_timer  = opts[:watchdog_timer]  || DEFAULT_WATCHDOG_TIMER
  @watchdog_signal = opts[:watchdog_signal] || DEFAULT_WATCHDOG_SIGNAL

  # Callbacks
  @on_start       = opts[:on_start]
  @on_child_start = opts[:on_child_start]
  @on_child_exit  = opts[:on_child_exit]
end
@return [void]
# File lib/parallel_server/prefork.rb, line 307
# Forks one child process and registers it.
#
# Two pipes connect parent and child: one written by the child and read by
# the parent, one written by the parent and read by the child. The parent
# keys all of its per-child tables by the read end of the first pipe.
#
# @return [void]
def start_child
  status_r, status_w = IO.pipe   # child -> parent
  command_r, command_w = IO.pipe # parent -> child

  pid = fork do
    # The child has no use for the parent's pipes to the other children.
    @from_child.each_key { |io| io.close rescue nil }
    @to_child.each_value { |io| io.close rescue nil }
    status_r.close
    command_w.close
    @on_start.call if @on_start
    Child.new(@sockets, @opts, status_w, command_r).start(@block)
    # exit! leaves without running at_exit handlers.
    exit! true
  end

  # The parent keeps only its own ends of the two pipes.
  status_w.close
  command_r.close

  @from_child[status_r] = pid
  @to_child[status_r] = command_w
  @child_status[status_r] = {status: :run, connections: {}, time: Time.now}
  @children.push pid
  @on_child_start.call(pid) if @on_child_start
end
@param data [Hash] message for the children; it is marshaled before being sent @return [void]
# File lib/parallel_server/prefork.rb, line 167
# Marshals data once and sends it to every child through its command pipe.
# Sending goes through #each_nonblock with a one-second timeout, and errors
# from an individual send are ignored.
#
# @param data [Object] message to send; callers in this class pass a Hash
# @return [void]
def talk_to_children(data)
  payload = Marshal.dump(data)
  each_nonblock(@to_child.values, 1) do |writer|
    begin
      Conversation._send(writer, payload)
    rescue StandardError
      nil
    end
  end
end
@return [void]
# File lib/parallel_server/prefork.rb, line 300
# Keeps polling the children until @children is empty.
#
# @return [void]
def wait_all_children
  watch_children until @children.empty?
end
@return [void]
# File lib/parallel_server/prefork.rb, line 291
# Reaps exited children without blocking and removes their pids from
# @children. For each reaped child @on_child_exit, when set, is called with
# the pid and its Process::Status.
#
# A pid that is not (or is no longer) a waitable child makes
# Process.waitpid2 raise Errno::ECHILD. Such a pid is dropped from the list
# instead of letting the error escape; otherwise it would stay in @children
# and every later call would raise again. No callback is made for it because
# no exit status is available.
#
# @return [void]
def wait_children
  @children.delete_if do |pid|
    begin
      # With WNOHANG the result is nil while the child is still running.
      reaped, status = Process.waitpid2(pid, Process::WNOHANG)
    rescue Errno::ECHILD
      next true
    end
    @on_child_exit.call(pid, status) if reaped && @on_child_exit
    reaped
  end
end
@return [void]
# File lib/parallel_server/prefork.rb, line 227
# One polling step over the children:
# 1. wait up to 0.1 s for status pipes to become readable;
# 2. merge each received report into the child's status and stamp it with
#    the current time, or, when nothing could be received, forget the child
#    and close both of its pipes;
# 3. run the watchdog;
# 4. reap exited processes when the pid list and the status table differ in
#    size.
#
# The report is merged with Hash#update, so keys absent from the report keep
# their previous value (presumably the report is a Hash — confirm against
# Conversation.recv).
#
# @return [void]
def watch_children
  readers = @from_child.keys
  # With no pipes to watch, IO.select is given nil and just waits 0.1 s.
  readers = nil if readers.empty?
  ready, = IO.select(readers, nil, nil, 0.1)

  (ready || []).each do |reader|
    report = Conversation.recv(reader)
    if report
      report[:time] = Time.now
      @child_status[reader].update(report)
    else
      @from_child.delete(reader)
      @to_child[reader].close rescue nil
      @to_child.delete(reader)
      @child_status.delete(reader)
      reader.close
    end
  end

  kill_frozen_children
  wait_children if @children.size != @child_status.size
end