class Scheduler::MainProcess

Attributes

job_class[RW]

@return [Class] the class of the main job model.

logger[RW]

@return [String] a logger file.

max_concurrent_jobs[RW]

@return [Integer] maximum number of concurent jobs.

pid[RW]

@return [Integer] pid of the main process.

polling_interval[RW]

@return [Integer] how much time to wait before each iteration.

Public Class Methods

new() click to toggle source

Creates a MainProcess which keeps running and continuously checks if new jobs are queued.

@return [Scheduler::MainProcess] the created MainProcess.

# File lib/scheduler/main_process.rb, line 20
def initialize
  @pid = Process.pid
  @logger = Scheduler.configuration.logger
  @job_class = Scheduler.configuration.job_class
  @polling_interval = Scheduler.configuration.polling_interval
  @max_concurrent_jobs = Scheduler.configuration.max_concurrent_jobs

  unless @logger.instance_of?(ActiveSupport::Logger) or @logger.instance_of?(Logger)
    @logger = Logger.new(@logger)
  end

  if @polling_interval < 1
    @logger.warn "[Scheduler:#{@pid}] Warning: specified a polling interval lesser than 1: "\
      "it will be forced to 1.".yellow
    @polling_interval = 1
  end
  
  unless @job_class.included_modules.include? Scheduler::Schedulable
    raise "The given job class '#{@job_class}' is not a Schedulable class. "\
      "Make sure to add 'include Scheduler::Schedulable' to your class."
  end
  
  @logger.info "[Scheduler:#{@pid}] Starting scheduler..".cyan
  self.start_loop
end

Public Instance Methods

reschedule_running_jobs() click to toggle source

Reschedules currently running jobs.

@return [nil]

# File lib/scheduler/main_process.rb, line 124
def reschedule_running_jobs
  @job_class.running.each do |job|
    begin
      Process.kill :QUIT, job.pid if job.pid.present?
    rescue Errno::ESRCH, Errno::EPERM
    ensure
      job.schedule
    end
  end
end
start_loop() click to toggle source

Main loop.

@return [nil]

# File lib/scheduler/main_process.rb, line 50
def start_loop
  loop do
    begin
      # Loads up a job queue.
      queue = []
      
      # Counts jobs to schedule.
      running_jobs = @job_class.running.entries
      schedulable_jobs = @job_class.queued.order_by(scheduled_at: :asc).entries
      jobs_to_schedule = @max_concurrent_jobs - running_jobs.count
      jobs_to_schedule = 0 if jobs_to_schedule < 0
  
      # Finds out scheduled jobs waiting to be performed.
      scheduled_jobs = []
      schedulable_jobs.first(jobs_to_schedule).each do |job|
        job_pid = Process.fork do
          begin
            job.perform_now
          rescue StandardError => e
            @logger.error "[Scheduler:#{@pid}] Error #{e.class}: #{e.message} "\
              "(#{e.backtrace.select { |l| l.include?('app') }.first}).".red
          end
        end
        Process.detach(job_pid)
        job.update_attribute(:pid, job_pid)
        scheduled_jobs << job
        queue << job.id.to_s
      end
  
      # Logs launched jobs
      if scheduled_jobs.any?
        @logger.info "[Scheduler:#{@pid}] Launched #{scheduled_jobs.count} "\
          "jobs: #{scheduled_jobs.map(&:id).map(&:to_s).join(', ')}.".cyan
      else
        if schedulable_jobs.count == 0
          @logger.info "[Scheduler:#{@pid}] No jobs in queue.".cyan
        else
          @logger.warn "[Scheduler:#{@pid}] No jobs launched, reached maximum "\
            "number of concurrent jobs. Jobs in queue: #{schedulable_jobs.count}.".yellow
        end
      end
  
      # Checks for completed jobs: clears up queue and kills any zombie pid
      queue.delete_if do |job_id|
        job = @job_class.find(job_id)
        if job.present? and job.status.in? [ :completed, :error ]
          begin
            @logger.info "[Scheduler:#{@pid}] Rimosso processo #{job.pid} per lavoro completato".cyan
            Process.kill :QUIT, job.pid
          rescue Errno::ENOENT, Errno::ESRCH
          end
          true
        else false end
      end
  
      # Waits the specified amount of time before next iteration
      sleep @polling_interval
    rescue StandardError => error
      @logger.error "[Scheduler:#{@pid}] Error #{error.message}".red
      @logger.error error.backtrace.select { |line| line.include?('app') }.join("\n").red
    rescue SignalException => signal
      if signal.message.in? [ 'SIGINT', 'SIGTERM', 'SIGQUIT' ]
        @logger.warn "[Scheduler:#{@pid}] Received interrupt, terminating scheduler..".yellow
        reschedule_running_jobs
        break
      end
    end
  end
end