class RocketJob::Supervisor

Starts a server instance, along with the workers and ensures workers remain running until they need to shutdown.

Attributes

server[R]
worker_id[RW]
worker_pool[R]

Public Class Methods

new(server) click to toggle source
# File lib/rocket_job/supervisor.rb, line 24
def initialize(server)
  @server      = server
  @worker_pool = WorkerPool.new(server.name)
  @mutex       = Mutex.new
end
run() click to toggle source

Start the Supervisor, using the supplied attributes to create a new Server instance.

# File lib/rocket_job/supervisor.rb, line 13
def self.run
  Thread.current.name = "rocketjob main"
  RocketJob.create_indexes
  register_signal_handlers

  server = Server.create!
  new(server).run
ensure
  server&.destroy
end

Public Instance Methods

run() click to toggle source
# File lib/rocket_job/supervisor.rb, line 30
def run
  logger.info "Using MongoDB Database: #{RocketJob::Job.collection.database.name}"
  logger.info("Running with filter", Config.filter) if Config.filter
  server.started!
  logger.info "Rocket Job Server started"

  event_listener = Thread.new { Event.listener }
  Subscribers::SecretConfig.subscribe if defined?(SecretConfig)
  Subscribers::Server.subscribe(self) do
    Subscribers::Worker.subscribe(self) do
      Subscribers::Logger.subscribe do
        supervise_pool
        stop!
      end
    end
  end
rescue ::Mongoid::Errors::DocumentNotFound
  logger.info("Server has been destroyed. Going down hard!")
rescue Exception => e
  logger.error("RocketJob::Server is stopping due to an exception", e)
ensure
  event_listener&.kill
  # Logs the backtrace for each running worker
  worker_pool.log_backtraces
  logger.info("Shutdown Complete")
end
stop!() click to toggle source
# File lib/rocket_job/supervisor.rb, line 57
def stop!
  server.stop! if server.may_stop?
  synchronize do
    worker_pool.stop
  end
  until worker_pool.join
    logger.info "Waiting for workers to finish processing ..."
    # One or more workers still running so update heartbeat so that server reports "alive".
    server.refresh(worker_pool.living_count)
  end
end
supervise_pool() click to toggle source
# File lib/rocket_job/supervisor.rb, line 69
def supervise_pool
  stagger = true
  until self.class.shutdown?
    synchronize do
      if server.running?
        worker_pool.prune
        worker_pool.rebalance(server.max_workers, stagger)
        stagger = false
      elsif server.paused?
        worker_pool.stop
        sleep(0.1)
        worker_pool.prune
        stagger = true
      else
        break
      end
    end

    synchronize { server.refresh(worker_pool.living_count) }

    self.class.wait_for_event(Config.heartbeat_seconds)

    break if self.class.shutdown?
  end
end
synchronize(&block) click to toggle source
# File lib/rocket_job/supervisor.rb, line 95
def synchronize(&block)
  @mutex.synchronize(&block)
end