module ResqueAdmin::Scheduler

Constants

CLI_OPTIONS_ENV_MAPPING
VERSION

Attributes

failure_handler[W]

Allows the user to set an additional failure handler.

logger[W]
scheduled_jobs[R]

The Rufus::Scheduler jobs that are currently scheduled.

Public Class Methods

before_shutdown() click to toggle source
# File lib/resque_admin/scheduler.rb, line 375
# Cleanup performed before the process exits: stops the rufus scheduler
# first, then releases the master lock. Returns whatever
# release_master_lock returns.
def before_shutdown
  stop_rufus_scheduler
  release_master_lock
end
clear_schedule!() click to toggle source

Stops old rufus scheduler and creates a new one. Returns the new rufus scheduler

# File lib/resque_admin/scheduler.rb, line 288
# Stops the current rufus scheduler, forgets it together with all recorded
# scheduled jobs, and returns a freshly created scheduler (rufus_scheduler
# memoizes a new instance once @rufus_scheduler has been reset).
def clear_schedule!
  rufus_scheduler.stop
  @rufus_scheduler, @scheduled_jobs = nil, {}
  rufus_scheduler
end
enqueue(config) click to toggle source
# File lib/resque_admin/scheduler.rb, line 213
# Enqueues the job described by config via enqueue_from_config. Any
# StandardError raised while doing so is handed to the configured failure
# handler's on_enqueue_failure instead of propagating.
def enqueue(config)
  begin
    enqueue_from_config(config)
  rescue StandardError => error
    ResqueAdmin::Scheduler.failure_handler.on_enqueue_failure(config, error)
  end
end
enqueue_delayed_items_for_timestamp(timestamp) click to toggle source

Enqueues all delayed jobs for a timestamp

# File lib/resque_admin/scheduler.rb, line 200
# Enqueues the delayed jobs stored under timestamp, one at a time, until
# enqueue_next_item reports that none are left (returns nil).
def enqueue_delayed_items_for_timestamp(timestamp)
  last_item = nil
  loop do
    # Mastership is re-checked before every single item.
    # NOTE(review): when master? is false last_item keeps its previous
    # value, so the loop keeps iterating rather than stopping - confirm
    # this is intended.
    handle_shutdown { last_item = enqueue_next_item(timestamp) if master? }

    # Nothing left that is ready for this timestamp.
    break if last_item.nil?
  end
end
enqueue_from_config(job_config) click to toggle source

Enqueues a job based on a config hash

# File lib/resque_admin/scheduler.rb, line 226
# Enqueues a job based on a config hash.
#
# job_config - Hash whose keys may be Strings or Symbols:
#   'class'            - name of the job class; constantized when possible,
#                        otherwise the raw name is used.
#   'args'             - a single argument, an Array of arguments, or a
#                        Hash (which is passed as one argument).
#   'queue'            - target queue; defaults to
#                        ResqueAdmin.queue_from_class for the job class.
#   'custom_job_class' - optional name of a class whose `scheduled` class
#                        method performs the enqueue (for example classes
#                        inheriting from ResqueAdmin::JobWithStatus).
def enqueue_from_config(job_config)
  args = job_config['args'] || job_config[:args]

  klass_name = job_config['class'] || job_config[:class]
  klass = begin
    ResqueAdmin::Scheduler::Util.constantize(klass_name)
  rescue NameError
    # The class is not loadable in this process; keep the raw name.
    klass_name
  end

  params = args.is_a?(Hash) ? [args] : Array(args)
  queue = job_config['queue'] ||
          job_config[:queue] ||
          ResqueAdmin.queue_from_class(klass)

  # Looked up under both key styles, like 'class', 'args' and 'queue'.
  job_klass = job_config['custom_job_class'] ||
              job_config[:custom_job_class]

  if job_klass && job_klass != 'ResqueAdmin::Job'
    # The custom job class API must offer a static "scheduled" method. If
    # the custom job class can not be constantized (via a requeue call
    # from the web perhaps), fall back to enqueueing normally via
    # ResqueAdmin::Job.create. NoMethodError is a NameError subclass, so a
    # custom class without `scheduled` takes the same fallback.
    begin
      ResqueAdmin::Scheduler::Util.constantize(job_klass).scheduled(
        queue, klass_name, *params
      )
    rescue NameError
      # Note that the custom job class (job_config['custom_job_class'])
      # is the one enqueued.
      ResqueAdmin::Job.create(queue, job_klass, *params)
    end
  elsif klass.is_a?(Class)
    ResqueAdmin::Scheduler::Plugin.run_before_delayed_enqueue_hooks(
      klass, *params
    )

    # If the class is a custom job class, call self#scheduled on it.
    # This allows you to do things like ResqueAdmin.enqueue_at(timestamp,
    # CustomJobClass). Otherwise, pass off to ResqueAdmin.
    if klass.respond_to?(:scheduled)
      klass.scheduled(queue, klass_name, *params)
    else
      ResqueAdmin.enqueue_to(queue, klass, *params)
    end
  else
    # The class does not exist here (for example: running the scheduler in
    # one app that schedules for another). The before-enqueue hooks are
    # not run, but the job is at least queued.
    ResqueAdmin::Job.create(queue, klass, *params)
  end
