class ParallelServer::Prefork

Constants

DEFAULT_MAX_IDLE
DEFAULT_MAX_PROCESSES
DEFAULT_MAX_THREADS
DEFAULT_MAX_USE
DEFAULT_MIN_PROCESSES
DEFAULT_STANDBY_THREADS
DEFAULT_WATCHDOG_SIGNAL
DEFAULT_WATCHDOG_TIMER

Attributes

child_status[R]

Public Class Methods

new(*args) click to toggle source

@overload initialize(host=nil, port, opts={})

@param host [String] hostname or IP address
@param port [Integer / String] port number / service name
@macro args

@overload initialize(socket, opts={})

@param socket [Socket] listening socket
@macro args

@overload initialize(sockets, opts={})

@param sockets [Array<Socket>] listening sockets
@macro args
# File lib/parallel_server/prefork.rb, line 44
# Sets up a prefork server from either an address (host/port) or one or more
# already-listening sockets, followed by an optional options Hash.
# No socket is opened and no child is forked here.
#
# @param args [Array] arguments accepted by parse_args
def initialize(*args)
  parsed = parse_args(*args)
  @sockets, @host, @port, @opts = parsed
  set_variables_from_opts

  # Bookkeeping tables, all keyed by the read end of the child's status pipe.
  @from_child   = {}   # IO(r) => pid
  @to_child     = {}   # IO(r) => IO(w)
  @child_status = {}   # IO(r) => Hash

  # Pids of children that have not been reaped yet.
  @children = []

  # Main-loop flag; cleared by stop / stop!.
  @loop = true
  # True only when this object opened @sockets itself.
  @sockets_created = false
end

Public Instance Methods

detach_children() click to toggle source

@return [void]

# File lib/parallel_server/prefork.rb, line 107
# Sends `{detach: true}` to every child, then keeps polling child status
# until no child reports the :run status or 5 seconds have elapsed.
#
# @return [void]
def detach_children
  deadline = Time.now + 5
  talk_to_children(detach: true)
  watch_children while Time.now < deadline &&
                       @child_status.each_value.any? { |status| status[:status] == :run }
end
reload(*args) click to toggle source

@overload reload(host=nil, port, opts={})

@param host [String] hostname or IP address
@param port [Integer / String] port number / service name
@macro args

@overload reload(socket, opts={})

@param socket [Socket] listening socket
@macro args

@overload reload(sockets, opts={})

@param sockets [Array<Socket>] listening sockets
@macro args

@return [void]

# File lib/parallel_server/prefork.rb, line 91
# Records new listen arguments/options for a later reload.
# Nothing is applied here: the arguments are parsed and stored in
# @reload_args, and the loop in #start calls do_reload when it sees them.
#
# @param args [Array] same forms as accepted by parse_args
# @raise [ArgumentError] when parse_args rejects the argument count
def reload(*args)
  @reload_args = parse_args(*args)
end
start(&block) click to toggle source

@return [void]
@yield [sock, addr, child]
@yieldparam sock [Socket]
@yieldparam addr [Addrinfo]
@yieldparam child [ParallelServer::Prefork::Child]

# File lib/parallel_server/prefork.rb, line 60
# Runs the parent loop: applies pending reloads, polls children and adjusts
# the number of children until @loop becomes false.
#
# On exit (normal or by exception) it closes the listening sockets it
# created itself, closes every pipe to the children, and waits for the
# children: for up to 1 second inline, otherwise in a background thread.
#
# @yield block stored in @block and handed to each child in start_child
# @raise [RuntimeError] 'block required' when no block is given
# @return [void]
def start(&block)
  raise 'block required' unless block
  @block = block

  # Open listening sockets only when none were supplied by the caller.
  unless @sockets
    @sockets = create_server_socket(@host, @port, @listen_backlog)
    @sockets_created = true
  end

  # Discard any reload requested before the loop started.
  @reload_args = nil
  while @loop
    do_reload if @reload_args
    watch_children
    adjust_children
  end
ensure
  if @sockets && @sockets_created
    @sockets.each do |listener|
      begin
        listener.close
      rescue StandardError
        nil
      end
    end
  end

  @to_child.each_value do |writer|
    begin
      writer.close
    rescue StandardError
      nil
    end
  end
  @to_child.clear

  begin
    Timeout.timeout(1) { wait_all_children }
  rescue StandardError
    # Includes Timeout::Error: keep reaping without blocking the caller.
    Thread.new { wait_all_children }
  end
end
stop() click to toggle source

@return [void]

# File lib/parallel_server/prefork.rb, line 96
# Requests a stop by clearing the @loop flag; the `while @loop` loop in
# #start ends once its current iteration finishes. Children are not
# signalled here.
#
# @return [void]
def stop
  @loop = false
