class Qless::Workers::ForkingWorker

Attributes

max_startup_interval[RW]

The maximum interval, in seconds, over which child process startups are randomly staggered.

Public Class Methods

new(reserver, options = {}) click to toggle source
Calls superclass method Qless::Workers::BaseWorker::new
# File lib/qless/worker/forking.rb, line 15
# Build a forking worker that supervises a pool of child worker processes.
#
# reserver - handed to the superclass (and, through #spawn, to each child).
# options  - Hash of settings. This class reads :max_startup_interval
#            (default 10.0) and :num_workers (default 1), and keeps the
#            whole hash so #spawn can pass it to every child worker.
def initialize(reserver, options = {})
  super(reserver, options)

  # Retain the full options hash for the children, then pull out the
  # settings this class uses itself.
  @options = options
  @max_startup_interval = options[:max_startup_interval] || 10.0
  # TODO: facter to figure out how many cores we have
  @num_workers = options[:num_workers] || 1

  # Child PID => information about that worker (including a sandbox
  # directory entry, which is not used yet).
  @sandboxes = {}

  # Every module applied via #extend, replayed onto each spawned child.
  @modules = []

  # Serializes changes to @sandboxes. Trap handlers cannot wait for this
  # mutex, so work they cannot do right away is parked in the queue below.
  @sandbox_mutex = Mutex.new
  @postponed_actions_queue = ::Queue.new
end

Public Instance Methods

children() click to toggle source

Returns the PIDs of all tracked child processes.

# File lib/qless/worker/forking.rb, line 139
# The PIDs of every child worker process currently being tracked.
def children
  @sandboxes.each_key.to_a
end
contention_aware_handler(&block) click to toggle source

If @sandbox_mutex is free, execute the block immediately. Otherwise, queue it until it can be handled safely.

# File lib/qless/worker/forking.rb, line 60
# Run +block+ right away if @sandbox_mutex can be taken without blocking.
# Otherwise, queue it for process_postponed_actions to run later.
#
# This is meant for trap handlers, which cannot block waiting on the mutex.
# The mutex is released in an ensure clause. Without it, a block that
# raises would leave the mutex locked. That includes a block that calls
# exit, which raises SystemExit.
def contention_aware_handler(&block)
  if @sandbox_mutex.try_lock
    begin
      block.call
    ensure
      @sandbox_mutex.unlock
    end
  else
    @postponed_actions_queue << block
  end
end
extend(mod) click to toggle source

Because each child process runs a newly spawned worker, every module that extends this worker is recorded so it can be applied to those new workers as well.

Calls superclass method
# File lib/qless/worker/forking.rb, line 42
# Extend this worker with +mod+ and remember it, so that #spawn can apply
# the same module to every child worker it builds.
def extend(mod)
  @modules.push(mod)
  super
end
process_postponed_actions() click to toggle source

Process any signal actions (such as TERM) that could not be handled immediately because @sandbox_mutex was in use.

# File lib/qless/worker/forking.rb, line 71
# Drain the actions that trap handlers queued because @sandbox_mutex was
# busy, running each one (oldest first) while holding the mutex.
def process_postponed_actions
  until @postponed_actions_queue.empty?
    # A signal may arrive between the empty? check and the shift. Trap
    # handlers only ever push onto this queue, and nothing else in this
    # class pops from it, so the non-blocking shift still finds an item.
    deferred = @postponed_actions_queue.shift(true)
    @sandbox_mutex.synchronize(&deferred)
  end
end
register_signal_handlers() click to toggle source

Register our handling of signals

# File lib/qless/worker/forking.rb, line 84
# Install this process's signal handlers.
#
# TERM, INT and QUIT call stop! with that same signal and then exit.
# USR1 calls stop! with KILL and does not exit. HUP runs sighup_handler.
# CONT and USR2 are forwarded to the children via stop. If trap rejects
# CONT/USR2 with ArgumentError, a warning is printed instead.
#
# Handlers that touch @sandboxes go through contention_aware_handler,
# because a trap handler cannot block on @sandbox_mutex. The second
# argument to stop/stop! (in_signal_handler) is always true here, because
# every call is made from trap context.
def register_signal_handlers
  # If we're the parent process, we mostly want to forward the signals on
  # to the child processes. It's just that sometimes we want to wait for
  # them and then exit
  trap('TERM') do
    contention_aware_handler { stop!('TERM', true); exit }
  end
  trap('INT') do
    contention_aware_handler { stop!('INT', true); exit }
  end
  safe_trap('HUP') { sighup_handler.call }
  safe_trap('QUIT') do
    contention_aware_handler { stop!('QUIT', true); exit }
  end
  safe_trap('USR1') do
    contention_aware_handler { stop!('KILL', true) }
  end
  begin
    trap('CONT') { stop('CONT', true) }
    trap('USR2') { stop('USR2', true) }
  rescue ArgumentError
    warn 'Signals USR2, and/or CONT not supported.'
  end
end
run() click to toggle source

Run this worker

# File lib/qless/worker/forking.rb, line 110
# Run this worker. Start the initial children, then supervise them,
# forking a replacement for each child that exits, until @shutdown is set.
#
# Each exit is logged at :info for status 0 and at :warn otherwise. If
# waiting for children fails while not shutting down, the parent calls
# exit! immediately.
def run
  startup_sandboxes

  # Now keep an eye on our child processes, spawn replacements as needed
  loop do
    begin
      # Don't wait on any processes if we're already in shutdown mode.
      break if @shutdown

      # Block until any child exits.
      pid, status = Process.wait2
      # exitstatus is nil when the child was killed by a signal, and
      # termsig names that signal. (stopsig only applies to *stopped*
      # children, which wait2 without WUNTRACED never reports, so it was
      # always nil here.)
      code, sig = status.exitstatus, status.termsig
      log((code == 0 ? :info : :warn),
        "Worker process #{pid} died with #{code} from signal (#{sig})")

      # Allow shutdown logic (e.g. a signal handler calling stop!) to take
      # effect.
      break if @shutdown

      spawn_replacement_child(pid)
      process_postponed_actions
    rescue SystemCallError => e
      log(:error, "Failed to wait for child process: #{e.inspect}")
      # If we're shutting down, the loop above will exit
      exit! unless @shutdown
    end
  end
end
spawn() click to toggle source

Spawn a new child worker

# File lib/qless/worker/forking.rb, line 48
# Build (without starting) the SerialWorker that a child process runs.
#
# The worker gets the same reserver and options as this one. Its
# lock-lost callback calls exit!(11), and it is extended with every module
# recorded by #extend, in the order they were applied.
def spawn
  SerialWorker.new(reserver, @options).tap do |child_worker|
    # 11 is an unusual exit status (unlike the common 1), and it looks a
    # little like "ll", i.e. "Lock Lost".
    child_worker.on_current_job_lock_lost { |_job| exit!(11) }
    @modules.each { |applied_module| child_worker.extend(applied_module) }
  end
end
stop(signal = 'QUIT', in_signal_handler=true) click to toggle source

Signal all the children

# File lib/qless/worker/forking.rb, line 144
# Send +signal+ to every tracked child without waiting for them.
#
# signal            - signal name or number passed to Process.kill.
# in_signal_handler - when true (the default), nothing is logged.
#
# Children that have already exited (ESRCH) are skipped silently.
def stop(signal = 'QUIT', in_signal_handler = true)
  unless in_signal_handler
    log(:warn, "Sending #{signal} to children")
  end

  children.each do |child_pid|
    begin
      Process.kill(signal, child_pid)
    rescue Errno::ESRCH
      nil # that child is already gone
    end
  end
end
stop!(signal = 'QUIT', in_signal_handler=true) click to toggle source

Signal all the children and wait for them to exit. Should only be called while holding the lock on @sandbox_mutex.

# File lib/qless/worker/forking.rb, line 157
# Begin shutting this worker down, then signal every child with +signal+
# and wait for them to exit.
#
# signal            - signal forwarded to each child (default 'QUIT').
# in_signal_handler - passed to both shutdown and shutdown_sandboxes
#                     (default true).
#
# Should only be called while holding the lock on @sandbox_mutex.
def stop!(signal = 'QUIT', in_signal_handler = true)
  # Plain positional arguments. The old `in_signal_handler=in_signal_handler`
  # form was an assignment to a local, not a keyword argument.
  shutdown(in_signal_handler)
  shutdown_sandboxes(signal, in_signal_handler)
end

Private Instance Methods

fork_child_process() { || ... } click to toggle source

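Forks a child process that runs a newly spawned worker and returns the child's PID. An optional block is run in the child before the worker starts.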

# File lib/qless/worker/forking.rb, line 234
# Fork a child process and return its PID (in the parent).
#
# In the child, the optional block runs first. Then clients are
# reconnected, after_fork runs, and a freshly spawned worker is run.
def fork_child_process(&before_start)
  fork do
    before_start.call if before_start
    reconnect_each_client
    after_fork
    child_worker = spawn
    child_worker.run
  end
end
shutdown_sandboxes(signal, in_signal_handler=true) click to toggle source

Should only be called while holding the lock on @sandbox_mutex.

# File lib/qless/worker/forking.rb, line 187
# Signal every child, reap as many as possible, then forget all slots.
#
# signal            - forwarded to each child via #stop.
# in_signal_handler - when true (the default), all logging is suppressed.
#
# Any child that could not be reaped is logged (when logging is enabled).
# @sandboxes is emptied either way. Should only be called while holding the
# lock on @sandbox_mutex.
def shutdown_sandboxes(signal, in_signal_handler = true)
  # First, send the signal
  stop(signal, in_signal_handler)

  # Wait for each of our children
  log(:warn, 'Waiting for child processes') unless in_signal_handler

  until @sandboxes.empty?
    begin
      pid, _status = Process.wait2
      log(:warn, "Child #{pid} stopped") unless in_signal_handler
      @sandboxes.delete(pid)
    rescue SystemCallError
      # Waiting failed (for example, there are no children left to reap).
      # Whatever is still in @sandboxes is reported below.
      break
    end
  end

  unless in_signal_handler
    log(:warn, 'All children have stopped')

    # If there were any children processes we couldn't wait for, log it
    @sandboxes.each_key do |cpid|
      log(:warn, "Could not wait for child #{cpid}")
    end
  end

  @sandboxes.clear
end
spawn_replacement_child(pid) click to toggle source
# File lib/qless/worker/forking.rb, line 219
# Replace the exited child +pid+ with a freshly forked one that inherits
# its slot in @sandboxes. Does nothing once @shutdown is set.
def spawn_replacement_child(pid)
  @sandbox_mutex.synchronize do
    unless @shutdown
      inherited_slot = @sandboxes.delete(pid)
      new_pid = fork_child_process

      # Only the parent reaches this point; record the new child.
      log(:info, "Spawned worker #{new_pid} to replace #{pid}")
      @sandboxes[new_pid] = inherited_slot
    end
  end
end
startup_sandboxes() click to toggle source
# File lib/qless/worker/forking.rb, line 164
# Install signal handlers, then fork @num_workers children. Each child is
# recorded in @sandboxes, keyed by its PID, with its worker_id.
#
# Before doing any work, each child sleeps for a random duration in
# [0, max_startup_interval) seconds, so the children do not all start at
# the same instant.
def startup_sandboxes
  # Make sure we respond to signals correctly
  register_signal_handlers

  log(:debug, "Starting to run with #{@num_workers} workers")
  @num_workers.times do |i|
    slot = {
      worker_id: i,
      sandbox: nil
    }

    cpid = fork_child_process do
      # Wait for a bit to calm the thundering herd. Scale a uniform float
      # rather than calling rand(max_startup_interval): Kernel#rand
      # truncates its argument with to_i, so 10.0 gave whole seconds only,
      # 1.5 always gave 0, and values below 1 could sleep up to 1 second.
      sleep(rand * max_startup_interval) if max_startup_interval > 0
    end

    # Only the parent gets here: the child runs inside fork_child_process.
    log(:info, "Spawned worker #{cpid}")
    @sandboxes[cpid] = slot
  end
end