end
enqueue_next_item(timestamp) click to toggle source
# File lib/resque_admin/scheduler.rb, line 188
# Fetches the next delayed item for timestamp from ResqueAdmin and, when
# one exists, logs and enqueues it. Returns the fetched item (nil when
# there was none).
def enqueue_next_item(timestamp)
  next_item = ResqueAdmin.next_item_for_timestamp(timestamp)
  return next_item unless next_item

  log "queuing #{next_item['class']} [delayed]"
  enqueue(next_item)
  next_item
end
env_matches?(configured_env) click to toggle source

Returns true if the current env is non-nil and the configured env (a string of environment names separated by commas and/or whitespace) includes the current env.

# File lib/resque_admin/scheduler.rb, line 171
# Tells whether the current env is listed in configured_env, a String of
# env names separated by commas and/or whitespace. Returns the falsy env
# value itself (without touching configured_env) when no env is set.
def env_matches?(configured_env)
  current_env = env
  current_env && configured_env.split(/[\s,]+/).include?(current_env)
end
failure_handler() click to toggle source
# File lib/resque_admin/scheduler.rb, line 407
# Returns the failure handler, defaulting (and memoizing) to
# ResqueAdmin::Scheduler::FailureHandler when none has been set.
def failure_handler
  @failure_handler = ResqueAdmin::Scheduler::FailureHandler unless @failure_handler
  @failure_handler
end
handle_delayed_items(at_time = nil) click to toggle source

Handles queueing of delayed items. at_time - Time to start scheduling items (default: now).

# File lib/resque_admin/scheduler.rb, line 177
# Enqueues delayed items, timestamp by timestamp, for as long as
# ResqueAdmin.next_delayed_timestamp(at_time) keeps returning one.
#
# at_time - passed through unchanged to ResqueAdmin.next_delayed_timestamp
#           (nil by default).
def handle_delayed_items(at_time = nil)
  timestamp = ResqueAdmin.next_delayed_timestamp(at_time)
  return unless timestamp

  procline 'Processing Delayed Items'
  until timestamp.nil?
    enqueue_delayed_items_for_timestamp(timestamp)
    timestamp = ResqueAdmin.next_delayed_timestamp(at_time)
  end
end
handle_shutdown() { || ... } click to toggle source
# File lib/resque_admin/scheduler.rb, line 219
# Yields to the given block, calling `exit` instead of/after running it
# when the @shutdown flag is set: the flag is checked once before the
# block runs and once more after it returns.
def handle_shutdown
  exit if @shutdown
  yield
  exit if @shutdown
end
handle_signals_with_operation() { || ... } click to toggle source
# File lib/resque_admin/scheduler.rb, line 361
# Runs the optional block and then handle_signals.
#
# Returns false when both complete normally. Returns true when an
# Interrupt is raised along the way; in that case before_shutdown is run
# first if the @shutdown flag is set.
def handle_signals_with_operation
  begin
    yield if block_given?
    handle_signals
    false
  rescue Interrupt
    before_shutdown if @shutdown
    true
  end
end
load_schedule!() click to toggle source

Pulls the schedule from ResqueAdmin.schedule and loads it into the rufus scheduler instance