end
stop!() click to toggle source

@return [void]

# File lib/parallel_server/prefork.rb, line 101
# Sends TERM to every known child and then requests the main loop to stop.
#
# Each pid is signalled on its own. A single multi-pid Process.kill call
# raises at the first pid that cannot be signalled (for example a child
# that has already exited), which would leave all later children without
# the signal.
#
# @return [void]
def stop!
  Array(@children).each do |pid|
    begin
      Process.kill 'TERM', pid
    rescue SystemCallError
      # Best effort: the child is already gone (ESRCH) or cannot be
      # signalled (EPERM); carry on with the remaining children.
      nil
    end
  end
  @loop = false
end

Private Instance Methods

adjust_children() click to toggle source

@return [void]

# File lib/parallel_server/prefork.rb, line 251
# Starts children in two steps:
# 1. enough to reach @min_processes running children;
# 2. enough extra children to cover current connections plus
#    @standby_threads, limited so the running count stays within
#    @max_processes.
# Children are never stopped here.
#
# @return [void]
def adjust_children
  shortfall = @min_processes - available_children
  shortfall.times { start_child }

  capacity, connections = current_capacity_and_connections
  wanted = connections + @standby_threads
  # Ceiling division: processes needed to serve the threads still missing.
  extra_needed = (wanted - capacity + @max_threads - 1) / @max_threads
  headroom = @max_processes - available_children
  # A negative count makes Integer#times a no-op.
  [extra_needed, headroom].min.times { start_child }
end
available_children() click to toggle source

@return [Integer]

# File lib/parallel_server/prefork.rb, line 273
# Counts the children whose last reported status is :run.
#
# @return [Integer]
def available_children
  @child_status.each_value.count { |status| status[:status] == :run }
end
create_server_socket(host, port, backlog) click to toggle source

@param host [String] hostname or IP address
@param port [Integer / String] port number / service name
@param backlog [Integer / nil] listen backlog
@return [Array<Socket>] listening sockets

# File lib/parallel_server/prefork.rb, line 145
# Opens listening TCP sockets for host/port. While the address is in use
# the attempt is repeated every 0.1 seconds; once 5 seconds have passed the
# Errno::EADDRINUSE error is re-raised.
#
# @param host [String] hostname or IP address
# @param port [Integer / String] port number / service name
# @param backlog [Integer / nil] listen backlog; applied only when given
# @return [Array<Socket>] listening sockets
# @raise [Errno::EADDRINUSE] when the address is still busy after the deadline
def create_server_socket(host, port, backlog)
  deadline = Time.now + 5
  listeners =
    begin
      Socket.tcp_server_sockets(host, port)
    rescue Errno::EADDRINUSE
      raise if Time.now > deadline
      sleep 0.1
      retry
    end

  if backlog
    listeners.each { |listener| listener.listen(backlog) }
  end
  listeners
end
current_capacity_and_connections() click to toggle source

Current capacity and current number of connections.
@return [Array<Integer, Integer>] capacity, connections

# File lib/parallel_server/prefork.rb, line 265
# Computes the current thread capacity and the current connection count.
# Capacity is (children with status :run) * @max_threads; connections are
# summed over every tracked child, whatever its status.
#
# @return [Array<Integer, Integer>] capacity, connections
def current_capacity_and_connections
  statuses = @child_status.values
  running = statuses.count { |status| status[:status] == :run }
  connections = statuses.inject(0) { |total, status| total + status[:connections].count }
  [running * @max_threads, connections]
end
do_reload() click to toggle source

@return [void]

# File lib/parallel_server/prefork.rb, line 118
# Applies the arguments stored by #reload.
#
# The options are replaced first. If the listener changed (host/port differ
# when this object created the sockets, or the socket list differs when the
# sockets were supplied) the self-created sockets are closed, the children
# are detached and the new listener is installed, creating sockets when
# none were passed. If only the listen backlog changed, it is re-applied to
# self-created sockets. Finally the new options are sent to the children.
#
# @return [void]
def do_reload
  new_sockets, new_host, new_port, @opts = @reload_args
  @reload_args = nil
  previous_backlog = @listen_backlog
  set_variables_from_opts

  listener_changed =
    if @sockets_created
      @host != new_host || @port != new_port
    else
      @sockets != new_sockets
    end

  if listener_changed
    # Only sockets opened by this object are ours to close.
    if @sockets_created
      @sockets.each do |listener|
        begin
          listener.close
        rescue StandardError
          nil
        end
      end
    end
    detach_children
    @sockets = new_sockets
    @host = new_host
    @port = new_port
    if @sockets
      @sockets_created = false
    else
      @sockets = create_server_socket(@host, @port, @listen_backlog)
      @sockets_created = true
    end
  elsif @listen_backlog != previous_backlog
    if @listen_backlog && @sockets_created
      @sockets.each { |listener| listener.listen(@listen_backlog) }
    end
  end

  reload_children
