class Resque::Pool
Constants
- CHUNK_SIZE
- DEFAULT_WORKER_INTERVAL
- QUEUE_SIGS
- SIG_QUEUE_MAX_SIZE
- VERSION
Attributes
app_name[RW]
config_loader[RW]
kill_other_pools[RW]
spawn_delay[RW]
term_behavior[RW]
config[R]
config_loader[R]
workers[R]
Public Class Methods
create_configured()
click to toggle source
# File lib/resque/pool.rb, line 111
# Builds a new pool using the class-level +config_loader+.
#
# Returns the new Resque::Pool instance.
def self.create_configured
  loader = config_loader
  Resque::Pool.new(loader)
end
handle_winch=(bool)
click to toggle source
# File lib/resque/pool.rb, line 86
# Stores the flag that #handle_sig_queue! consults before acting on WINCH.
#
# bool - truthy to enable WINCH handling, falsy to disable it.
def self.handle_winch=(bool)
  @handle_winch = bool
end
handle_winch?()
click to toggle source
# File lib/resque/pool.rb, line 83
# Reports the WINCH-handling flag, defaulting it to +false+ when it has not
# been set (or was set to a falsy value).
def self.handle_winch?
  @handle_winch = false unless @handle_winch
  @handle_winch
end
kill_other_pools!()
click to toggle source
# File lib/resque/pool.rb, line 90
# Loads the killer helper on demand and runs it.
def self.kill_other_pools!
  # Required here rather than at file load, so the helper is only pulled in
  # when this method is actually called.
  require 'resque/pool/killer'

  Resque::Pool::Killer.run
end
new(config_loader=nil)
click to toggle source
# File lib/resque/pool.rb, line 24
# Sets up configuration and the (initially empty) worker registry.
#
# config_loader - passed straight to #init_config; may be nil.
def initialize(config_loader=nil)
  init_config(config_loader)

  # Registry shape: { queues => { pid => worker } }. Reading an unknown
  # queues key with [] creates and stores an empty inner hash.
  @workers = Hash.new do |registry, queue_list|
    registry[queue_list] = {}
  end

  procline("(initialized)")
end
run()
click to toggle source
# File lib/resque/pool.rb, line 104
# Creates a configured pool, starts it and blocks in #join.
def self.run
  # Only some Ruby builds expose this GC setting; enable it when available.
  GC.copy_on_write_friendly = true if GC.respond_to?(:copy_on_write_friendly=)

  pool = create_configured
  pool.start.join
end
single_process_group()
click to toggle source
# File lib/resque/pool.rb, line 98
# Whether the RESQUE_SINGLE_PGRP environment variable holds an affirmative
# value (compared case-insensitively).
#
# Returns true or false.
def self.single_process_group
  setting = ENV["RESQUE_SINGLE_PGRP"].to_s.downcase
  %w[yes y true t 1 okay sure please].include?(setting)
end
single_process_group=(bool)
click to toggle source
# File lib/resque/pool.rb, line 95
# Records the single-process-group choice in the RESQUE_SINGLE_PGRP
# environment variable as "YES" or "NO".
#
# bool - any value; only its truthiness matters.
def self.single_process_group=(bool)
  ENV["RESQUE_SINGLE_PGRP"] =
    if bool
      "YES"
    else
      "NO"
    end
end
Public Instance Methods
after_prefork do |worker| ... end → add a hook
click to toggle source
after_prefork << hook → add a hook
The `after_prefork` hooks will run in workers before any jobs. Use these hooks e.g. to reload database connections to ensure that they're not shared among workers.
# File lib/resque/pool.rb, line 61 hook :after_prefork
after_spawn do |worker, pid, workers| ... end → add a hook
click to toggle source
after_spawn << hook → add a hook
The `after_spawn` hooks will run in the master after spawning a new worker.
# File lib/resque/pool.rb, line 72 hook :after_spawn
all_known_queues()
click to toggle source
# File lib/resque/pool.rb, line 378
# Every queue grouping that appears in the current config or in the worker
# registry, without duplicates.
def all_known_queues
  configured = config.keys
  running = workers.keys
  configured | running