# File lib/resque_admin/scheduler.rb, line 89
# Loads every entry of ResqueAdmin.schedule into the rufus scheduler,
# starting from an empty @scheduled_jobs, then deletes the
# :schedules_changed key from redis.
def load_schedule!
  procline 'Loading Schedule'

  # In dynamic mode the schedule has to be loaded from redis first.
  ResqueAdmin.reload_schedule! if dynamic

  if ResqueAdmin.schedule.empty?
    log! 'Schedule empty! Set ResqueAdmin.schedule'
  end

  @scheduled_jobs = {}
  ResqueAdmin.schedule.each do |job_name, job_config|
    load_schedule_job(job_name, job_config)
  end

  ResqueAdmin.redis.del(:schedules_changed)
  procline 'Schedules Loaded'
end
load_schedule_job(name, config) click to toggle source

Loads a job schedule into the Rufus::Scheduler and stores it in @scheduled_jobs

# File lib/resque_admin/scheduler.rb, line 126
# Loads one job schedule into the rufus scheduler and records the
# resulting job in @scheduled_jobs under name.
#
# A job whose config sets `rails_env` or `env` is only loaded when that
# value matches the current env (see env_matches?); a job with neither key
# is always loaded. The first of 'cron' / 'every' that is present and
# non-empty in config decides how the job is scheduled.
def load_schedule_job(name, config)
  configured_env = config['rails_env'] || config['env']

  unless configured_env.nil? || env_matches?(configured_env)
    return log "Skipping schedule of #{name} because configured " \
               "env #{configured_env.inspect} does not match current " \
               "env #{env.inspect}"
  end

  log! "Scheduling #{name} "
  interval_types = %w(cron every)
  chosen_type = interval_types.find do |candidate|
    !(config[candidate].nil? || config[candidate].empty?)
  end

  if chosen_type.nil?
    return log! "no #{interval_types.join(' / ')} found for " \
                "#{config['class']} (#{name}) - skipping"
  end

  args = optionizate_interval_value(config[chosen_type])
  # A bare interval string gets the default option set.
  args = [args, nil, { job: true }] if args.is_a?(::String)

  job = rufus_scheduler.send(chosen_type, *args) do
    enqueue_recurring(name, config)
  end
  @scheduled_jobs[name] = job
  nil
end
log(msg) click to toggle source
# File lib/resque_admin/scheduler.rb, line 396
# Logs msg at debug level; the message is handed to the logger as a block.
def log(msg)
  logger.debug do
    msg
  end
end
log!(msg) click to toggle source
# File lib/resque_admin/scheduler.rb, line 388
# Logs msg at info level; the message is handed to the logger as a block.
def log!(msg)
  logger.info do
    msg
  end
end
log_error(msg) click to toggle source
# File lib/resque_admin/scheduler.rb, line 392
# Logs msg at error level; the message is handed to the logger as a block.
def log_error(msg)
  logger.error do
    msg
  end
end
logger() click to toggle source
# File lib/resque_admin/scheduler.rb, line 411
# Returns the memoized logger. On first use it is built by
# ResqueAdmin::Scheduler::LoggerBuilder from the current quiet, verbose,
# logfile and logformat settings.
def logger
  @logger ||= begin
    builder = ResqueAdmin::Scheduler::LoggerBuilder.new(
      quiet: quiet, verbose: verbose, log_dev: logfile, format: logformat
    )
    builder.build
  end
end
optionizate_interval_value(value) click to toggle source

Converts an interval type value into a value with options, if options are available.

# File lib/resque_admin/scheduler.rb, line 107
# Normalizes the value of an interval type ('cron' / 'every') into the
# arguments handed to the rufus scheduler.
#
# value - either a plain interval value or an Array.
#
# Returns:
# * value itself when it is not an Array;
# * the first element when the Array has more than two elements or its
#   last element is not a Hash;
# * otherwise a NEW three-element Array [interval, nil, options], where
#   options is the given Hash with its keys symbolized (when they respond
#   to to_sym) plus job: true.
#
# The input Array is never modified. Rewriting it in place would make a
# second call on the same config see three elements and return only the
# interval, losing the options.
def optionizate_interval_value(value)
  return value unless value.is_a?(::Array)
  return value.first if value.size > 2 || !value.last.is_a?(::Hash)

  interval, raw_options = value
  options = raw_options.each_with_object({}) do |(key, option), symbolized|
    symbolized[(key.respond_to?(:to_sym) && key.to_sym) || key] = option
  end
  options[:job] = true

  [interval, nil, options]