end
each_nonblock(values, timeout) { |value| ... } click to toggle source

@param values [Array]
@param timeout [Numeric]
@yield [obj]
@yieldparam obj [Object] one of values

# File lib/parallel_server/prefork.rb, line 178
# Yields each element of values from a worker thread so that one slow
# element cannot stall the caller for more than about timeout seconds.
#
# A worker takes elements off a private copy of values one by one. If it has
# not finished within timeout, it is flagged to stop after the element it is
# working on and a new worker takes over the remaining elements. The
# caller's array is not modified. A nil/false element ends the current
# worker's run.
#
# @param values [Array]
# @param timeout [Numeric] seconds to wait for each worker
# @yield [obj]
# @yieldparam obj [Object] one of values
def each_nonblock(values, timeout)
  pending = values.dup
  until pending.empty?
    worker = Thread.new do
      while !Thread.current[:exit] && (item = pending.shift)
        yield item
      end
    end
    worker.join(timeout)
    worker[:exit] = true
  end
end
kill_frozen_children() click to toggle source

@return [void]

# File lib/parallel_server/prefork.rb, line 278
# Watchdog for children that stopped reporting.
#
# A child whose last status time is older than @watchdog_timer seconds is
# sent @watchdog_signal once (tracked with :signal_sent). Once it is more
# than 60 seconds past that limit it is sent KILL on every call. Signal
# delivery errors are ignored.
#
# @return [void]
def kill_frozen_children
  current = Time.now
  @child_status.each do |reader, status|
    soft_limit = status[:time] + @watchdog_timer
    if current > soft_limit + 60
      begin
        Process.kill 'KILL', @from_child[reader]
      rescue StandardError
        nil
      end
    elsif current > soft_limit && !status[:signal_sent]
      begin
        Process.kill @watchdog_signal, @from_child[reader]
      rescue StandardError
        nil
      end
      status[:signal_sent] = true
    end
  end
end
parse_args(*args) click to toggle source

@overload parse_args(host=nil, port, opts={})

@param host [String] hostname or IP address
@param port [Integer / String] port number / service name
@macro args

@overload parse_args(socket, opts={})

@param socket [Socket] listening socket
@macro args

@overload parse_args(sockets, opts={})

@param sockets [Array<Socket>] listening sockets
@macro args

@return [Array<Array<Socket>, String, Integer / String, Hash>] sockets, hostname, port, options.

Either sockets or port (optionally with hostname) is set; the others are nil.
# File lib/parallel_server/prefork.rb, line 205
# Splits the constructor/reload arguments into sockets, host, port, options.
#
# A trailing Hash is taken as the options. Of what remains:
# * one Integer or String is a port (host stays nil);
# * one other object is a socket or (possibly nested) list of sockets;
# * two values are host and port.
#
# @param args [Array]
# @return [Array] sockets, host, port, opts; the unused ones are nil
# @raise [ArgumentError] when the number of arguments fits none of the forms
def parse_args(*args)
  given = args.size
  opts = args.last.is_a?(Hash) ? args.pop : {}
  sockets = host = port = nil

  case args.size
  when 1
    target = args.first
    case target
    when Integer, String
      port = target
    else
      sockets = [target].flatten
    end
  when 2
    host, port = args
  else
    raise ArgumentError, "wrong number of arguments (#{given} for 1..3)"
  end

  return sockets, host, port, opts
end
reload_children() click to toggle source

@return [void]

# File lib/parallel_server/prefork.rb, line 159
# Sends the current options to every child as `{options: ...}`.
# Options whose value cannot be marshalled (Marshal.dump raises) are left
# out of the message.
#
# @return [void]
def reload_children
  sendable = @opts.select do |_key, value|
    begin
      Marshal.dump(value)
    rescue StandardError
      nil
    end
  end
  talk_to_children({options: sendable})
end
set_variables_from_opts() click to toggle source
# File lib/parallel_server/prefork.rb, line 329
# Copies the tunables out of @opts into instance variables. The matching
# DEFAULT_* constant is used when an option is missing (or nil/false).
# :listen_backlog and the three callbacks have no default.
#
# @return [void]
def set_variables_from_opts
  opts = @opts

  # Process / thread pool sizing
  @min_processes   = opts[:min_processes]   || DEFAULT_MIN_PROCESSES
  @max_processes   = opts[:max_processes]   || DEFAULT_MAX_PROCESSES
  @max_threads     = opts[:max_threads]     || DEFAULT_MAX_THREADS
  @standby_threads = opts[:standby_threads] || DEFAULT_STANDBY_THREADS

  # Listening socket
  @listen_backlog = opts[:listen_backlog]

  # Watchdog used by kill_frozen_children
  @watchdog_timer  = opts[:watchdog_timer]  || DEFAULT_WATCHDOG_TIMER
  @watchdog_signal = opts[:watchdog_signal] || DEFAULT_WATCHDOG_SIGNAL

  # Optional callbacks
  @on_start       = opts[:on_start]
  @on_child_start = opts[:on_child_start]
  @on_child_exit  = opts[:on_child_exit]
