class ParallelServer::Prefork::Child

Attributes

options[R]

Public Class Methods

new(sockets, opts, to_parent, from_parent) click to toggle source

@param sockets [Array<Socket>] @param opts [Hash] @param to_parent [IO] @param from_parent [IO]

# File lib/parallel_server/prefork.rb, line 350
# Stores the collaborators handed over by the caller and sets up the
# bookkeeping used to track worker threads.
#
# @param sockets [Array<Socket>] listening sockets
# @param opts [Hash] option hash (read through max_idle / max_threads / max_use)
# @param to_parent [IO] stream used to send messages to the parent
# @param from_parent [IO] stream used to receive messages from the parent
def initialize(sockets, opts, to_parent, from_parent)
  @sockets, @options = sockets, opts
  @to_parent, @from_parent = to_parent, from_parent

  # worker thread => peer address, guarded by @threads_mutex
  @threads_mutex = Mutex.new
  @threads_cv = ConditionVariable.new
  @threads = {}

  @status = :run
end

Public Instance Methods

max_idle() click to toggle source

@return [Integer]

# File lib/parallel_server/prefork.rb, line 367
# Returns the :max_idle option, or DEFAULT_MAX_IDLE when the option is
# absent (or otherwise falsy).
#
# @return [Integer]
def max_idle
  configured = @options[:max_idle]
  return configured if configured

  DEFAULT_MAX_IDLE
end
max_threads() click to toggle source

@return [Integer]

# File lib/parallel_server/prefork.rb, line 362
# Returns the :max_threads option, or DEFAULT_MAX_THREADS when the option
# is absent (or otherwise falsy).
#
# @return [Integer]
def max_threads
  configured = @options[:max_threads]
  return configured if configured

  DEFAULT_MAX_THREADS
end
max_use() click to toggle source

@return [Integer]

# File lib/parallel_server/prefork.rb, line 372
# Returns the :max_use option, or DEFAULT_MAX_USE when the option is
# absent (or otherwise falsy).
#
# @return [Integer]
def max_use
  configured = @options[:max_use]
  return configured if configured

  DEFAULT_MAX_USE
end
start(block) click to toggle source