end
poll_sleep() click to toggle source

Sleeps and returns true

# File lib/resque_admin/scheduler.rb, line 328
# Runs poll_sleep_loop wrapped in handle_shutdown and returns true.
def poll_sleep
  handle_shutdown do
    begin
      poll_sleep_loop
    ensure
      # Always clear the flag, even when the sleep loop raises.
      @sleeping = false
    end
  end
  true
end
poll_sleep_loop() click to toggle source
# File lib/resque_admin/scheduler.rb, line 339
# Marks the scheduler as sleeping and waits for poll_sleep_amount seconds
# in total, handling signals after each (possibly interrupted) sleep.
#
# When poll_sleep_amount is not positive, signals are handled once and the
# result of handle_signals_with_operation is returned.
def poll_sleep_loop
  @sleeping = true
  return handle_signals_with_operation unless poll_sleep_amount > 0

  started_at = Time.now
  loop do
    elapsed_sleep = Time.now - started_at
    remaining_sleep = poll_sleep_amount - elapsed_sleep
    @do_break = false
    @do_break =
      if remaining_sleep <= 0
        # The full amount has been slept.
        true
      else
        # true here means the sleep was cut short by an Interrupt.
        handle_signals_with_operation { sleep(remaining_sleep) }
      end
    break if @do_break
  end
end
print_schedule() click to toggle source
procline(string) click to toggle source
# File lib/resque_admin/scheduler.rb, line 400
# Logs string at info level and sets the process name ($0) to the
# procline built from it. Returns the new process name.
def procline(string)
  log!(string)
  process_title = build_procline(string)
  log("Setting procline #{process_title.inspect}")
  $0 = process_title
end
rails_env_matches?(config) click to toggle source

Returns true if the given schedule config hash matches the current env

# File lib/resque_admin/scheduler.rb, line 162
# Deprecated in favour of env_matches? (a warning is emitted on each call).
#
# Tells whether config['rails_env'], a String of env names separated by
# commas and/or whitespace, includes the current env. Falsy when either
# the configured value or the current env is missing.
def rails_env_matches?(config)
  warn '`ResqueAdmin::Scheduler.rails_env_matches?` is deprecated. ' \
       'Please use `ResqueAdmin::Scheduler.env_matches?` instead.'
  configured = config['rails_env']
  configured && env && configured.split(/[\s,]+/).include?(env)
end
reload_schedule!() click to toggle source
# File lib/resque_admin/scheduler.rb, line 295
# Rebuilds the schedule from scratch: clears the current one with
# clear_schedule! and then loads it again with load_schedule!.
def reload_schedule!
  procline 'Reloading Schedule'
  clear_schedule!
  load_schedule!
end
rufus_scheduler() click to toggle source
# File lib/resque_admin/scheduler.rb, line 282
# Returns the memoized Rufus::Scheduler instance, creating it on first use
# (or after @rufus_scheduler has been reset to nil).
def rufus_scheduler
  @rufus_scheduler || (@rufus_scheduler = Rufus::Scheduler.new)
end
run() click to toggle source

Schedule all jobs and continually look for delayed jobs (never returns)

# File lib/resque_admin/scheduler.rb, line 35
# Schedules all jobs and then loops, looking for delayed jobs and (in
# dynamic mode) schedule changes while this process is the master. The
# loop only ends when an Interrupt is raised.
def run
  procline 'Starting'

  # trap signals
  register_signal_handlers

  # Quote from the resque_admin/worker.
  # Fix buffering so we can `rake resque_admin:scheduler > scheduler.log` and
  # get output from the child in there.
  $stdout.sync = true
  $stderr.sync = true

  # Load the schedule into rufus; dynamic mode goes through
  # reload_schedule!, otherwise the normal load is used.
  dynamic ? reload_schedule! : load_schedule!

  begin
    # Remembered so that shutdown can raise Interrupt in this thread.
    @th = Thread.current

    loop do
      begin
        if master?
          handle_delayed_items
          update_schedule if dynamic
        end
      rescue Errno::EAGAIN, Errno::ECONNRESET, Redis::CannotConnectError => error
        # Connection trouble: log it and give up the master lock.
        log!(error.message)
        release_master_lock
      end
      poll_sleep
    end
  rescue Interrupt
    log 'Exiting'
  end
