class Resque::Pool

Constants

CHUNK_SIZE
DEFAULT_WORKER_INTERVAL
QUEUE_SIGS
SIG_QUEUE_MAX_SIZE
VERSION

Attributes

app_name[RW]
config_loader[RW]
kill_other_pools[RW]
spawn_delay[RW]
term_behavior[RW]
config[R]
config_loader[R]
workers[R]

Public Class Methods

create_configured() click to toggle source
# File lib/resque/pool.rb, line 111
# Builds a pool from the class-level +config_loader+.
#
# Returns the new Resque::Pool instance.
def self.create_configured
  loader = config_loader
  Resque::Pool.new(loader)
end
handle_winch=(bool) click to toggle source
# File lib/resque/pool.rb, line 86
# Stores the class-level flag reported by handle_winch?; handle_sig_queue!
# consults that predicate before acting on a queued :WINCH signal.
def self.handle_winch=(bool)
  @handle_winch = bool
end
handle_winch?() click to toggle source
# File lib/resque/pool.rb, line 83
# Reports the class-level WINCH flag, normalising an unset (or falsy) value
# to +false+ before returning it.
def self.handle_winch?
  @handle_winch = false unless @handle_winch
  @handle_winch
end
kill_other_pools!() click to toggle source
# File lib/resque/pool.rb, line 90
# Lazily loads the killer helper and runs it. The require is deliberately
# inside the method so the file is only loaded when this is invoked.
def self.kill_other_pools!
  require 'resque/pool/killer'
  killer = Resque::Pool::Killer
  killer.run
end
new(config_loader=nil) click to toggle source
# File lib/resque/pool.rb, line 24
# Sets up a pool: loads configuration through init_config, prepares the
# worker registry and updates the process title.
#
# config_loader - passed straight to init_config (defaults to nil).
def initialize(config_loader=nil)
  init_config(config_loader)
  # Registry shape: { queues => { pid => worker } }; a missing queues key is
  # created (and stored) as an empty hash on first access.
  @workers = Hash.new do |by_queues, queue_key|
    by_queues[queue_key] = {}
  end
  procline("(initialized)")
end
run() click to toggle source
# File lib/resque/pool.rb, line 104
# Creates a configured pool, starts it and then joins it.
#
# Returns whatever the pool's +join+ returns.
def self.run
  # Only some Ruby builds expose this GC switch, hence the capability check.
  GC.copy_on_write_friendly = true if GC.respond_to?(:copy_on_write_friendly=)
  pool = create_configured
  pool.start.join
end
single_process_group() click to toggle source
# File lib/resque/pool.rb, line 98
# Tells whether RESQUE_SINGLE_PGRP holds one of the accepted affirmative
# words (compared case-insensitively).
#
# Returns true or false.
def self.single_process_group
  flag = ENV["RESQUE_SINGLE_PGRP"].to_s.downcase
  %w[yes y true t 1 okay sure please].include?(flag)
end
single_process_group=(bool) click to toggle source
# File lib/resque/pool.rb, line 95
# Records the single-process-group choice in the environment as "YES" or
# "NO", based on the truthiness of +bool+.
def self.single_process_group=(bool)
  ENV["RESQUE_SINGLE_PGRP"] = bool ? "YES" : "NO"
end

Public Instance Methods

after_prefork do |worker| ... end → add a hook click to toggle source
after_prefork << hook → add a hook

after_prefork will run in workers before any jobs. Use these hooks e.g. to reload database connections to ensure that they're not shared among workers.

# File lib/resque/pool.rb, line 61
# Declares the :after_prefork hook through the `hook` class macro, which is
# defined outside this excerpt. spawn_worker! invokes call_after_prefork!
# inside the forked child.
hook :after_prefork
after_spawn do |worker, pid, workers| ... end → add a hook click to toggle source
after_spawn << hook → add a hook

The `after_spawn` hooks will run in the master after spawning a new worker.