end
start_child() click to toggle source

@return [void]

# File lib/parallel_server/prefork.rb, line 307
# Forks one child and registers it.
#
# Two pipes are created: one the child writes its status to, and one the
# parent writes commands to. In the child, the pipe ends belonging to the
# parent and to other children are closed, @on_start is called if set, and
# a Child is run with the listening sockets and @block; the child process
# then exits via exit!. In the parent, the child's pipe ends are closed,
# the child is recorded in the bookkeeping tables with status :run, and
# @on_child_start is called with the pid if set.
#
# @return [void]
def start_child
  status_r, status_w = IO.pipe
  command_r, command_w = IO.pipe

  pid = fork do
    # Pipes inherited from the parent that talk to other children.
    @from_child.each_key { |io| io.close rescue nil }
    @to_child.each_value { |io| io.close rescue nil }
    # Parent-side ends of this child's own pipes.
    status_r.close
    command_w.close
    @on_start.call if @on_start
    Child.new(@sockets, @opts, status_w, command_r).start(@block)
    exit! true
  end

  # Child-side ends are not needed in the parent.
  status_w.close
  command_r.close

  @from_child[status_r] = pid
  @to_child[status_r] = command_w
  @child_status[status_r] = {status: :run, connections: {}, time: Time.now}
  @children.push pid
  @on_child_start.call(pid) if @on_child_start
end
talk_to_children(data) click to toggle source

@param data [Hash] message that is marshalled and sent to every child
@return [void]

# File lib/parallel_server/prefork.rb, line 167
# Marshals data once and sends it to every child over its command pipe.
# Delivery goes through each_nonblock with a 1 second timeout, and a send
# that raises is ignored, so one bad child does not stop the others.
#
# @param data [Object] message to send; callers in this class pass a Hash
# @return [void]
def talk_to_children(data)
  payload = Marshal.dump(data)
  writers = @to_child.values
  each_nonblock(writers, 1) do |writer|
    begin
      Conversation._send(writer, payload)
    rescue StandardError
      nil
    end
  end
end
wait_all_children() click to toggle source

@return [void]

# File lib/parallel_server/prefork.rb, line 300
# Keeps polling the children until every pid has been removed from
# @children.
#
# @return [void]
def wait_all_children
  watch_children until @children.empty?
end
wait_children() click to toggle source

@return [void]

# File lib/parallel_server/prefork.rb, line 291
# Reaps exited children without blocking and forgets their pids.
#
# For each reaped child, @on_child_exit is called with the pid and its
# Process::Status when the callback is set. A pid that no longer exists as
# a child of this process (Errno::ECHILD, e.g. it was reaped elsewhere) is
# dropped without invoking the callback, because no exit status is
# available. Without that handling the error would abort the sweep and
# leave the stale pid in @children, failing again on every later call.
#
# @return [void]
def wait_children
  @children.delete_if do |pid|
    begin
      reaped, status = Process.waitpid2(pid, Process::WNOHANG)
    rescue Errno::ECHILD
      # Nothing left to wait for; stop tracking this pid.
      next true
    end
    @on_child_exit.call(pid, status) if reaped && @on_child_exit
    # nil while the child is still running, so the pid is kept.
    reaped
  end
end
watch_children() click to toggle source

@return [void]

# File lib/parallel_server/prefork.rb, line 227
# One polling step over the children.
#
# Waits up to 0.1 seconds for status pipes to become readable. For each
# readable pipe, a received status is merged into the child's entry and
# stamped with the current time; when nothing is received the child is
# removed from all tables and both of its pipes are closed. Afterwards the
# watchdog runs, and exited children are reaped when the number of
# unreaped pids differs from the number of tracked children.
#
# @return [void]
def watch_children
  readers = @from_child.empty? ? nil : @from_child.keys
  ready, = IO.select(readers, nil, nil, 0.1)

  (ready || []).each do |reader|
    report = Conversation.recv(reader)
    if report
      report[:time] = Time.now
      @child_status[reader].update report
    else
      # No status could be read: forget this child and release its pipes.
      @from_child.delete reader
      begin
        @to_child[reader].close
      rescue StandardError
        nil
      end
      @to_child.delete reader
      @child_status.delete reader
      reader.close
    end
  end

  kill_frozen_children
  wait_children if @children.size != @child_status.size
end