class Procrastinator::QueueManager
Spawns and manages work queue subprocesses.
All multi-process logic should be kept in this class.
@author Robin Miller
@!attribute [r] :workers
@return [Hash] Maps each constructed QueueWorker to its process ID.
Attributes
workers[R]
Public Class Methods
new(config)
# File lib/procrastinator/queue_manager.rb, line 15
def initialize(config)
  @workers = {}
  @config = config
  @logger = start_log
end
Public Instance Methods
act(*queue_names)
# File lib/procrastinator/queue_manager.rb, line 71
def act(*queue_names)
  unless @config.test_mode?
    raise <<~ERR
      Procrastinator.act called outside Test Mode.
      Either use Procrastinator.spawn_workers or call #enable_test_mode in Procrastinator.setup.
    ERR
  end

  workers = @workers.keys

  if queue_names.empty?
    workers.each(&:act)
  else
    queue_names.each do |name|
      workers.find { |worker| worker.name == name }.act
    end
  end
end
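The dispatch rule in act (no arguments means every worker, otherwise only the named queues) can be tried in isolation. The following is a minimal sketch, not part of Procrastinator: FakeWorker and act_on are hypothetical stand-ins, the Test Mode guard is left out, and the sketch adds an explicit error for unknown queue names as an illustrative choice.

```ruby
# Hypothetical stand-in for QueueWorker: counts how many times #act ran.
FakeWorker = Struct.new(:name, :acted) do
  def act
    self.acted += 1
  end
end

# Mirrors the dispatch logic of QueueManager#act, without the Test Mode guard.
def act_on(workers, *queue_names)
  if queue_names.empty?
    workers.each(&:act)
  else
    queue_names.each do |name|
      worker = workers.find { |w| w.name == name }
      # Assumption of this sketch: raise a descriptive error for unknown names.
      raise ArgumentError, "no worker for queue #{name.inspect}" unless worker

      worker.act
    end
  end
end

emails = FakeWorker.new(:emails, 0)
thumbnails = FakeWorker.new(:thumbnails, 0)
workers = [emails, thumbnails]

act_on(workers)          # every worker acts once
act_on(workers, :emails) # only the :emails worker acts again
```

After both calls, the :emails stand-in has acted twice and the :thumbnails stand-in once.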
spawn_worker(queue, scheduler: nil)
Produces a new QueueWorker for the given queue.
If Test Mode is disabled in the config, then it will also fork a new independent process for that worker to work in.
@param queue [Queue] the queue to build a worker for
@param scheduler [Scheduler] an optional scheduler instance to pass to the worker
# File lib/procrastinator/queue_manager.rb, line 47
def spawn_worker(queue, scheduler: nil)
  worker = QueueWorker.new(queue: queue, config: @config, scheduler: scheduler)
  if @config.test_mode?
    @workers[worker] = Process.pid
  else
    check_for_name(worker.long_name)

    pid = fork

    if pid
      # === PARENT PROCESS ===
      Process.detach(pid)
      @workers[worker] = pid
    else
      deamonize(worker.long_name)

      worker.work
      shutdown_worker
    end
  end
end
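The fork-and-detach pattern that spawn_worker relies on can be sketched with plain Ruby. This is an illustration under stated assumptions rather than the library's code: spawn_detached is a hypothetical helper, the pipe exists only so the parent can observe the child, and exit! stands in for whatever shutdown_worker does. Kernel#fork is only available on Unix-like platforms.

```ruby
# Forks a child that runs the given block and returns the child's PID.
# The parent detaches, so the child is reaped without the parent blocking.
def spawn_detached
  pid = fork

  if pid
    # Parent process: fork returned the child's PID.
    Process.detach(pid)
    pid
  else
    # Child process: fork returned nil.
    yield
    exit!(0) # assumption: leave immediately, skipping at_exit handlers
  end
end

reader, writer = IO.pipe

child_pid = spawn_detached do
  reader.close
  writer.puts(Process.pid) # report the child's own PID to the parent
  writer.close
end

writer.close
reported_pid = reader.gets.to_i
reader.close

puts "parent #{Process.pid} spawned child #{child_pid}"
```

The PID returned by fork in the parent matches the PID the child reports for itself, which is the value spawn_worker stores in the workers hash.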
spawn_workers()
Shuts down any remaining old queue workers and spawns a new worker for each queue defined in the config.
@return [Scheduler] a scheduler object that can be used to interact with the queues
# File lib/procrastinator/queue_manager.rb, line 24
def spawn_workers
  scheduler = Scheduler.new(@config, self)

  kill_old_workers

  if ENV['PROCRASTINATOR_STOP']
    @logger.warn('Cannot spawn queue workers because environment variable PROCRASTINATOR_STOP is set')
  else
    @config.queues.each do |queue|
      spawn_worker(queue, scheduler: scheduler)
    end
  end

  scheduler
end
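The PROCRASTINATOR_STOP check is a plain truthiness test. ENV returns nil only for an unset variable, so any value, including "false" or an empty string, blocks spawning. The sketch below illustrates this; spawning_blocked? is a hypothetical helper, and plain Hashes stand in for ENV so the real environment is left untouched.

```ruby
# Returns true when spawning would be skipped, using the same truthiness
# test as spawn_workers. `env` defaults to ENV; a Hash works for demonstration.
def spawning_blocked?(env = ENV)
  env['PROCRASTINATOR_STOP'] ? true : false
end

unset       = spawning_blocked?({})
set_to_one  = spawning_blocked?({ 'PROCRASTINATOR_STOP' => '1' })
set_to_no   = spawning_blocked?({ 'PROCRASTINATOR_STOP' => 'false' }) # any String is truthy
set_to_none = spawning_blocked?({ 'PROCRASTINATOR_STOP' => '' })      # empty String is truthy too

puts [unset, set_to_one, set_to_no, set_to_none].inspect
```

To allow workers to spawn again, the variable has to be unset rather than set to a "false" value.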
Private Instance Methods
start_log()
# File lib/procrastinator/queue_manager.rb, line 92
def start_log
  directory = @config.log_dir

  return unless directory

  log_path = directory + 'queue-manager.log'

  directory.mkpath
  File.open(log_path.to_path, 'a+') { |f| f.write '' }

  logger = Logger.new(log_path.to_path)

  logger.level = @config.log_level

  # @logger.info(['',
  #               '===================================',
  #               "Started worker process, #{long_name}, to work off queue #{@queue.name}.",
  #               "Worker pid=#{Process.pid}; parent pid=#{Process.ppid}.",
  #               '==================================='].join("\n"))

  logger
end
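The calls to mkpath, + and to_path suggest that the log directory is a Pathname. Under that assumption, the same setup can be sketched with the standard library alone. The open_log helper, the example.log file name, and the temporary directory are illustrative and not part of Procrastinator. Logger.new creates the file when it does not exist, so this sketch omits the explicit File.open step.

```ruby
require 'logger'
require 'pathname'
require 'tmpdir'

# Creates the directory if needed and returns a Logger that writes to a
# file inside it. `directory` is expected to be a Pathname.
def open_log(directory, level: Logger::INFO)
  directory.mkpath                     # creates missing parent directories too
  log_path = directory + 'example.log' # Pathname#+ joins path segments

  logger = Logger.new(log_path.to_path)
  logger.level = level
  logger
end

log_dir = Pathname.new(Dir.mktmpdir) + 'nested' + 'logs'

logger = open_log(log_dir)
logger.info('worker started')
logger.debug('not written at INFO level')
logger.close

log_contents = (log_dir + 'example.log').read
```

Only the info message reaches the file, because the logger's level filters out anything below INFO.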