class Scheduler::MainProcess
Attributes
job_class[RW]
@return [Class] the class of the main job model.
logger[RW]
@return [String] a logger file.
max_concurrent_jobs[RW]
@return [Integer] maximum number of concurrent jobs.
pid[RW]
@return [Integer] pid of the main process.
polling_interval[RW]
@return [Integer] how much time to wait before each iteration.
Public Class Methods
new()
click to toggle source
Creates a MainProcess
which keeps running and continuously checks if new jobs are queued.
@return [Scheduler::MainProcess] the created MainProcess.
# File lib/scheduler/main_process.rb, line 20
# Creates a MainProcess: copies the settings from Scheduler.configuration,
# normalizes the logger and the polling interval, validates the job class
# and then enters the main loop through start_loop.
#
# @raise [RuntimeError] if the configured job class does not include
#   Scheduler::Schedulable.
# @return [Scheduler::MainProcess] the created MainProcess.
def initialize
  configuration = Scheduler.configuration

  @pid = Process.pid
  @logger = configuration.logger
  @job_class = configuration.job_class
  @polling_interval = configuration.polling_interval
  @max_concurrent_jobs = configuration.max_concurrent_jobs

  # Anything that already answers to the logging methods used by this class
  # is kept as it is (Logger, its subclasses, logger-compatible wrappers).
  # Every other value is handed to Logger.new as the log device.
  # Kernel#warn is private, so plain strings and IO objects do not pass
  # this check.
  logger_like = %i[info warn error].all? { |level| @logger.respond_to?(level) }
  @logger = Logger.new(@logger) unless logger_like

  if @polling_interval < 1
    @logger.warn "[Scheduler:#{@pid}] Warning: specified a polling interval lesser than 1: "\
      "it will be forced to 1.".yellow
    @polling_interval = 1
  end

  unless @job_class.included_modules.include?(Scheduler::Schedulable)
    raise "The given job class '#{@job_class}' is not a Schedulable class. "\
      "Make sure to add 'include Scheduler::Schedulable' to your class."
  end

  @logger.info "[Scheduler:#{@pid}] Starting scheduler..".cyan
  start_loop
end
Public Instance Methods
reschedule_running_jobs()
click to toggle source
Reschedules currently running jobs.
@return [nil]
# File lib/scheduler/main_process.rb, line 124
# Walks over every job reported by @job_class.running, sends QUIT to the
# job's process when a pid is recorded, and calls schedule on the job.
def reschedule_running_jobs
  @job_class.running.each do |running_job|
    begin
      if running_job.pid.present?
        Process.kill(:QUIT, running_job.pid)
      end
    rescue Errno::ESRCH, Errno::EPERM
      # Deliberately ignored: the job is put back in the queue regardless.
    ensure
      # Runs whether or not the signal could be delivered.
      running_job.schedule
    end
  end
end
start_loop()
click to toggle source
Main loop.
@return [nil]
# File lib/scheduler/main_process.rb, line 50
# Main loop. On every iteration it forks one child process per queued job,
# up to the number of free slots left by @max_concurrent_jobs, logs what was
# launched and then waits @polling_interval seconds.
#
# A StandardError raised by an iteration is logged and the loop carries on
# after the usual wait. On INT, TERM or QUIT the running jobs are
# rescheduled and the loop ends. Any other signal is ignored.
#
# @return [nil]
def start_loop
  loop do
    begin
      begin
        # Ids of the jobs launched during this iteration.
        # NOTE(review): this list is rebuilt on every iteration, so the
        # completion check below only sees jobs forked a moment ago.
        # Confirm whether it was meant to persist across iterations.
        queue = []

        running_jobs = @job_class.running.entries
        schedulable_jobs = @job_class.queued.order_by(scheduled_at: :asc).entries
        # Free slots, never below zero.
        jobs_to_schedule = [@max_concurrent_jobs - running_jobs.count, 0].max

        # Forks a child for each job that fits in the free slots.
        scheduled_jobs = []
        schedulable_jobs.first(jobs_to_schedule).each do |job|
          job_pid = Process.fork do
            begin
              job.perform_now
            rescue StandardError => e
              @logger.error "[Scheduler:#{@pid}] Error #{e.class}: #{e.message} "\
                "(#{e.backtrace.find { |l| l.include?('app') }}).".red
            end
          end
          Process.detach(job_pid)
          job.update_attribute(:pid, job_pid)
          scheduled_jobs << job
          queue << job.id.to_s
        end

        # Logs launched jobs.
        if scheduled_jobs.any?
          @logger.info "[Scheduler:#{@pid}] Launched #{scheduled_jobs.count} "\
            "jobs: #{scheduled_jobs.map(&:id).map(&:to_s).join(', ')}.".cyan
        elsif schedulable_jobs.empty?
          @logger.info "[Scheduler:#{@pid}] No jobs in queue.".cyan
        else
          @logger.warn "[Scheduler:#{@pid}] No jobs launched, reached maximum "\
            "number of concurrent jobs. Jobs in queue: #{schedulable_jobs.count}.".yellow
        end

        # Drops finished jobs from the queue and signals their process.
        queue.delete_if do |job_id|
          job = @job_class.find(job_id)
          if job.present? && job.status.in?([:completed, :error])
            begin
              @logger.info "[Scheduler:#{@pid}] Rimosso processo #{job.pid} per lavoro completato".cyan
              Process.kill :QUIT, job.pid
            rescue Errno::ENOENT, Errno::ESRCH
              # The process is already gone: nothing left to signal.
            end
            true
          else
            false
          end
        end
      rescue StandardError => error
        @logger.error "[Scheduler:#{@pid}] Error #{error.message}".red
        @logger.error error.backtrace.select { |line| line.include?('app') }.join("\n").red
      end

      # The wait sits outside the error handler above, so a failing
      # iteration is still followed by a pause instead of an immediate
      # retry. It stays inside the outer begin so that a signal arriving
      # during the wait is handled below.
      sleep @polling_interval
    rescue SignalException => signal
      # Matches on the signal number rather than on the exception message,
      # since the message text is not guaranteed to be the signal name.
      if %w[INT TERM QUIT].include?(Signal.signame(signal.signo))
        @logger.warn "[Scheduler:#{@pid}] Received interrupt, terminating scheduler..".yellow
        reschedule_running_jobs
        break
      end
    end
  end
end