class Procrastinator::QueueWorker
A QueueWorker checks for tasks to run from the task store and executes them, updating information in the task store as necessary.
@author Robin Miller
Constants
- NULL_FILE
- PERSISTER_METHODS
expected methods for all persistence strategies
Public Class Methods
new(queue:, config:)
click to toggle source
# File lib/procrastinator/queue_worker.rb, line 18

# Builds a worker bound to one queue.
#
# @param queue [Symbol, Object] when a Symbol, it is resolved through
#   config.queue(name: queue); any other non-nil value is used as the queue itself
# @param config [Object] configuration; also handed to Scheduler.new
# @raise [ArgumentError] if queue or config is nil
def initialize(queue:, config:)
  raise ArgumentError, ':queue cannot be nil' if queue.nil?
  raise ArgumentError, ':config cannot be nil' if config.nil?

  @config = config
  # A Symbol is treated as a queue name to look up; anything else is kept as given.
  @queue = queue.is_a?(Symbol) ? config.queue(name: queue) : queue
  @scheduler = Scheduler.new(config)
  # Initial logger targets File::NULL.
  @logger = Logger.new(File::NULL)
end
Public Instance Methods
halt()
click to toggle source
Logs that the worker on this queue has halted, then closes the logger.
# File lib/procrastinator/queue_worker.rb, line 70

# Writes an info-level "halted" entry for this worker's queue and then closes
# the logger. Does nothing and returns nil when no logger is set.
#
# @return the result of closing the logger, or nil when there is no logger
def halt
  return if @logger.nil?

  @logger.info("Halted worker on queue: #{ name }")
  @logger.close
end
open_log!(name, config)
click to toggle source
Starts a log file and returns the created Logger
# File lib/procrastinator/queue_worker.rb, line 76

# Creates a Logger for the given name.
#
# When config.log_level is set, the log directory is created if needed and the
# file "<name>.log" inside it is touched before the Logger is opened on it.
# Otherwise the Logger is opened on NULL_FILE with level Logger::FATAL.
#
# @param name [String] base name of the log file; also used as the Logger progname
# @param config [Object] must respond to log_level, log_dir, log_shift_age
#   and log_shift_size
# @return [Logger] the newly created logger
def open_log!(name, config)
  destination =
    if config.log_level
      (config.log_dir / "#{ name }.log").tap do |log_file|
        # Make sure both the directory and the file exist before Logger opens it.
        config.log_dir.mkpath
        FileUtils.touch(log_file)
      end
    else
      NULL_FILE
    end

  Logger.new(destination.to_path,
             config.log_shift_age,
             config.log_shift_size,
             level: config.log_level || Logger::FATAL,
             progname: name,
             formatter: Config::DEFAULT_LOG_FORMATTER)
end
work!()
click to toggle source
Works on jobs forever
# File lib/procrastinator/queue_worker.rb, line 35

# Opens this worker's log, then repeatedly sleeps for the queue's update period
# and works one task, without end.
#
# Any StandardError escaping the loop (or raised while opening the log) is
# written to the current logger at fatal level and then re-raised.
#
# @raise [StandardError] whatever error interrupted the worker
def work!
  log_name = "#{ name }-queue-worker"
  @logger = open_log!(log_name, @config)
  @logger.info("Started worker thread to consume queue: #{ name }")

  loop do
    # Wait one polling interval before each attempt.
    sleep(@queue.update_period)
    work_one
  end
rescue StandardError => error
  @logger.fatal(error)
  raise
end
work_one()
click to toggle source
Performs at most one task from the queue; returns without doing anything when no task is available.
# File lib/procrastinator/queue_worker.rb, line 51

# Fetches the next task and runs it.
#
# Returns nil immediately when next_task yields nothing. On success the task is
# deleted from the queue. If running (or deleting) raises a StandardError, the
# task is told about the failure and its data, minus :id, is written back to
# the queue under that id.
#
# @return the result of the queue delete/update call, or nil when there was no task
def work_one
  task = next_task(logger: @logger, container: @config.container, scheduler: @scheduler)
  return unless task

  begin
    task.run
    @queue.delete(task.id)
  rescue StandardError => error
    task.fail(error)

    # Split the id from the remaining fields so it can be passed positionally.
    attributes = task.to_h
    task_id = attributes.delete(:id)
    @queue.update(task_id, **attributes)
  end
end