class Procrastinator::QueueWorker

A QueueWorker checks for tasks to run from the task store and executes them, updating information in the task store as necessary.

@author Robin Miller

Constants

NULL_FILE
PERSISTER_METHODS

expected methods for all persistence strategies

Public Class Methods

new(queue:, config:) click to toggle source
# File lib/procrastinator/queue_worker.rb, line 18
def initialize(queue:, config:)
   raise ArgumentError, ':queue cannot be nil' if queue.nil?
   raise ArgumentError, ':config cannot be nil' if config.nil?

   @config = config

   @queue = if queue.is_a? Symbol
               config.queue(name: queue)
            else
               queue
            end

   @scheduler = Scheduler.new(config)
   @logger    = Logger.new(File::NULL)
end

Public Instance Methods

halt() click to toggle source

Logs halting the queue

# File lib/procrastinator/queue_worker.rb, line 70
def halt
   @logger&.info("Halted worker on queue: #{ name }")
   @logger&.close
end
open_log!(name, config) click to toggle source

Starts a log file and returns the created Logger

# File lib/procrastinator/queue_worker.rb, line 76
def open_log!(name, config)
   if config.log_level
      log_path = config.log_dir / "#{ name }.log"

      config.log_dir.mkpath
      FileUtils.touch(log_path)
   else
      log_path = NULL_FILE
   end

   Logger.new(log_path.to_path,
              config.log_shift_age, config.log_shift_size,
              level:     config.log_level || Logger::FATAL,
              progname:  name,
              formatter: Config::DEFAULT_LOG_FORMATTER)
end
work!() click to toggle source

Works on jobs forever

# File lib/procrastinator/queue_worker.rb, line 35
def work!
   @logger = open_log!("#{ name }-queue-worker", @config)
   @logger.info "Started worker thread to consume queue: #{ name }"

   loop do
      sleep(@queue.update_period)

      work_one
   end
rescue StandardError => e
   @logger.fatal(e)

   raise
end
work_one() click to toggle source

Performs exactly one task on the queue

# File lib/procrastinator/queue_worker.rb, line 51
def work_one
   task = next_task(logger:    @logger,
                    container: @config.container,
                    scheduler: @scheduler) || return

   begin
      task.run

      @queue.delete(task.id)
   rescue StandardError => e
      task.fail(e)

      task_info = task.to_h
      id        = task_info.delete(:id)
      @queue.update(id, **task_info)
   end
end