end
all_pids()
click to toggle source
# File lib/resque/pool.rb, line 356
# Flat list of the pids of every worker in the registry, across all queue
# groupings.
def all_pids
  pids_per_group = workers.map do |_queues, pid_to_worker|
    pid_to_worker.keys
  end
  pids_per_group.flatten
end
awaken_master()
click to toggle source
# File lib/resque/pool.rb, line 167
# Wakes the master out of its select in #master_sleep by writing one byte to
# the write end of the self-pipe.
#
# This is called from signal handlers (see #init_sig_handlers! and
# #trap_deferred), so it must never block or spin:
#
# * Errno::EAGAIN means the pipe is already full, i.e. a wake-up is already
#   pending, so the master will wake up anyway and there is nothing to do.
#   Retrying here could loop forever, because the handler runs on the same
#   thread that would have to drain the pipe.
# * Errno::EINTR means the write was interrupted and is simply retried.
#
# Returns the number of bytes written (1), or nil when the pipe was full.
def awaken_master
  self_pipe.last.write_nonblock('.') # wakeup master process from select
rescue Errno::EINTR
  retry
rescue Errno::EAGAIN
  # pipe is full, master should wake up anyways
  nil
end
create_worker(queues)
click to toggle source
# File lib/resque/pool.rb, line 423
# Builds (but does not start) a Resque worker for a queue grouping and
# configures it from the environment.
#
# queues - comma-separated queue names (anything responding to #to_s).
#
# Returns the configured ::Resque::Worker.
def create_worker(queues)
  queue_names = queues.to_s.split(',')
  worker = ::Resque::Worker.new(*queue_names)

  worker.pool_master_pid = Process.pid
  worker.term_timeout = (ENV['RESQUE_TERM_TIMEOUT'] || 4.0).to_f
  worker.term_child = ENV['TERM_CHILD']

  # resque doesn't support this until 1.24, but we support 1.22
  if worker.respond_to?(:run_at_exit_hooks=)
    worker.run_at_exit_hooks = ENV['RUN_AT_EXIT_HOOKS'] || false
  end

  # LOGGING takes precedence over VERBOSE when both are set.
  verbosity = ENV['LOGGING'] || ENV['VERBOSE']
  worker.verbose = verbosity if verbosity

  very_verbose = ENV['VVERBOSE']
  worker.very_verbose = very_verbose if very_verbose

  worker
end
delete_worker(pid)
click to toggle source
TODO: close any file descriptors connected to worker, if any
# File lib/resque/pool.rb, line 348
# Removes the worker registered under +pid+ from the registry.
#
# The search stops at the first queue grouping that holds the pid.
#
# Returns the removed worker, or nil when no grouping knows the pid.
def delete_worker(pid)
  workers.each do |_queues, pid_to_worker|
    removed = pid_to_worker.delete(pid)
    return removed if removed
  end
  nil
end
environment()
click to toggle source
# File lib/resque/pool.rb, line 137
# Determines the environment name handed to the config loader.
#
# Lookup order: the RAILS_ENV constant, then Rails.env, then the RACK_ENV,
# RAILS_ENV and RESQUE_ENV environment variables.
#
# Returns the first value found, or nil when none is set.
def environment
  return RAILS_ENV if defined?(RAILS_ENV)
  return Rails.env if defined?(Rails) && Rails.respond_to?(:env)

  ENV['RACK_ENV'] || ENV['RAILS_ENV'] || ENV['RESQUE_ENV']
end
graceful_worker_shutdown!(signal)
click to toggle source
# File lib/resque/pool.rb, line 256
# Signals all workers to shut down gracefully and tells the caller to leave
# the main loop without waiting for them.
#
# signal - the signal name being handled (used only in the log line).
#
# Returns :break.
def graceful_worker_shutdown!(signal)
  log "#{signal}: immediate shutdown (graceful worker shutdown)"
  # TERM when term_child is set, QUIT otherwise.
  worker_signal = term_child ? :TERM : :QUIT
  signal_all_workers(worker_signal)
  :break