# File lib/resque/pool.rb, line 72
# Declares the :after_spawn hook through the `hook` class macro, which is
# defined outside this excerpt. spawn_worker! invokes call_after_spawn! in
# the parent once the child has been forked.
hook :after_spawn
all_known_queues() click to toggle source
# File lib/resque/pool.rb, line 378
# Union of the queue groups named in the configuration and the queue groups
# present in the worker registry (configured ones first, no duplicates).
def all_known_queues
  configured = config.keys
  registered = workers.keys
  configured | registered
end
all_pids() click to toggle source
# File lib/resque/pool.rb, line 356
# Lists the pids of every registered worker across all queue groups.
#
# Uses flat_map instead of map + flatten and avoids a block parameter that
# shadowed the +workers+ reader.
#
# Returns an Array of pids (empty when nothing is registered).
def all_pids
  workers.values.flat_map(&:keys)
end
awaken_master() click to toggle source
# File lib/resque/pool.rb, line 167
# Writes one byte to the write end of the self-pipe so that a master blocked
# in IO.select (see master_sleep) wakes up. Invoked from the trap blocks
# installed by init_sig_handlers! and trap_deferred.
#
# A full pipe (EAGAIN) already holds unread wake-up bytes, so nothing more
# has to be written; retrying there would spin until the pipe is drained, and
# the only drain visible in this class is master_sleep. Only an interrupted
# write (EINTR) is retried.
def awaken_master
  self_pipe.last.write_nonblock('.') # wakeup master process from select
rescue Errno::EAGAIN
  # pipe is full, master should wake up anyways
  nil
rescue Errno::EINTR
  retry
end
create_worker(queues) click to toggle source
# File lib/resque/pool.rb, line 423
# Builds (but does not start) a Resque worker for a comma-separated queue
# list and configures it from the environment.
#
# queues - anything whose to_s is a comma-separated list of queue names.
#
# Returns the configured ::Resque::Worker.
def create_worker(queues)
  queue_names = queues.to_s.split(',')
  ::Resque::Worker.new(*queue_names).tap do |w|
    w.pool_master_pid = Process.pid
    w.term_timeout = ENV.fetch('RESQUE_TERM_TIMEOUT', 4.0).to_f
    w.term_child = ENV['TERM_CHILD']
    # resque doesn't support this until 1.24, but we support 1.22
    if w.respond_to?(:run_at_exit_hooks=)
      w.run_at_exit_hooks = ENV['RUN_AT_EXIT_HOOKS'] || false
    end
    verbosity = ENV['LOGGING'] || ENV['VERBOSE']
    w.verbose = verbosity if verbosity
    extra_verbosity = ENV['VVERBOSE']
    w.very_verbose = extra_verbosity if extra_verbosity
  end
end
delete_worker(pid) click to toggle source

TODO: close any file descriptors connected to worker, if any

# File lib/resque/pool.rb, line 348
# Removes +pid+ from the worker registry.
#
# Queue groups are scanned in order and the scan stops at the first group
# that held the pid.
#
# Returns the worker that was registered under +pid+, or nil if none was.
def delete_worker(pid)
  workers.each_value do |pid_to_worker|
    removed = pid_to_worker.delete(pid)
    return removed if removed
  end
  nil
end
environment() click to toggle source
# File lib/resque/pool.rb, line 137
# Resolves the current environment name.
#
# Precedence: the RAILS_ENV constant, then Rails.env, then the first set
# variable among RACK_ENV, RAILS_ENV and RESQUE_ENV. Returns nil when none
# of these is available.
def environment
  return RAILS_ENV if defined? RAILS_ENV
  return Rails.env if defined?(Rails) && Rails.respond_to?(:env)

  ENV.values_at('RACK_ENV', 'RAILS_ENV', 'RESQUE_ENV').compact.first