end
shutdown() click to toggle source

Sets the shutdown flag, cleans schedules, and exits if sleeping.

# File lib/resque_admin/scheduler.rb, line 381
# Requests a shutdown. The first call sets the @shutdown flag, logs it and,
# if the scheduler is currently sleeping, raises Interrupt in the thread
# stored in @th. Later calls do nothing.
def shutdown
  unless @shutdown
    @shutdown = true
    log!('Shutting down')
    @th.raise(Interrupt) if @sleeping
  end
end
stop_rufus_scheduler() click to toggle source
# File lib/resque_admin/scheduler.rb, line 370
# Shuts the rufus scheduler down with the :wait option and then joins it.
# NOTE(review): presumably :wait lets running jobs finish before the
# scheduler stops - confirm against the rufus-scheduler documentation.
def stop_rufus_scheduler
  rufus_scheduler.shutdown(:wait)
  rufus_scheduler.join
end
unschedule_job(name) click to toggle source
# File lib/resque_admin/scheduler.rb, line 319
# Unschedules the job recorded under name, if any, and removes it from
# @scheduled_jobs. Returns the removed job, or nil when nothing was
# scheduled under that name.
def unschedule_job(name)
  job = scheduled_jobs[name]
  return unless job

  log "Removing schedule #{name}"
  job.unschedule
  @scheduled_jobs.delete(name)
end
update_schedule() click to toggle source
# File lib/resque_admin/scheduler.rb, line 301
# Applies pending schedule changes. Names are popped from the
# :schedules_changed redis set one at a time; for each, the schedule is
# reloaded, the existing job is unscheduled, and the job is loaded again
# if it is still present in ResqueAdmin.schedule.
def update_schedule
  return unless ResqueAdmin.redis.scard(:schedules_changed) > 0

  procline 'Updating schedule'
  loop do
    schedule_name = ResqueAdmin.redis.spop(:schedules_changed)
    break unless schedule_name

    ResqueAdmin.reload_schedule!
    still_scheduled = ResqueAdmin.schedule.keys.include?(schedule_name)
    unschedule_job(schedule_name)
    next unless still_scheduled

    load_schedule_job(schedule_name, ResqueAdmin.schedule[schedule_name])
  end
  procline 'Schedules Loaded'
end

Private Class Methods

app_str() click to toggle source
# File lib/resque_admin/scheduler.rb, line 430
# Procline fragment for the app name: "[name]" when app_name is set,
# otherwise an empty string.
def app_str
  name = app_name
  return '' unless name

  "[#{name}]"
end
build_procline(string) click to toggle source
# File lib/resque_admin/scheduler.rb, line 438
# Builds the process title: internal name, app and env fragments, then
# ": " followed by string.
def build_procline(string)
  format('%s%s%s: %s', internal_name, app_str, env_str, string)
end
enqueue_recurring(name, config) click to toggle source
# File lib/resque_admin/scheduler.rb, line 422
# Enqueues the recurring job name/config, but only while this process is
# the master. The enqueue time is recorded through
# ResqueAdmin.last_enqueued_at before the job is enqueued.
def enqueue_recurring(name, config)
  return unless master?

  log! "queueing #{config['class']} (#{name})"
  ResqueAdmin.last_enqueued_at(name, Time.now.to_s)
  enqueue(config)
end
env_str() click to toggle source
# File lib/resque_admin/scheduler.rb, line 434
# Procline fragment for the environment: "[env]" when env is set,
# otherwise an empty string.
def env_str
  current_env = env
  return '' unless current_env

  "[#{current_env}]"
end
internal_name() click to toggle source
# File lib/resque_admin/scheduler.rb, line 442
# Name used at the start of the procline: a fixed prefix followed by the
# scheduler VERSION.
def internal_name
  'resque_admin-scheduler-' + ResqueAdmin::Scheduler::VERSION.to_s
end