class Procrastinator::QueueManager

Spawns and manages work queue subprocesses.

This is where all of the multi-process logic should be kept to.

@author Robin Miller

@!attribute [r] :workers

@return [Hash] Maps the constructed QueueWorkers to their process ID.

Attributes

workers[R]

Public Class Methods

new(config) click to toggle source
# File lib/procrastinator/queue_manager.rb, line 15
def initialize(config)
   @workers = {}
   @config  = config
   @logger  = start_log
end

Public Instance Methods

act(*queue_names) click to toggle source
# File lib/procrastinator/queue_manager.rb, line 71
      def act(*queue_names)
         unless @config.test_mode?
            raise <<~ERR
               Procrastinator.act called outside Test Mode.
               Either use Procrastinator.spawn_workers or call #enable_test_mode in Procrastinator.setup.
            ERR
         end

         workers = @workers.keys

         if queue_names.empty?
            workers.each(&:act)
         else
            queue_names.each do |name|
               workers.find { |worker| worker.name == name }.act
            end
         end
      end
spawn_worker(queue, scheduler: nil) click to toggle source

Produces a new QueueWorker for the given queue.

If Test Mode is disabled in the config, then it will also fork a new independent process for that worker to work in.

@param queue [Queue] the queue to build a worker for @param scheduler [Scheduler] an optional scheduler instance to pass to the worker

# File lib/procrastinator/queue_manager.rb, line 47
def spawn_worker(queue, scheduler: nil)
   worker = QueueWorker.new(queue:     queue,
                            config:    @config,
                            scheduler: scheduler)
   if @config.test_mode?
      @workers[worker] = Process.pid
   else
      check_for_name(worker.long_name)

      pid = fork

      if pid
         # === PARENT PROCESS ===
         Process.detach(pid)
         @workers[worker] = pid
      else
         deamonize(worker.long_name)

         worker.work
         shutdown_worker
      end
   end
end
spawn_workers() click to toggle source

Shuts down any remaining old queue workers and spawns a new one for each queue defined in the config

@return [Scheduler] a scheduler object that can be used to interact with the queues

# File lib/procrastinator/queue_manager.rb, line 24
def spawn_workers
   scheduler = Scheduler.new(@config, self)

   kill_old_workers

   if ENV['PROCRASTINATOR_STOP']
      @logger.warn('Cannot spawn queue workers because environment variable PROCRASTINATOR_STOP is set')
   else
      @config.queues.each do |queue|
         spawn_worker(queue, scheduler: scheduler)
      end
   end

   scheduler
end

Private Instance Methods

start_log() click to toggle source
# File lib/procrastinator/queue_manager.rb, line 92
def start_log
   directory = @config.log_dir

   return unless directory

   log_path = directory + 'queue-manager.log'

   directory.mkpath
   File.open(log_path.to_path, 'a+') { |f| f.write '' }

   logger = Logger.new(log_path.to_path)

   logger.level = @config.log_level

   # @logger.info(['',
   #               '===================================',
   #               "Started worker process, #{long_name}, to work off queue #{@queue.name}.",
   #               "Worker pid=#{Process.pid}; parent pid=#{Process.ppid}.",
   #               '==================================='].join("\n"))

   logger
end