module Procrastinator::Scheduler::ThreadedWorking
Threaded work style
@see WorkProxy
Constants
- PROG_NAME
Program name. Used as default for pid file names and in logging.
Public Instance Methods
threaded(timeout: nil)
click to toggle source
Work off jobs per queue, each in its own thread.
@param timeout Maximum number of seconds to run for. If nil, will run indefinitely.
# File lib/procrastinator/scheduler.rb, line 135

# Work off jobs per queue, each in its own thread.
#
# Opens the log, installs the interrupt handler, spawns one thread per worker
# and then waits on those threads. Whatever way the wait ends (timeout
# elapsed, threads finished, or an error), the remaining threads are halted
# via shutdown!.
#
# The timeout is a budget for the whole run, not per thread: every join shares
# a single deadline, so the total wait never exceeds +timeout+ seconds no
# matter how many worker threads exist.
#
# @param timeout Maximum number of seconds to run for. If nil, will run indefinitely.
def threaded(timeout: nil)
  open_log
  shutdown_on_interrupt

  begin
    @threads = spawn_threads

    @logger.info "#{ PROG_NAME } running. Process ID: #{ Process.pid }"

    # Monotonic clock so wall-clock adjustments cannot stretch or shrink the run.
    deadline = timeout && (Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout)

    @threads.each do |thread|
      # nil means "wait forever"; otherwise wait only for what is left of the budget.
      remaining = deadline && [deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC), 0].max

      thread.join(remaining)
    end
  rescue StandardError => e
    thread_crash(e)
  ensure
    # @logger may be unset here, hence the safe navigation.
    @logger&.info 'Halting worker threads...'
    shutdown!
    @logger&.info 'Threads halted.'
  end
end
Private Instance Methods
log_path()
click to toggle source
# File lib/procrastinator/scheduler.rb, line 217

# Builds the log file location inside the configured log directory, named
# after the lower-cased program name, and creates the containing directory
# (with parents) if needed. The log file itself is not created here.
#
# @return the path of the log file
def log_path
  file_name = "#{ PROG_NAME.downcase }.log"

  (@config.log_dir / file_name).tap do |log_file|
    log_file.dirname.mkpath
  end
end
open_log(quiet: false)
click to toggle source
# File lib/procrastinator/scheduler.rb, line 199

# Creates @logger unless one already exists.
#
# Output destinations:
# - $stderr, unless quiet
# - the log file (opened for append), when the config has a log level
# - a throwaway StringIO when quiet and no log level is configured, so there
#   is always one device
#
# @param quiet when true, nothing is written to $stderr
def open_log(quiet: false)
  return if @logger

  targets = []

  if quiet
    targets << StringIO.new unless @config.log_level
  else
    targets << $stderr
  end

  targets << log_path.open('a') if @config.log_level

  sink      = MultiIO.new(*targets)
  sink.sync = true

  logger_options = {
    progname:  PROG_NAME.downcase,
    level:     @config.log_level || Logger::INFO,
    formatter: Config::DEFAULT_LOG_FORMATTER
  }

  @logger = Logger.new(sink, **logger_options)
end
shutdown!()
click to toggle source
# File lib/procrastinator/scheduler.rb, line 195

# Kills every worker thread that is still alive. Does nothing when no
# threads have been spawned yet.
#
# @return the threads that were alive and got killed
def shutdown!
  running = Array(@threads).select(&:alive?)

  running.each(&:kill)
end
shutdown_on_interrupt()
click to toggle source
# File lib/procrastinator/scheduler.rb, line 188

# Installs an INT signal handler that halts the worker threads.
#
# @return whatever Signal.trap returns (the previously installed handler)
def shutdown_on_interrupt
  on_interrupt = proc do
    # A bare newline on stderr, just to separate the shutdown log item.
    warn "\n"

    shutdown!
  end

  Signal.trap('INT', on_interrupt)
end
spawn_threads()
click to toggle source
# File lib/procrastinator/scheduler.rb, line 157

# Starts one thread per entry in @workers.
#
# Each thread stores the worker's name in a thread variable (:name), runs the
# worker's work! and always calls halt on it afterwards, even if work! raised.
# abort_on_exception is switched on for each thread.
#
# @return [Array<Thread>] the spawned threads, in worker order
def spawn_threads
  queue_names = @workers.map(&:name).join(', ')
  @logger.info "Starting workers for queues: #{ queue_names }"

  @workers.map do |worker|
    @logger.debug "Spawning thread: #{ worker.name }"

    # The worker is handed in as a thread argument and used through that
    # argument only.
    Thread.new(worker) do |queue_worker|
      current = Thread.current
      current.abort_on_exception = true
      current.thread_variable_set(:name, queue_worker.name)

      begin
        queue_worker.work!
      ensure
        queue_worker.halt
      end
    end
  end
end
thread_crash(error)
click to toggle source
# File lib/procrastinator/scheduler.rb, line 175

# Logs a fatal entry describing a crash in a queue worker thread.
#
# The entry lists every known thread whose status is nil (Thread#status is
# nil for a thread that terminated with an exception), then the error
# message and its backtrace.
#
# @param error the exception that was caught
def thread_crash(error)
  crashed_threads = (@threads || []).select { |t| t.status.nil? }.map do |thread|
    "Crashed thread: #{ thread.thread_variable_get(:name) }"
  end

  # backtrace is nil for an exception object that was never raised
  backtrace = Array(error.backtrace)

  @logger.fatal <<~MSG
    Crash detected in queue worker thread.
       #{ crashed_threads.join("\n") }
       #{ error.message }
       #{ backtrace.join("\n\t") }
  MSG
end