end
graceful_worker_shutdown!(signal) click to toggle source
# File lib/resque/pool.rb, line 256
# Asks every worker to stop (TERM when term_child is set, QUIT otherwise)
# without waiting for them to exit.
#
# signal - the signal name being handled; used only in the log line.
#
# Returns :break so the caller's loop (see join) stops.
def graceful_worker_shutdown!(signal)
  log "#{signal}: immediate shutdown (graceful worker shutdown)"
  stop_signal = term_child ? :TERM : :QUIT
  signal_all_workers(stop_signal)
  :break
end
graceful_worker_shutdown_and_wait!(signal) click to toggle source
# File lib/resque/pool.rb, line 245
# Asks every worker to stop (TERM when term_child is set, QUIT otherwise)
# and then reaps children with blocking waitpid flags.
#
# signal - the signal name being handled; used only in the log line.
#
# Returns :break so the caller's loop (see join) stops.
def graceful_worker_shutdown_and_wait!(signal)
  log "#{signal}: graceful shutdown, waiting for children"
  stop_signal = term_child ? :TERM : :QUIT
  signal_all_workers(stop_signal)
  reap_all_workers(0) # will hang until all workers are shutdown
  :break
end
handle_sig_queue!() click to toggle source
# File lib/resque/pool.rb, line 197
# Takes the oldest deferred signal off sig_queue (if any) and acts on it.
#
# The return value is that of the selected branch; the shutdown helpers
# return :break, which join uses to leave its loop. An empty queue or an
# unhandled signal yields nil.
def handle_sig_queue!
  signal = sig_queue.shift
  case signal
  when :USR1, :USR2, :CONT
    # Relayed verbatim to the workers.
    log "#{signal}: sending to all workers"
    signal_all_workers(signal)
  when :HUP
    log "HUP: reset configuration and reload logfiles"
    reset_config
    Logging.reopen_logs!
    log "HUP: gracefully shutdown old children (which have old logfiles open)"
    signal_all_workers(term_child ? :TERM : :QUIT)
    log "HUP: new children will inherit new logfiles"
    maintain_worker_count
  when :WINCH
    # Ignored unless the class-level flag has been switched on.
    if self.class.handle_winch?
      log "WINCH: gracefully stopping all workers"
      # With an empty config every queue group has a target of zero workers.
      @config = {}
      maintain_worker_count
    end
  when :QUIT
    term_child ? shutdown_everything_now!(signal) : graceful_worker_shutdown_and_wait!(signal)
  when :INT
    graceful_worker_shutdown!(signal)
  when :TERM
    # Class-level term_behavior picks the strategy; anything unrecognised
    # falls back to the immediate shutdown.
    strategies = {
      "graceful_worker_shutdown_and_wait" => :graceful_worker_shutdown_and_wait!,
      "graceful_worker_shutdown" => :graceful_worker_shutdown!
    }
    strategy = strategies.fetch(self.class.term_behavior, :shutdown_everything_now!)
    send(strategy, signal)
  end
