class Qless::Workers::ForkingWorker
Attributes
max_startup_interval[RW]
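The maximum startup delay for a child, in seconds: each forked child sleeps for a random time below this value before it starts working (reduces thundering herd)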
Public Class Methods
new(reserver, options = {})
click to toggle source
Calls superclass method
Qless::Workers::BaseWorker::new
# File lib/qless/worker/forking.rb, line 15
# Set up the parent (forking) worker.
#
# @param reserver passed straight through to the superclass constructor
# @param options [Hash] saved for building children; this method itself reads
#   :max_startup_interval (falls back to 10.0) and :num_workers (falls back to 1)
def initialize(reserver, options = {})
  super(reserver, options)

  # Save our options for starting children
  @options = options

  # The max interval between when children start (reduces thundering herd)
  @max_startup_interval = options[:max_startup_interval] || 10.0

  # TODO: facter to figure out how many cores we have
  @num_workers = options[:num_workers] || 1

  # Child PID => information about that worker, including its sandbox
  # directory. The directory currently isn't used, but this sets up for
  # having that eventually.
  @sandboxes = {}

  # All the modules that have been applied to this worker
  @modules = []

  # Guards @sandboxes. A trap handler cannot wait for this mutex, so blocks
  # that could not run immediately are parked in the queue below.
  @sandbox_mutex = Mutex.new
  @postponed_actions_queue = ::Queue.new
end
Public Instance Methods
children()
click to toggle source
Returns a list of each of the child pids
# File lib/qless/worker/forking.rb, line 139
# List the PIDs of the child processes tracked in @sandboxes.
#
# @return [Array] the keys of @sandboxes, in insertion order
def children
  @sandboxes.map { |child_pid, _slot| child_pid }
end
contention_aware_handler(&block)
click to toggle source
If @sandbox_mutex is free, execute block immediately. Otherwise, postpone it until handling is possible
# File lib/qless/worker/forking.rb, line 60
# Run +block+ right away if @sandbox_mutex can be taken without waiting;
# otherwise park it on @postponed_actions_queue so it can be run later.
#
# try_lock is used (rather than a blocking lock) because this is invoked from
# the signal handlers, which cannot wait for the mutex.
#
# The mutex is released in an +ensure+ so that a block which raises -- this
# includes blocks that call +exit+, which raises SystemExit -- does not leave
# @sandbox_mutex locked.
#
# @yield the action to perform while holding @sandbox_mutex
def contention_aware_handler(&block)
  if @sandbox_mutex.try_lock
    begin
      block.call
    ensure
      @sandbox_mutex.unlock
    end
  else
    @postponed_actions_queue << block
  end
end
extend(mod)
click to toggle source
Because we spawn a new worker, we need to apply all the modules that extend this one
Calls superclass method
# File lib/qless/worker/forking.rb, line 42
# Remember +mod+ in @modules, then extend this object with it as usual.
# The recorded modules are re-applied to every worker built by #spawn.
#
# @param mod [Module] the module being mixed in
def extend(mod)
  @modules.push(mod)
  super
end
process_postponed_actions()
click to toggle source
Process any signals (such as TERM) that could not be processed immediately due to @sandbox_mutex being in use
# File lib/qless/worker/forking.rb, line 71
# Drain @postponed_actions_queue, running each parked block while holding
# @sandbox_mutex.
def process_postponed_actions
  pending = @postponed_actions_queue
  until pending.empty?
    # A signal may have interrupted us between the empty? check and the pop,
    # but it can only have added more entries, so the non-blocking pop below
    # always finds something.
    action = pending.pop(true)
    @sandbox_mutex.synchronize { action.call }
  end
end
register_signal_handlers()
click to toggle source
Register our handling of signals
# File lib/qless/worker/forking.rb, line 84
# Register our handling of signals.
#
# If we're the parent process, we mostly want to forward the signals on to
# the child processes. It's just that sometimes we want to wait for them and
# then exit.
#
# The second argument to stop / stop! is the in_signal_handler flag. It is
# passed as a plain positional +true+; the previous `in_signal_handler=true`
# spelling was a throwaway local assignment, not a keyword argument.
def register_signal_handlers
  # TERM / INT: signal the children, wait for them, then exit
  %w[TERM INT].each do |sig|
    trap(sig) do
      contention_aware_handler do
        stop!(sig, true)
        exit
      end
    end
  end

  safe_trap('HUP') { sighup_handler.call }

  # QUIT: same as TERM / INT, but registered through safe_trap
  safe_trap('QUIT') do
    contention_aware_handler do
      stop!('QUIT', true)
      exit
    end
  end

  # USR1: send KILL to the children and wait for them, without exiting
  safe_trap('USR1') do
    contention_aware_handler { stop!('KILL', true) }
  end

  # CONT / USR2 are simply forwarded to the children
  begin
    %w[CONT USR2].each do |sig|
      trap(sig) { stop(sig, true) }
    end
  rescue ArgumentError
    warn 'Signals USR2, and/or CONT not supported.'
  end
end
run()
click to toggle source
Run this worker
# File lib/qless/worker/forking.rb, line 110
# Run this worker: start the children, then keep waiting on them and spawn a
# replacement whenever one dies, until @shutdown is set.
def run
  startup_sandboxes

  # Now keep an eye on our child processes, spawn replacements as needed
  loop do
    begin
      # Don't wait on any processes if we're already in shutdown mode.
      break if @shutdown

      # Wait for any child to kick the bucket
      pid, status = Process.wait2
      code = status.exitstatus
      # termsig is the signal that terminated the child. The previous
      # stopsig is only set for *stopped* children, so the log line never
      # showed the signal that killed a worker.
      sig = status.termsig
      log((code == 0 ? :info : :warn),
          "Worker process #{pid} died with #{code} from signal (#{sig})")

      # Allow our shutdown logic (called from a separate thread) to take
      # effect.
      break if @shutdown

      spawn_replacement_child(pid)
      process_postponed_actions
    rescue SystemCallError => e
      log(:error, "Failed to wait for child process: #{e.inspect}")
      # If we're shutting down, the check at the top of the loop will exit
      exit! unless @shutdown
    end
  end
end
spawn()
click to toggle source
Spawn a new child worker
# File lib/qless/worker/forking.rb, line 48
# Build a new child worker: a SerialWorker created with our reserver and
# saved options, extended with every module recorded in @modules.
#
# @return the configured SerialWorker
def spawn
  SerialWorker.new(reserver, @options).tap do |child|
    # We use 11 as the exit status so that it is something unique
    # (rather than the common 1). Plus, 11 looks a little like
    # ll (i.e. "Lock Lost").
    child.on_current_job_lock_lost { |_job| exit!(11) }
    @modules.each { |mod| child.extend(mod) }
  end
end
stop(signal = 'QUIT', in_signal_handler=true)
click to toggle source
Signal all the children
# File lib/qless/worker/forking.rb, line 144
# Send +signal+ to every child process.
#
# @param signal [String] name of the signal to deliver
# @param in_signal_handler [Boolean] when true, nothing is logged
def stop(signal = 'QUIT', in_signal_handler=true)
  unless in_signal_handler
    log(:warn, "Sending #{signal} to children")
  end

  children.each do |child_pid|
    begin
      Process.kill(signal, child_pid)
    rescue Errno::ESRCH
      # no such process -- means the process has already died.
      next
    end
  end
end
stop!(signal = 'QUIT', in_signal_handler=true)
click to toggle source
Signal all the children and wait for them to exit. Should only be called when we have the lock on @sandbox_mutex
# File lib/qless/worker/forking.rb, line 157
# Signal all the children and wait for them to exit. Should only be called
# when we hold the lock on @sandbox_mutex.
#
# The flag is forwarded as a plain positional argument; the previous
# `in_signal_handler=in_signal_handler` spelling was a no-op self-assignment,
# not a keyword argument.
#
# @param signal [String] name of the signal to send to the children
# @param in_signal_handler [Boolean] forwarded to shutdown and shutdown_sandboxes
def stop!(signal = 'QUIT', in_signal_handler=true)
  shutdown(in_signal_handler)
  shutdown_sandboxes(signal, in_signal_handler)
end
Private Instance Methods
fork_child_process() { || ... }
click to toggle source
Forks a child worker process and returns the child's pid.
# File lib/qless/worker/forking.rb, line 234
# Fork a child process that runs a freshly spawned worker.
#
# @yield optional block, run inside the child before anything else
# @return the value returned by fork in the parent (the child's pid)
def fork_child_process
  fork do
    # Everything in this block executes in the child process only
    yield if block_given?
    reconnect_each_client
    after_fork
    child_worker = spawn
    child_worker.run
  end
end
shutdown_sandboxes(signal, in_signal_handler=true)
click to toggle source
Should only be called when we have a lock on @sandbox_mutex
# File lib/qless/worker/forking.rb, line 187
# Signal every child, wait for them to exit, then forget all of them.
# Should only be called when we hold the lock on @sandbox_mutex.
#
# The flag is forwarded to #stop as a plain positional argument; the previous
# `in_signal_handler=in_signal_handler` spelling was a no-op self-assignment,
# not a keyword argument.
#
# @param signal [String] name of the signal to send to the children
# @param in_signal_handler [Boolean] when true, nothing is logged
def shutdown_sandboxes(signal, in_signal_handler=true)
  # First, send the signal
  stop(signal, in_signal_handler)

  # Wait for each of our children
  log(:warn, 'Waiting for child processes') unless in_signal_handler

  until @sandboxes.empty?
    begin
      pid, _status = Process.wait2
      log(:warn, "Child #{pid} stopped") unless in_signal_handler
      @sandboxes.delete(pid)
    rescue SystemCallError
      # Waiting failed; give up on whatever is still tracked
      break
    end
  end

  unless in_signal_handler
    log(:warn, 'All children have stopped')

    # If there were any children processes we couldn't wait for, log it
    @sandboxes.each_key do |cpid|
      log(:warn, "Could not wait for child #{cpid}")
    end
  end

  @sandboxes.clear
end
spawn_replacement_child(pid)
click to toggle source
# File lib/qless/worker/forking.rb, line 219
# Replace the dead child +pid+ with a newly forked one, handing the dead
# child's slot in @sandboxes to the new process. Does nothing once @shutdown
# is set. Runs entirely under @sandbox_mutex.
#
# @param pid the pid of the child that died
def spawn_replacement_child(pid)
  @sandbox_mutex.synchronize do
    next if @shutdown

    # Free the dead child's slot before forking, then give it to the new
    # worker process
    freed_slot = @sandboxes.delete(pid)
    new_pid = fork_child_process

    # If we're the parent process, save information about the child
    log(:info, "Spawned worker #{new_pid} to replace #{pid}")
    @sandboxes[new_pid] = freed_slot
  end
end
startup_sandboxes()
click to toggle source
# File lib/qless/worker/forking.rb, line 164
# Install the signal handlers, then fork @num_workers children and record
# each one in @sandboxes under its pid.
def startup_sandboxes
  # Make sure we respond to signals correctly
  register_signal_handlers

  log(:debug, "Starting to run with #{@num_workers} workers")

  @num_workers.times do |worker_index|
    slot = { worker_id: worker_index, sandbox: nil }

    child_pid = fork_child_process do
      # Wait for a bit to calm the thundering herd
      if max_startup_interval > 0
        sleep(rand(max_startup_interval))
      end
    end

    # If we're the parent process, save information about the child
    log(:info, "Spawned worker #{child_pid}")
    @sandboxes[child_pid] = slot
  end
end