class ParallelServer::Prefork::Child
Attributes
options[R]
Public Class Methods
new(sockets, opts, to_parent, from_parent)
click to toggle source
@param sockets [Array<Socket>] @param opts [Hash] @param to_parent [IO] @param from_parent [IO]
# File lib/parallel_server/prefork.rb, line 350 def initialize(sockets, opts, to_parent, from_parent) @sockets = sockets @options = opts @to_parent = to_parent @from_parent = from_parent @threads = {} @threads_mutex = Mutex.new @threads_cv = ConditionVariable.new @status = :run end
Public Instance Methods
max_idle()
click to toggle source
@return [Integer]
# File lib/parallel_server/prefork.rb, line 367 def max_idle @options[:max_idle] || DEFAULT_MAX_IDLE end
max_threads()
click to toggle source
@return [Integer]
# File lib/parallel_server/prefork.rb, line 362 def max_threads @options[:max_threads] || DEFAULT_MAX_THREADS end
max_use()
click to toggle source
@return [Integer]
# File lib/parallel_server/prefork.rb, line 372 def max_use @options[:max_use] || DEFAULT_MAX_USE end
start(block)
click to toggle source
@param block [#call] @return [void]
# File lib/parallel_server/prefork.rb, line 378 def start(block) queue = Queue.new accept_thread = Thread.new{ accept_loop(block, queue) } reload_thread = Thread.new{ reload_loop(queue) } # wait that accept_loop or reload_loop end queue.pop accept_thread.exit @sockets.each{|s| s.close rescue nil} @threads_mutex.synchronize do notify_status end wait_all_connections reload_thread.exit end
Private Instance Methods
accept()
click to toggle source
@return [Array<Socket, AddrInfo>] @return [nil]
# File lib/parallel_server/prefork.rb, line 518 def accept while true timeout = max_idle > 0 ? max_idle : nil readable, = IO.select(@sockets, nil, nil, timeout) return nil unless readable r, = readable begin sock, addr = r.accept_nonblock return [sock, addr] rescue IO::WaitReadable next end end end
accept_loop(block, queue)
click to toggle source
@param block [#call] @param queue [Queue] @return [void]
# File lib/parallel_server/prefork.rb, line 400 def accept_loop(block, queue) count = 0 while @status == :run wait_thread sock, addr = accept next if sock.nil? && count == 0 break unless sock thr = Thread.new(sock, addr){|s, a| run(s, a, block)} @threads_mutex.synchronize do @threads[thr] = addr end count += 1 break if max_use > 0 && count >= max_use end rescue => e STDERR.puts e.inspect, e.backtrace.inspect raise e ensure @status = :stop queue.push true end
connected(addr)
click to toggle source
@param addr [Addrinfo] @return [void]
# File lib/parallel_server/prefork.rb, line 474 def connected(addr) @threads_mutex.synchronize do @threads[Thread.current] = addr notify_status end end
disconnect()
click to toggle source
@return [void]
# File lib/parallel_server/prefork.rb, line 482 def disconnect @threads_mutex.synchronize do @threads.delete Thread.current notify_status @threads_cv.signal end end
notify_status()
click to toggle source
@return [void]
# File lib/parallel_server/prefork.rb, line 491 def notify_status connections = Hash[@threads.map{|thr, adr| [thr.object_id, adr]}] status = { status: @status, connections: connections, } Conversation.send(@to_parent, status) rescue Errno::EPIPE # ignore end
reload_loop(queue)
click to toggle source
@param queue [Queue] @return [void]
# File lib/parallel_server/prefork.rb, line 432 def reload_loop(queue) heartbeat_interval = 5 while true time = Time.now if IO.select([@from_parent], nil, nil, heartbeat_interval) heartbeat_interval -= Time.now - time heartbeat_interval = 0 if heartbeat_interval < 0 data = Conversation.recv(@from_parent) break if data.nil? or data[:detach] @options.update data[:options] if data[:options] @options[:on_reload].call @options if @options[:on_reload] @threads_cv.signal else heartbeat_interval = 5 @threads_mutex.synchronize do Conversation.send(@to_parent, {}) end end end @from_parent.close @from_parent = nil rescue => e STDERR.puts e.inspect, e.backtrace.inspect raise e ensure @status = :stop queue.push true end
run(sock, addr, block)
click to toggle source
@param sock [Socket] @param addr [AddrInfo] @param block [#call] @return [void]
# File lib/parallel_server/prefork.rb, line 506 def run(sock, addr, block) connected(addr) block.call(sock, addr, self) rescue Exception => e STDERR.puts e.inspect, e.backtrace.inspect ensure sock.close rescue nil disconnect end
wait_all_connections()
click to toggle source
@return [void]
# File lib/parallel_server/prefork.rb, line 423 def wait_all_connections @threads.keys.each do |thr| thr.join rescue nil end @status = :exit end
wait_thread()
click to toggle source
@return [void]
# File lib/parallel_server/prefork.rb, line 462 def wait_thread @threads_mutex.synchronize do while true @threads.select!{|thr,| thr.alive?} break if @threads.size < max_threads @threads_cv.wait(@threads_mutex) end end end