end
init_config(loader) click to toggle source

}}} Config: store loader and load config {{{

# File lib/resque/pool.rb, line 118
# Stores the config loader and performs the first configuration load.
#
# loader - a String, Hash or nil is wrapped in a
#          ConfigLoaders::FileOrHashLoader; any other object is stored as-is.
#
# Returns the result of load_config.
def init_config(loader)
  needs_wrapping = [String, Hash, nil].any? { |kind| kind === loader }
  @config_loader = needs_wrapping ? ConfigLoaders::FileOrHashLoader.new(loader) : loader
  load_config
end
init_self_pipe!() click to toggle source
# File lib/resque/pool.rb, line 156
# (Re)creates the self-pipe in place: closes whatever ends are currently
# stored, swaps in a fresh IO.pipe pair and marks both ends close-on-exec.
#
# Returns the self_pipe array.
def init_self_pipe!
  pipe = self_pipe
  pipe.each do |io|
    begin
      io.close
    rescue StandardError
      # best effort: a failure to close an old end is ignored
      nil
    end
  end
  pipe.replace(IO.pipe)
  pipe.each do |io|
    io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
  end
end
init_sig_handlers!() click to toggle source
# File lib/resque/pool.rb, line 162
# Installs the master's signal handlers: every signal in QUEUE_SIGS is
# deferred through trap_deferred, and CHLD simply wakes the master.
def init_sig_handlers!
  QUEUE_SIGS.each do |sig|
    trap_deferred(sig)
  end
  trap(:CHLD) { awaken_master }
end
join() click to toggle source
# File lib/resque/pool.rb, line 299
# Master loop. Each pass reaps exited children and handles one deferred
# signal; when no further signals are queued it sleeps, reloads the
# configuration and rebalances workers. The loop ends when
# handle_sig_queue! returns :break.
def join
  loop do
    reap_all_workers
    outcome = handle_sig_queue!
    break if outcome == :break

    # Only idle (sleep / reload / rebalance) when nothing else is queued.
    if sig_queue.empty?
      master_sleep
      load_config
      maintain_worker_count
    end
    procline("managing #{all_pids.inspect}")
  end
  procline("(shutting down)")
  #stop # gracefully shutdown all workers on our way out
  log("manager finished")
  #unlink_pid_safe(pid) if pid
end
load_config() click to toggle source
# File lib/resque/pool.rb, line 128
# Invokes the config loader with the current environment name and stores
# the result in @config.
#
# Returns the newly loaded configuration.
def load_config
  loader = config_loader
  @config = loader.call(environment)
end
maintain_worker_count() click to toggle source

}}} ???: maintain_worker_count, all_known_queues {{{

# File lib/resque/pool.rb, line 370
# Brings every known queue group to its configured worker count: spawns
# workers where the delta is positive, quits workers where it is negative.
def maintain_worker_count
  all_known_queues.each do |queues|
    delta = worker_delta_for(queues)
    if delta > 0
      spawn_missing_workers_for(queues)
    elsif delta < 0
      quit_excess_workers_for(queues)
    end
  end
end
master_sleep() click to toggle source
# File lib/resque/pool.rb, line 316
# Waits up to one second for the read end of the self-pipe to become
# readable, then drains it in CHUNK_SIZE reads until it would block.
#
# Returns nil.
def master_sleep
  begin
    reader = self_pipe.first
    ready = IO.select([reader], nil, nil, 1)
    return unless ready

    readable = ready.first
    return unless readable && readable.first

    # Keep reading until the pipe is empty; the resulting EAGAIN ends the loop.
    loop { self_pipe.first.read_nonblock(CHUNK_SIZE) }
  rescue Errno::EAGAIN, Errno::EINTR
  end
end
pids_for(queues) click to toggle source
# File lib/resque/pool.rb, line 404
# Pids of the workers registered for one queue group. Looks the group up
# with Hash#[] on +workers+.
def pids_for(queues)
  registered = workers[queues]
  registered.keys
end
quit_excess_workers_for(queues) click to toggle source
# File lib/resque/pool.rb, line 393
# Sends QUIT to as many workers of the queue group as exceed the configured
# count, taking pids from the front of pids_for.
def quit_excess_workers_for(queues)
  excess = -worker_delta_for(queues)
  victims = pids_for(queues)[0...excess]
  victims.each { |pid| Process.kill("QUIT", pid) }
end
reap_all_workers(waitpid_flags=Process::WNOHANG) click to toggle source

}}} worker process management {{{

# File lib/resque/pool.rb, line 328
# Collects exited child processes and drops them from the worker registry.
#
# waitpid_flags - flags handed to Process.waitpid2; the default WNOHANG
#                 polls, while 0 blocks (used by
#                 graceful_worker_shutdown_and_wait!).
#
# @waiting_for_reaper records whether a blocking wait was requested;
# trap_deferred reads it to decide whether INT/TERM should raise
# QuitNowException. Returns nil.
def reap_all_workers(waitpid_flags=Process::WNOHANG)
  @waiting_for_reaper = waitpid_flags == 0
  begin
    loop do
      # -1, wait for any child process
      wpid, status = Process.waitpid2(-1, waitpid_flags)
      break unless wpid

      worker = delete_worker(wpid)
      if worker
        queue_list = worker.queues.join(",")
        log "Reaped resque worker[#{status.pid}] (status: #{status.exitstatus}) queues: #{queue_list}"
      else
        # this died before it could be killed, so it's not going to have any extra info
        log "Tried to reap worker [#{status.pid}], but it had already died. (status: #{status.exitstatus})"
      end
    end
  rescue Errno::ECHILD, QuitNowException
    # ECHILD: no children left to wait for. QuitNowException: raised by
    # trap_deferred to cut a blocking wait short.
  end
end
report_worker_pool_pids() click to toggle source
# File lib/resque/pool.rb, line 291
# Logs which worker pids the pool currently holds.
#
# Emptiness is judged on the pid list rather than on the registry hash: the
# registry can contain queue-group keys with no workers (pids_for creates
# such entries on lookup), which previously produced
# "Pool contains worker PIDs: []" instead of "Pool is empty".
def report_worker_pool_pids
  pids = all_pids
  if pids.empty?
    log "Pool is empty"
  else
    log "Pool contains worker PIDs: #{pids.inspect}"
  end
end
reset_config() click to toggle source
# File lib/resque/pool.rb, line 132
# Resets the loader, when it offers a reset! method, and then reloads the
# configuration.
#
# Returns the result of load_config.
def reset_config
  loader = config_loader
  loader.reset! if loader.respond_to?(:reset!)
  load_config
end
reset_sig_handlers!() click to toggle source
# File lib/resque/pool.rb, line 193
# Puts every signal in QUEUE_SIGS back to its default handler. spawn_worker!
# calls this in the forked child.
def reset_sig_handlers!
  QUEUE_SIGS.each do |sig|
    trap(sig, "DEFAULT")
  end
end
self_pipe() click to toggle source

Sig handlers and self pipe management {{{

# File lib/resque/pool.rb, line 151
# Lazily created array holding the self-pipe ends; init_self_pipe! fills it
# in place with the IO.pipe pair (read end first, write end last).
def self_pipe
  @self_pipe ||= []
end
shutdown_everything_now!(signal) click to toggle source
# File lib/resque/pool.rb, line 266
# Signals every worker for an immediate stop: QUIT when term_child is set,
# TERM otherwise (the reverse of the graceful helpers).
#
# signal - the signal name being handled; used only in the log line.
#
# Returns :break so the caller's loop (see join) stops.
def shutdown_everything_now!(signal)
  log "#{signal}: immediate shutdown (and immediate worker shutdown)"
  stop_signal = term_child ? :QUIT : :TERM
  signal_all_workers(stop_signal)
  :break
end
sig_queue() click to toggle source
# File lib/resque/pool.rb, line 152
# Lazily created array of deferred signals: trap_deferred appends to it and
# handle_sig_queue! shifts from the front.
def sig_queue
  @sig_queue ||= []
end
signal_all_workers(signal) click to toggle source
# File lib/resque/pool.rb, line 360
# Logs the action and delivers +signal+ to every pid in all_pids.
#
# Returns the list of pids that were signalled.
def signal_all_workers(signal)
  log "Sending #{signal} to all workers"
  all_pids.each { |pid| Process.kill(signal, pid) }
end
spawn_missing_workers_for(queues) click to toggle source

}}} methods that operate on a single grouping of queues {{{ perhaps this means a class is waiting to be extracted

# File lib/resque/pool.rb, line 386
# Spawns one worker per missing slot of the queue group, sleeping
# Resque::Pool.spawn_delay after each spawn when a delay is configured.
#
# Returns the number of spawn iterations requested.
def spawn_missing_workers_for(queues)
  missing = worker_delta_for(queues)
  missing.times do
    spawn_worker!(queues)
    delay = Resque::Pool.spawn_delay
    sleep delay if delay
  end
end
spawn_worker!(queues) click to toggle source
# File lib/resque/pool.rb, line 408
# Forks a child process that runs a Resque worker for +queues+ and records
# the child in the worker registry.
#
# queues - the queue-group key; handed to create_worker and used as the key
#          into +workers+.
#
# Returns the result of call_after_spawn!.
def spawn_worker!(queues)
  # The worker object is built in the parent so it can also be stored in
  # the registry below.
  worker = create_worker(queues)
  pid = fork do
    # --- everything in this block runs in the child process ---
    # Give the child its own process group unless single-group mode is on.
    Process.setpgrp unless Resque::Pool.single_process_group
    # Evaluated inside the child, so this is the child's own pid.
    worker.worker_parent_pid = Process.pid
    log_worker "Starting worker #{worker}"
    call_after_prefork!(worker)
    # Drop the handlers the master installed for QUEUE_SIGS.
    reset_sig_handlers!
    #self_pipe.each {|io| io.close }
    worker.work(ENV['INTERVAL'] || DEFAULT_WORKER_INTERVAL) # interval, will block
  end
  # Back in the parent: register the child under its pid, then run the
  # after_spawn hooks.
  workers[queues][pid] = worker
  call_after_spawn!(worker, pid, workers)
end
start() click to toggle source

}}} start, join, and master sleep {{{

# File lib/resque/pool.rb, line 279
# Boots the master: prepares the self-pipe and signal handlers, spawns the
# initial workers, reports them, and optionally kills other pools when the
# class-level kill_other_pools flag is set.
#
# Returns self so calls can be chained (see the class-level run method).
def start
  procline "(starting)"
  init_self_pipe!
  init_sig_handlers!
  maintain_worker_count
  procline "(started)"
  log("started manager")
  report_worker_pool_pids
  pool_class = self.class
  pool_class.kill_other_pools! if pool_class.kill_other_pools
  self
end
term_child() click to toggle source
# File lib/resque/pool.rb, line 153
# Value of the TERM_CHILD environment variable, cached in @term_child once
# it has been seen set.
def term_child
  @term_child ||= ENV['TERM_CHILD']
end
trap_deferred(signal) click to toggle source

defer a signal for later processing in join (master process)

# File lib/resque/pool.rb, line 178
# Installs a handler that defers +signal+ for later processing in join
# (master process).
#
# While reap_all_workers is in a blocking wait (@waiting_for_reaper), an INT
# or TERM raises QuitNowException instead of being queued. Otherwise the
# signal is appended to sig_queue and the master is woken, unless the queue
# already holds SIG_QUEUE_MAX_SIZE entries, in which case it is only logged.
def trap_deferred(signal)
  trap(signal) do
    if @waiting_for_reaper && [:INT, :TERM].include?(signal)
      log "Recieved #{signal}: short circuiting QUIT waitpid"
      raise QuitNowException
    end

    if sig_queue.size >= SIG_QUEUE_MAX_SIZE
      log "ignoring SIG#{signal}, queue=#{sig_queue.inspect}"
    else
      sig_queue << signal
      awaken_master
    end
  end
end
worker_delta_for(queues) click to toggle source
# File lib/resque/pool.rb, line 400
# Difference between the configured and the registered worker count for a
# queue group: positive means workers are missing, negative means there are
# too many. Unknown groups count as zero on either side, and the lookup
# uses fetch so it does not add keys to the registry.
def worker_delta_for(queues)
  wanted = config.fetch(queues, 0)
  running = workers.fetch(queues, []).size
  wanted - running
end