class Procrastinator::QueueWorker

A QueueWorker checks for tasks to run from the loader defined in the provided config and executes them, updating information in the task loader as necessary.

@author Robin Miller

Constants

PERSISTER_METHODS

Methods that every persistence strategy is expected to implement.

Public Class Methods

new(queue:, config:, scheduler: nil) click to toggle source
# File lib/procrastinator/queue_worker.rb, line 16
# Stores the collaborators this worker needs; no work is performed yet.
#
# @param queue the queue to work off; the other methods here read its
#    name, update_period and max_tasks
# @param config the configuration object; the other methods here read its
#    loader, prefix, log_dir, log_level, context and test_mode?
# @param scheduler handed on to each TaskWorker that gets built (optional)
def initialize(queue:, config:, scheduler: nil)
   @queue, @config, @scheduler = queue, config, scheduler

   # Left unset on purpose: start_log creates the logger on first use.
   @logger = nil
end

Public Instance Methods

act() click to toggle source
# File lib/procrastinator/queue_worker.rb, line 40
# Runs one pass over the queue: fetches the runnable tasks, works each one,
# then deletes it from the loader on success or writes its updated state
# (tagged with this queue's name) back to the loader otherwise.
def act
   loader = @config.loader

   fetch_tasks(loader).each do |meta|
      task_worker = build_worker(meta)
      task_worker.work

      unless task_worker.successful?
         # Unsuccessful run: persist the worker's state under this queue's name.
         loader.update(meta.id, task_worker.to_h.merge(queue: @queue.name.to_s))
         next
      end

      loader.delete(meta.id)
   end
end
long_name() click to toggle source
# File lib/procrastinator/queue_worker.rb, line 58
# Builds the descriptive name of this worker: "<queue name>-queue-worker",
# prefixed with "<prefix>-" when the config supplies a prefix.
#
# @return [String]
def long_name
   base = "#{ @queue.name }-queue-worker"
   prefix = @config.prefix

   return base unless prefix

   "#{ prefix }-#{ base }"
end
start_log() click to toggle source

Starts a log file and stores the logger within this queue worker.

This is kept separate from initialization because logging is context-dependent.

# File lib/procrastinator/queue_worker.rb, line 69
      # Creates this worker's logger and writes a startup banner to it.
      #
      # Does nothing when a logger already exists or when the config has no
      # log directory. Otherwise the logger writes to log_target at the
      # configured log level.
      def start_log
         return if @logger
         return unless @config.log_dir

         @logger = Logger.new(log_target, level: @config.log_level)

         banner = <<~MSG
            ======================================================================
            Started worker process, #{ long_name }, to work off queue #{ @queue.name }.
            Worker pid=#{ Process.pid }; parent pid=#{ Process.ppid }.
            ======================================================================
         MSG

         @logger.info("\n#{ banner }")
      end
work() click to toggle source
# File lib/procrastinator/queue_worker.rb, line 24
# Main loop of the worker: starts logging, then repeatedly sleeps for the
# queue's update period and performs one act pass.
#
# A StandardError escaping the loop is re-raised when the config is in test
# mode or when there is no logger; otherwise it is logged as fatal and the
# method returns.
def work
   start_log

   begin
      loop do
         sleep(@queue.update_period)

         act
      end
   rescue StandardError => error
      raise if @config.test_mode?
      raise unless @logger

      @logger.fatal(error)
   end
end

Private Instance Methods

build_worker(metadata) click to toggle source
# File lib/procrastinator/queue_worker.rb, line 86
# Creates the TaskWorker that will run a single task.
#
# Logging is started first so that the worker receives the current logger.
#
# @param metadata the task metadata to hand to the TaskWorker
# @return [TaskWorker]
def build_worker(metadata)
   start_log

   worker_args = {
      metadata:  metadata,
      queue:     @queue,
      scheduler: @scheduler,
      context:   @config.context,
      logger:    @logger
   }

   TaskWorker.new(**worker_args)
end
fetch_tasks(persister) click to toggle source
# File lib/procrastinator/queue_worker.rb, line 113
# Loads this queue's tasks from the persister and returns the ones that are
# ready to run, as TaskMetaData instances.
#
# Records with a nil :run_at are dropped, the remainder is ordered and capped
# by sort_tasks, and each record is reduced to the keys listed in
# TaskMetaData::EXPECTED_DATA before being wrapped.
#
# @param persister an object responding to read(queue:) whose result is a
#    collection of items convertible with to_h
# @return [Array<TaskMetaData>] only the entries whose runnable? is truthy
def fetch_tasks(persister)
   records = persister.read(queue: @queue.name).map(&:to_h).reject { |task| task[:run_at].nil? }

   metas = sort_tasks(records).map do |task|
      # Hash#to_h returns the receiver, so these may be the persister's own
      # hashes. Filter into a new hash rather than deleting keys in place.
      expected = task.select { |key, _value| TaskMetaData::EXPECTED_DATA.include?(key) }

      TaskMetaData.new(expected)
   end

   metas.select(&:runnable?)
end
log_target() click to toggle source
# File lib/procrastinator/queue_worker.rb, line 96
# Decides where log output goes.
#
# In test mode this is $stdout. Otherwise it is "<long_name>.log" inside the
# configured log directory; that file is created first via write_log_file.
#
# @return [IO, String] $stdout, or the log file's path as a String
def log_target
   if @config.test_mode?
      $stdout
   else
      file = @config.log_dir + "#{ long_name }.log"

      write_log_file(file)

      file.to_path
   end
end
sort_tasks(tasks) click to toggle source
# File lib/procrastinator/queue_worker.rb, line 125
# Orders task hashes by :run_at (ascending) and keeps at most the queue's
# max_tasks of them.
#
# The input is shuffled before sorting. The original author added this as a
# guard against poor sort performance when the persister returns data that
# is already sorted; a different sorting approach would make it unnecessary.
#
# @param tasks [Array<Hash>] task hashes, each with a :run_at value
# @return [Array<Hash>] a new array; the input is not modified
def sort_tasks(tasks)
   randomized = tasks.shuffle
   ordered    = randomized.sort_by { |task| task[:run_at] }

   ordered.first(@queue.max_tasks)
end
write_log_file(log_path) click to toggle source
# File lib/procrastinator/queue_worker.rb, line 106
# Makes sure the log file exists: creates the configured log directory
# (including parents), then opens the file in append mode and writes an empty
# string, which creates the file without changing existing content.
#
# @param log_path the log file location; must respond to to_path
def write_log_file(log_path)
   @config.log_dir.mkpath

   File.write(log_path.to_path, '', mode: 'a+')
end