class Procrastinator::QueueWorker
A QueueWorker
checks for tasks to run from the loader defined in the provided config and executes them, updating information in the task loader as necessary.
@author Robin Miller
Constants
- PERSISTER_METHODS
expected methods for all persistence strategies
Public Class Methods
new(queue:, config:, scheduler: nil)
click to toggle source
# File lib/procrastinator/queue_worker.rb, line 16 def initialize(queue:, config:, scheduler: nil) @queue = queue @config = config @scheduler = scheduler @logger = nil end
Public Instance Methods
act()
click to toggle source
# File lib/procrastinator/queue_worker.rb, line 40 def act persister = @config.loader tasks = fetch_tasks(persister) tasks.each do |metadata| tw = build_worker(metadata) tw.work if tw.successful? persister.delete(metadata.id) else persister.update(metadata.id, tw.to_h.merge(queue: @queue.name.to_s)) end end end
long_name()
click to toggle source
# File lib/procrastinator/queue_worker.rb, line 58 def long_name name = "#{ @queue.name }-queue-worker" name = "#{ @config.prefix }-#{ name }" if @config.prefix name end
start_log()
click to toggle source
Starts a log file and stores the logger within this queue worker.
Separate from init because logging is context-dependent
# File lib/procrastinator/queue_worker.rb, line 69 def start_log return if @logger || !@config.log_dir @logger = Logger.new(log_target, level: @config.log_level) msg = <<~MSG ====================================================================== Started worker process, #{ long_name }, to work off queue #{ @queue.name }. Worker pid=#{ Process.pid }; parent pid=#{ Process.ppid }. ====================================================================== MSG @logger.info("\n#{ msg }") end
work()
click to toggle source
# File lib/procrastinator/queue_worker.rb, line 24 def work start_log begin loop do sleep(@queue.update_period) act end rescue StandardError => e raise if @config.test_mode? || !@logger @logger.fatal(e) end end
Private Instance Methods
build_worker(metadata)
click to toggle source
# File lib/procrastinator/queue_worker.rb, line 86 def build_worker(metadata) start_log TaskWorker.new(metadata: metadata, queue: @queue, scheduler: @scheduler, context: @config.context, logger: @logger) end
fetch_tasks(persister)
click to toggle source
# File lib/procrastinator/queue_worker.rb, line 113 def fetch_tasks(persister) tasks = persister.read(queue: @queue.name).map(&:to_h).reject { |t| t[:run_at].nil? } tasks = sort_tasks(tasks) metas = tasks.collect do |t| TaskMetaData.new(t.delete_if { |key| !TaskMetaData::EXPECTED_DATA.include?(key) }) end metas.select(&:runnable?) end
log_target()
click to toggle source
# File lib/procrastinator/queue_worker.rb, line 96 def log_target return $stdout if @config.test_mode? log_path = @config.log_dir + "#{ long_name }.log" write_log_file(log_path) log_path.to_path end
sort_tasks(tasks)
click to toggle source
# File lib/procrastinator/queue_worker.rb, line 125 def sort_tasks(tasks) # shuffling and re-sorting to avoid worst case O(n^2) when receiving already sorted data # on quicksort (which is default ruby sort). It is not unreasonable that the persister could return sorted # results # Ideally, we'd use a better algo than qsort for this, but this will do for now tasks.shuffle.sort_by { |t| t[:run_at] }.first(@queue.max_tasks) end
write_log_file(log_path)
click to toggle source
# File lib/procrastinator/queue_worker.rb, line 106 def write_log_file(log_path) @config.log_dir.mkpath File.open(log_path.to_path, 'a+') do |f| f.write '' end end