@param block [#call] @return [void]

# File lib/parallel_server/prefork.rb, line 378
# Runs the accept loop and the reload loop on their own threads, then
# shuts down once either of them finishes.
#
# @param block [#call] handler passed through to accept_loop
# @return [void]
def start(block)
  done = Queue.new
  accept_thread = Thread.new { accept_loop(block, done) }
  reload_thread = Thread.new { reload_loop(done) }

  # Both loops push onto the queue when they end, so this blocks until
  # the first of them is finished.
  done.pop

  accept_thread.kill

  # Closing the listening sockets is best-effort: errors are ignored.
  @sockets.each do |listener|
    begin
      listener.close
    rescue StandardError
      nil
    end
  end

  @threads_mutex.synchronize { notify_status }

  # Let in-flight connections finish before stopping the reload loop.
  wait_all_connections
  reload_thread.kill
end

Private Instance Methods

accept() click to toggle source

@return [Array(Socket, Addrinfo)] @return [nil]

# File lib/parallel_server/prefork.rb, line 518
# Waits for one of the listening sockets to become readable and accepts
# a connection from it without blocking.
#
# @return [Array] two-element array of the accepted socket and its address
# @return [nil] when IO.select times out (only possible when max_idle > 0)
def accept
  loop do
    # Re-read max_idle on every pass; a value <= 0 means wait forever.
    idle_limit = max_idle
    wait_for = idle_limit > 0 ? idle_limit : nil

    ready = IO.select(@sockets, nil, nil, wait_for)
    return nil unless ready

    # Only the first readable listener is tried per pass.
    listener = ready.first.first
    begin
      conn, peer = listener.accept_nonblock
      return [conn, peer]
    rescue IO::WaitReadable
      # Nothing to accept after all; go back to waiting.
      next
    end
  end
end
accept_loop(block, queue) click to toggle source

@param block [#call] @param queue [Queue] @return [void]

# File lib/parallel_server/prefork.rb, line 400
# Accepts connections while @status is :run, handing each one to a new
# thread that executes #run. Always marks the child as stopped and pushes
# onto +queue+ when it ends.
#
# @param block [#call] handler forwarded to #run
# @param queue [Queue] receives +true+ when the loop ends
# @return [void]
def accept_loop(block, queue)
  served = 0
  while @status == :run
    wait_thread
    sock, addr = accept
    unless sock
      # No connection arrived: keep waiting if nothing has been served
      # yet, otherwise end the loop.
      next if sock.nil? && served.zero?
      break
    end

    worker = Thread.new(sock, addr) { |s, a| run(s, a, block) }
    @threads_mutex.synchronize { @threads[worker] = addr }

    served += 1
    use_limit = max_use
    break if use_limit > 0 && served >= use_limit
  end
rescue => error
  STDERR.puts error.inspect, error.backtrace.inspect
  raise
ensure
  @status = :stop
  queue.push true
end
connected(addr) click to toggle source

@param addr [Addrinfo] @return [void]

# File lib/parallel_server/prefork.rb, line 474
# Records the calling thread as handling +addr+ and reports the updated
# state through notify_status, all under @threads_mutex.
#
# @param addr [Addrinfo]
# @return [void]
def connected(addr)
  me = Thread.current
  @threads_mutex.synchronize do
    @threads.store(me, addr)
    notify_status
  end
end
disconnect() click to toggle source

@return [void]

# File lib/parallel_server/prefork.rb, line 482
# Removes the calling thread from the connection table, reports the new
# state, and signals @threads_cv (which wait_thread waits on).
#
# @return [void]
def disconnect
  me = Thread.current
  @threads_mutex.synchronize do
    @threads.delete(me)
    notify_status
    @threads_cv.signal
  end
end
notify_status() click to toggle source

@return [void]

# File lib/parallel_server/prefork.rb, line 491
# Sends the current status and the table of connections (keyed by the
# worker thread's object_id) to the parent. A broken pipe is ignored.
#
# @return [void]
def notify_status
  by_thread_id = @threads.each_with_object({}) do |(thr, adr), table|
    table[thr.object_id] = adr
  end
  report = { status: @status, connections: by_thread_id }
  Conversation.send(@to_parent, report)
rescue Errno::EPIPE
  # ignore
end
reload_loop(queue) click to toggle source

@param queue [Queue] @return [void]

# File lib/parallel_server/prefork.rb, line 432
# Listens for messages from the parent and sends a heartbeat (an empty
# message) to the parent whenever the heartbeat period passes without
# the countdown being reset.
#
# A received message ends the loop when it is nil or has a truthy
# :detach entry. Otherwise its :options entry (if any) is merged into
# @options, the :on_reload callback (if any) is invoked with @options,
# and @threads_cv is signalled.
#
# On every exit path @status becomes :stop and +true+ is pushed onto
# +queue+. @from_parent is closed only when the loop ends normally.
#
# @param queue [Queue] receives +true+ when the loop ends
# @return [void]
def reload_loop(queue)
  heartbeat_period = 5
  remaining = heartbeat_period
  while true
    # Use the monotonic clock so that wall-clock adjustments cannot
    # stretch or shrink the heartbeat countdown.
    started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    if IO.select([@from_parent], nil, nil, remaining)
      # A message arrived early: only wait out the rest of the period
      # before the next heartbeat.
      elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
      remaining -= elapsed
      remaining = 0 if remaining < 0

      data = Conversation.recv(@from_parent)
      break if data.nil? || data[:detach]
      @options.update data[:options] if data[:options]
      @options[:on_reload].call @options if @options[:on_reload]
      @threads_cv.signal
    else
      remaining = heartbeat_period
      # Other writers to @to_parent in this class (via notify_status) run
      # under @threads_mutex; presumably the lock keeps messages from
      # interleaving.
      @threads_mutex.synchronize do
        Conversation.send(@to_parent, {})
      end
    end
  end
  @from_parent.close
  @from_parent = nil
rescue => e
  STDERR.puts e.inspect, e.backtrace.inspect
  raise
ensure
  @status = :stop
  queue.push true
end
run(sock, addr, block) click to toggle source

@param sock [Socket] @param addr [Addrinfo] @param block [#call] @return [void]

# File lib/parallel_server/prefork.rb, line 506
# Body of a worker thread: registers the connection, invokes the handler
# with the socket, the address and this object, then always closes the
# socket and unregisters. Any exception is logged to STDERR and not
# propagated.
#
# @param sock [Socket]
# @param addr [Addrinfo]
# @param block [#call]
# @return [void]
def run(sock, addr, block)
  connected(addr)
  block.call(sock, addr, self)
rescue Exception => error
  STDERR.puts(error.inspect, error.backtrace.inspect)
ensure
  # Closing is best-effort; a failure must not prevent disconnect.
  begin
    sock.close
  rescue StandardError
    nil
  end
  disconnect
end
wait_all_connections() click to toggle source

@return [void]

# File lib/parallel_server/prefork.rb, line 423
# Joins every thread currently present in the connection table, ignoring
# errors raised by the join, and then marks the child as :exit.
#
# @return [void]
def wait_all_connections
  # Work from a snapshot of the keys taken before joining.
  workers = @threads.keys
  workers.each do |worker|
    begin
      worker.join
    rescue StandardError
      nil
    end
  end
  @status = :exit
end
wait_thread() click to toggle source

@return [void]

# File lib/parallel_server/prefork.rb, line 462
# Blocks until the number of live worker threads is below max_threads.
# Dead threads are pruned from the table on every pass; while the table
# is full, waits on @threads_cv.
#
# @return [void]
def wait_thread
  @threads_mutex.synchronize do
    loop do
      @threads.keep_if { |worker, _addr| worker.alive? }
      break if @threads.size < max_threads

      @threads_cv.wait(@threads_mutex)
    end
  end
end