module Procrastinator::Scheduler::ThreadedWorking

Threaded work style: works off the jobs of each queue in its own thread.

@see WorkProxy

Constants

PROG_NAME

Program name. Used as default for pid file names and in logging.

Public Instance Methods

threaded(timeout: nil) click to toggle source

Work off jobs per queue, each in its own thread.

@param timeout Maximum number of seconds to run for. If nil, will run indefinitely.

# File lib/procrastinator/scheduler.rb, line 135
# Works off jobs per queue, each queue in its own thread, and blocks until the
# worker threads finish or the timeout elapses. The worker threads are always
# halted before this method returns.
#
# @param timeout [Numeric, nil] Maximum number of seconds to run for, measured across all
#    worker threads together. If nil, will run indefinitely.
def threaded(timeout: nil)
   open_log
   shutdown_on_interrupt

   begin
      @threads = spawn_threads

      @logger.info "#{ PROG_NAME } running. Process ID: #{ Process.pid }"

      # One shared deadline: joining every thread with the full timeout would let the
      # total run time grow to (timeout * number of threads).
      deadline = timeout && (Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout)

      @threads.each do |thread|
         # nil keeps waiting indefinitely; otherwise only wait for whatever time is left.
         remaining = deadline && [deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC), 0].max
         thread.join(remaining)
      end
   rescue StandardError => e
      thread_crash(e)
   ensure
      @logger&.info 'Halting worker threads...'
      shutdown!
      @logger&.info 'Threads halted.'
   end
end

Private Instance Methods

log_path() click to toggle source
# File lib/procrastinator/scheduler.rb, line 217
# Builds the location of the log file inside the configured log directory,
# creating that directory (and any missing parents) before returning.
#
# @return the log file path, named after the downcased PROG_NAME (presumably a Pathname,
#    given the `/`, `dirname` and `mkpath` calls)
def log_path
   log_file = @config.log_dir / "#{ PROG_NAME.downcase }.log"
   log_file.tap { |file| file.dirname.mkpath }
end
open_log(quiet: false) click to toggle source
# File lib/procrastinator/scheduler.rb, line 199
# Creates the logger once; does nothing when @logger is already set.
#
# Output targets:
# - $stderr, unless quiet
# - the log file (opened for appending), when a log level is configured
# - a throwaway StringIO, when quiet and no log level is configured
#
# @param quiet [Boolean] when true, nothing is written to $stderr
def open_log(quiet: false)
   return if @logger

   level = @config.log_level

   outputs = []
   if quiet
      outputs << StringIO.new unless level
   else
      outputs << $stderr
   end
   outputs << log_path.open('a') if level

   sink      = MultiIO.new(*outputs)
   sink.sync = true

   @logger = Logger.new(sink,
                        progname:  PROG_NAME.downcase,
                        level:     level || Logger::INFO,
                        formatter: Config::DEFAULT_LOG_FORMATTER)
end
shutdown!() click to toggle source
# File lib/procrastinator/scheduler.rb, line 195
# Kills every worker thread that is still alive. Safe to call before any
# threads have been spawned.
#
# @return [Array<Thread>] the threads that were alive when this was called
def shutdown!
   running = (@threads || []).select { |thread| thread.alive? }
   running.each { |thread| thread.kill }
end
shutdown_on_interrupt() click to toggle source
# File lib/procrastinator/scheduler.rb, line 188
# Installs an INT signal handler that halts the worker threads.
def shutdown_on_interrupt
   on_interrupt = proc do
      # Blank line on stderr, just to separate the shutdown log item.
      warn "\n"
      shutdown!
   end

   Signal.trap('INT', &on_interrupt)
end
spawn_threads() click to toggle source
# File lib/procrastinator/scheduler.rb, line 157
# Starts one thread per worker. Each thread records the worker's name in a
# thread variable, runs the worker, and halts the worker when its work ends,
# whether normally or through an exception.
#
# @return [Array<Thread>] the spawned threads, in worker order
def spawn_threads
   queue_names = @workers.map(&:name).join(', ')
   @logger.info "Starting workers for queues: #{ queue_names }"

   @workers.map do |worker|
      @logger.debug "Spawning thread: #{ worker.name }"

      Thread.new(worker) do |queue_worker|
         current = Thread.current
         current.abort_on_exception = true
         current.thread_variable_set(:name, queue_worker.name)

         begin
            queue_worker.work!
         ensure
            queue_worker.halt
         end
      end
   end
end
thread_crash(error) click to toggle source
# File lib/procrastinator/scheduler.rb, line 175
         # Logs a fatal message describing a crash in a queue worker thread: the names
         # of the crashed threads, the error message and the error's backtrace.
         #
         # @param error [Exception] the error that surfaced while running the worker threads
         def thread_crash(error)
            # A nil status marks a thread that terminated with an exception.
            crashed_threads = (@threads || []).select { |t| t.status.nil? }.map do |thread|
               "Crashed thread: #{ thread.thread_variable_get(:name) }"
            end

            # An exception that was never raised has no backtrace (nil); treat it as empty
            # so that reporting the crash cannot itself crash.
            backtrace = Array(error.backtrace)

            @logger.fatal <<~MSG
               Crash detected in queue worker thread.
                  #{ crashed_threads.join("\n") }
                  #{ error.message }
                  #{ backtrace.join("\n\t") }
            MSG
         end