end
graceful_worker_shutdown_and_wait!(signal)
click to toggle source
# File lib/resque/pool.rb, line 245
# Signals all workers to shut down gracefully, then blocks until they have
# been reaped.
#
# signal - the signal name being handled (used only in the log line).
#
# Returns :break.
def graceful_worker_shutdown_and_wait!(signal)
  log "#{signal}: graceful shutdown, waiting for children"
  # TERM when term_child is set, QUIT otherwise.
  worker_signal = term_child ? :TERM : :QUIT
  signal_all_workers(worker_signal)
  reap_all_workers(0) # will hang until all workers are shutdown
  :break
end
handle_sig_queue!()
click to toggle source
# File lib/resque/pool.rb, line 197 def handle_sig_queue! case signal = sig_queue.shift when :USR1, :USR2, :CONT log "#{signal}: sending to all workers" signal_all_workers(signal) when :HUP log "HUP: reset configuration and reload logfiles" reset_config Logging.reopen_logs! log "HUP: gracefully shutdown old children (which have old logfiles open)" if term_child signal_all_workers(:TERM) else signal_all_workers(:QUIT) end log "HUP: new children will inherit new logfiles" maintain_worker_count when :WINCH if self.class.handle_winch? log "WINCH: gracefully stopping all workers" @config = {} maintain_worker_count end when :QUIT if term_child shutdown_everything_now!(signal) else graceful_worker_shutdown_and_wait!(signal) end when :INT graceful_worker_shutdown!(signal) when :TERM case self.class.term_behavior when "graceful_worker_shutdown_and_wait" graceful_worker_shutdown_and_wait!(signal) when "graceful_worker_shutdown" graceful_worker_shutdown!(signal) else shutdown_everything_now!(signal) end end end
init_config(loader)
click to toggle source
}}} Config: store loader and load config {{{
# File lib/resque/pool.rb, line 118
# Stores the config loader and performs the initial config load.
#
# loader - a String, Hash or nil is wrapped in a
#          ConfigLoaders::FileOrHashLoader; any other object is used as the
#          loader as-is.
def init_config(loader)
  @config_loader =
    case loader
    when String, Hash, nil then ConfigLoaders::FileOrHashLoader.new(loader)
    else loader
    end

  load_config
end
init_self_pipe!()
click to toggle source
# File lib/resque/pool.rb, line 156
# (Re)creates the self-pipe: closes any existing ends, installs a fresh
# IO.pipe pair in place, and marks both ends close-on-exec.
def init_self_pipe!
  self_pipe.each do |io|
    begin
      io.close
    rescue StandardError
      nil # best effort: an end that cannot be closed is ignored
    end
  end

  self_pipe.replace(IO.pipe)

  self_pipe.each do |io|
    io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
  end
end
init_sig_handlers!()
click to toggle source
# File lib/resque/pool.rb, line 162
# Installs the master's signal handlers: every signal in QUEUE_SIGS is
# deferred via #trap_deferred, and CHLD just wakes the master.
def init_sig_handlers!
  QUEUE_SIGS.each do |sig|
    trap_deferred(sig)
  end

  trap(:CHLD) { awaken_master }
end
join()
click to toggle source
# File lib/resque/pool.rb, line 299 def join loop do reap_all_workers break if handle_sig_queue! == :break if sig_queue.empty? master_sleep load_config maintain_worker_count end procline("managing #{all_pids.inspect}") end procline("(shutting down)") #stop # gracefully shutdown all workers on our way out log "manager finished" #unlink_pid_safe(pid) if pid end
load_config()
click to toggle source
# File lib/resque/pool.rb, line 128
# Asks the config loader for the configuration of the current environment
# and stores the result in @config.
def load_config
  loader = config_loader
  @config = loader.call(environment)
end
maintain_worker_count()
click to toggle source
}}} ???: maintain_worker_count, all_known_queues {{{
# File lib/resque/pool.rb, line 370
# Reconciles the number of running workers with the configured number for
# every known queue grouping: spawns when short, quits when over.
def maintain_worker_count
  all_known_queues.each do |queues|
    delta = worker_delta_for(queues)
    if delta > 0
      spawn_missing_workers_for(queues)
    elsif delta < 0
      quit_excess_workers_for(queues)
    end
  end
end
master_sleep()
click to toggle source
# File lib/resque/pool.rb, line 316 def master_sleep begin ready = IO.select([self_pipe.first], nil, nil, 1) or return ready.first && ready.first.first or return loop { self_pipe.first.read_nonblock(CHUNK_SIZE) } rescue Errno::EAGAIN, Errno::EINTR end end
pids_for(queues)
click to toggle source
# File lib/resque/pool.rb, line 404
# Pids of the workers registered for the given queue grouping.
#
# queues - the queue grouping key.
#
# Returns an Array of pids (empty when the grouping has no workers).
def pids_for(queues)
  pid_to_worker = workers[queues]
  pid_to_worker.keys
end
quit_excess_workers_for(queues)
click to toggle source
# File lib/resque/pool.rb, line 393
# Sends QUIT to as many workers of the grouping as exceed the configured
# count.
#
# queues - the queue grouping key.
def quit_excess_workers_for(queues)
  # worker_delta_for is negative when there are too many workers; flip the
  # sign to get the number of workers to stop.
  excess = -worker_delta_for(queues)
  surplus_pids = pids_for(queues)[0...excess]
  surplus_pids.each { |pid| Process.kill("QUIT", pid) }
end
reap_all_workers(waitpid_flags=Process::WNOHANG)
click to toggle source
}}} worker process management {{{
# File lib/resque/pool.rb, line 328
# Reaps exited child processes and removes them from the worker registry.
#
# waitpid_flags - flags for Process.waitpid2. The default, Process::WNOHANG,
#                 only collects children that have already exited; passing 0
#                 blocks until every child has exited.
#
# While a blocking reap (flags == 0) is in progress, @waiting_for_reaper is
# true so that #trap_deferred can interrupt the wait by raising
# QuitNowException on INT/TERM. The flag is always cleared on the way out,
# so a signal arriving after the wait has finished is queued normally
# instead of raising an exception nothing is prepared to rescue.
#
# Returns nil.
def reap_all_workers(waitpid_flags=Process::WNOHANG)
  @waiting_for_reaper = waitpid_flags == 0
  begin
    loop do
      # -1, wait for any child process
      wpid, status = Process.waitpid2(-1, waitpid_flags)
      break unless wpid # WNOHANG and no exited child is pending

      worker = delete_worker(wpid)
      if worker
        log "Reaped resque worker[#{status.pid}] (status: #{status.exitstatus}) queues: #{worker.queues.join(",")}"
      else
        # this died before it could be killed, so it's not going to have any extra info
        log "Tried to reap worker [#{status.pid}], but it had already died. (status: #{status.exitstatus})"
      end
    end
  rescue Errno::ECHILD, QuitNowException
    # ECHILD: no children left to wait for.
    # QuitNowException: a blocking wait was cut short by INT/TERM.
  ensure
    @waiting_for_reaper = false
  end
end
report_worker_pool_pids()
click to toggle source
# File lib/resque/pool.rb, line 291
# Logs the pids of all pooled workers, or a note that the pool is empty.
def report_worker_pool_pids
  message =
    if workers.empty?
      "Pool is empty"
    else
      "Pool contains worker PIDs: #{all_pids.inspect}"
    end
  log message
end
reset_config()
click to toggle source
# File lib/resque/pool.rb, line 132
# Resets the config loader (when it supports +reset!+) and reloads the
# configuration.
def reset_config
  loader = config_loader
  if loader.respond_to?(:reset!)
    loader.reset!
  end
  load_config
end
reset_sig_handlers!()
click to toggle source
# File lib/resque/pool.rb, line 193
# Restores the default handler for every signal in QUEUE_SIGS.
def reset_sig_handlers!
  QUEUE_SIGS.each do |sig|
    trap(sig, "DEFAULT")
  end
end
self_pipe()
click to toggle source
Sig handlers and self pipe management {{{
# File lib/resque/pool.rb, line 151
# The memoized array holding the self-pipe ends; empty until
# #init_self_pipe! fills it.
def self_pipe
  @self_pipe ||= []
end
shutdown_everything_now!(signal)
click to toggle source
# File lib/resque/pool.rb, line 266
# Signals all workers for immediate shutdown and tells the caller to leave
# the main loop.
#
# signal - the signal name being handled (used only in the log line).
#
# Returns :break.
def shutdown_everything_now!(signal)
  log "#{signal}: immediate shutdown (and immediate worker shutdown)"
  # Note the mapping is the reverse of the graceful helpers:
  # QUIT when term_child is set, TERM otherwise.
  worker_signal = term_child ? :QUIT : :TERM
  signal_all_workers(worker_signal)
  :break
end
sig_queue()
click to toggle source
# File lib/resque/pool.rb, line 152
# The memoized array of deferred signals: filled by #trap_deferred and
# consumed by #handle_sig_queue!.
def sig_queue
  @sig_queue ||= []
end
signal_all_workers(signal)
click to toggle source
# File lib/resque/pool.rb, line 360
# Sends +signal+ to every worker pid in the pool.
#
# signal - a signal name accepted by Process.kill.
def signal_all_workers(signal)
  log "Sending #{signal} to all workers"
  all_pids.each { |pid| Process.kill(signal, pid) }
end
spawn_missing_workers_for(queues)
click to toggle source
}}} methods that operate on a single grouping of queues {{{ perhaps this means a class is waiting to be extracted
# File lib/resque/pool.rb, line 386
# Spawns as many workers as the grouping is short of, pausing for
# Resque::Pool.spawn_delay after each spawn when a delay is configured.
#
# queues - the queue grouping key.
def spawn_missing_workers_for(queues)
  missing = worker_delta_for(queues)
  missing.times do
    spawn_worker!(queues)
    delay = Resque::Pool.spawn_delay
    sleep delay if delay
  end
end
spawn_worker!(queues)
click to toggle source
# File lib/resque/pool.rb, line 408 def spawn_worker!(queues) worker = create_worker(queues) pid = fork do Process.setpgrp unless Resque::Pool.single_process_group worker.worker_parent_pid = Process.pid log_worker "Starting worker #{worker}" call_after_prefork!(worker) reset_sig_handlers! #self_pipe.each {|io| io.close } worker.work(ENV['INTERVAL'] || DEFAULT_WORKER_INTERVAL) # interval, will block end workers[queues][pid] = worker call_after_spawn!(worker, pid, workers) end
start()
click to toggle source
}}} start, join, and master sleep {{{
# File lib/resque/pool.rb, line 279
# Boots the pool: sets up the self-pipe and signal handlers, spawns the
# configured workers, and optionally kills other pools.
#
# Returns self, so the call can be chained (e.g. +start.join+).
def start
  procline("(starting)")

  init_self_pipe!
  init_sig_handlers!
  maintain_worker_count

  procline("(started)")
  log "started manager"
  report_worker_pool_pids

  pool_class = self.class
  pool_class.kill_other_pools! if pool_class.kill_other_pools

  self
end
term_child()
click to toggle source
# File lib/resque/pool.rb, line 153
# The value of the TERM_CHILD environment variable, memoized once it has
# been seen set (a nil lookup is retried on the next call).
def term_child
  return @term_child if @term_child

  @term_child = ENV['TERM_CHILD']
end
trap_deferred(signal)
click to toggle source
Defer a signal for later processing in join (master process).
# File lib/resque/pool.rb, line 178 def trap_deferred(signal) trap(signal) do |sig_nr| if @waiting_for_reaper && [:INT, :TERM].include?(signal) log "Recieved #{signal}: short circuiting QUIT waitpid" raise QuitNowException end if sig_queue.size < SIG_QUEUE_MAX_SIZE sig_queue << signal awaken_master else log "ignoring SIG#{signal}, queue=#{sig_queue.inspect}" end end end
worker_delta_for(queues)
click to toggle source
# File lib/resque/pool.rb, line 400
# Difference between the configured and the running number of workers for a
# queue grouping.
#
# queues - the queue grouping key.
#
# Returns an Integer: positive when workers are missing, negative when there
# are too many, zero when the counts match.
def worker_delta_for(queues)
  wanted = config.fetch(queues, 0)
  # fetch (not []) so that merely asking does not create a registry entry.
  running = workers.fetch(queues, []).size
  wanted - running
end