class Sqeduler::LockMaintainer

This ensures that if you set your jobs to run one at a time and something goes wrong, causing a job to run for a long time, your lock won't expire. It doesn't stop long-running jobs; it just ensures you end up with only one long-running job rather than 20 of them.

Constants

RUN_INTERVAL
RUN_JITTER

Public Class Methods

new() click to toggle source
# File lib/sqeduler/lock_maintainer.rb, line 11
# Sets up an empty per-instance cache keyed by job class name. The cache is
# filled lazily by #str_to_class, which stores either the resolved class or
# +false+ for each name it has already examined.
def initialize
  @class_with_locks = {}
end

Public Instance Methods

run() click to toggle source

This is only done when we initialize Sqeduler, so we don't need to worry about threading.

# File lib/sqeduler/lock_maintainer.rb, line 16
# Starts the background maintenance thread, or returns the one that was
# already started by an earlier call.
#
# Each pass of the thread's loop:
# 1. tries to take the maintainer lock via #redis_lock;
# 2. if it got the lock, calls #synchronize and always releases the lock
#    afterwards, even when #synchronize raises;
# 3. logs (rather than propagates) any StandardError from the steps above;
# 4. sleeps for RUN_INTERVAL plus a random amount derived from RUN_JITTER.
#
# @return [Thread] the maintenance thread
def run
  return @maintainer_thread if @maintainer_thread

  @maintainer_thread = Thread.new do
    loop do
      begin
        acquired = redis_lock.lock
        if acquired
          begin
            synchronize
          ensure
            redis_lock.unlock
          end
        end
      rescue StandardError => error
        # A failed pass must not kill the thread; report it and try again
        # after the usual pause.
        Service.logger.error("[#{self.class}] #{error.class}, #{error.message}")
      end

      pause = RUN_INTERVAL + rand(RUN_JITTER)
      sleep(pause)
    end
  end
end

Private Instance Methods

redis_lock() click to toggle source
# File lib/sqeduler/lock_maintainer.rb, line 82
# Returns the lock used to guard a maintenance pass, building it on first use
# and reusing the same object afterwards.
#
# The lock is created under the key "sqeduler-lock-maintainer" with
# :expiration => 60 and :timeout => 0; how RedisLock interprets those options
# is defined by RedisLock itself (not visible here).
#
# @return [RedisLock]
def redis_lock
  return @redis_lock if @redis_lock

  @redis_lock = RedisLock.new("sqeduler-lock-maintainer", :expiration => 60, :timeout => 0)
end
str_to_class(class_name) click to toggle source

Not all classes will use exclusive locks

# File lib/sqeduler/lock_maintainer.rb, line 66
# Resolves a job class name to its class, but only when that class is worth
# maintaining a lock for. Results are cached per name in @class_with_locks.
#
# A class qualifies when it responds to +synchronize_jobs_mode+, that mode is
# +:one_at_a_time+, and its +synchronize_jobs_expiration+ is at least
# RUN_INTERVAL. Every other class is cached as +false+.
#
# @param class_name [String] name of the job class; it is resolved by calling
#   +constantize+ on it
# @return [Class, false] the class when it qualifies, otherwise +false+
def str_to_class(class_name)
  cached = @class_with_locks[class_name]
  # +false+ is a valid cached answer, so only a missing (nil) entry falls through.
  return cached unless cached.nil?

  klass = class_name.constantize

  # We only care about exclusive jobs that are long running
  long_running_exclusive =
    klass.respond_to?(:synchronize_jobs_mode) &&
    klass.synchronize_jobs_mode == :one_at_a_time &&
    klass.synchronize_jobs_expiration >= RUN_INTERVAL

  if long_running_exclusive
    Service.logger.debug "[#{self.class}] Adding #{class_name} to the whitelist of classes that have locks"
    @class_with_locks[class_name] = klass
  else
    Service.logger.debug "[#{self.class}] Adding #{class_name} to the blacklist of classes that have locks"
    @class_with_locks[class_name] = false
  end
end
synchronize() click to toggle source
# File lib/sqeduler/lock_maintainer.rb, line 39
# Refreshes the Redis lock expiration for every in-progress job that has been
# running for at least RUN_INTERVAL seconds and whose class #str_to_class
# accepts.
#
# Each entry yielded by #workers is read as a triple whose third element is a
# hash with a "run_at" timestamp (compared against Time.now.to_i) and a
# "payload" hash carrying the job's "class" and "args".
#
# All EXPIRE commands are issued inside a single Redis pipeline.
#
# @return whatever Service.redis_pool.with returns for the block
def synchronize
  now = Time.now.to_i

  Service.redis_pool.with do |redis|
    redis.pipelined do |pipeline|
      # Recent redis-rb releases yield a dedicated pipeline object and expect
      # commands to be queued on it; older releases yield the client itself.
      # Fall back to the client if the block receives nothing.
      connection = pipeline || redis

      workers.each do |_worker, _tid, args|
        # No sense in pinging if it's not been running long enough to matter
        next if (now - args["run_at"]) < RUN_INTERVAL

        klass = str_to_class(args["payload"]["class"])
        next unless klass

        lock_key = klass.sync_lock_key(*args["payload"]["args"])
        expiration = klass.synchronize_jobs_expiration

        # This works because EXPIRE does not recreate the key, it only resets the expiration.
        # We don't have to worry about atomic operations or anything like that.
        # If the job finishes in the interim and deletes the key nothing will happen.
        connection.expire(lock_key, expiration)

        Service.logger.debug "[#{self.class}] Refreshing lock on '#{lock_key}' " \
          "to #{expiration} seconds"
      end
    end
  end
end
workers() click to toggle source
# File lib/sqeduler/lock_maintainer.rb, line 86
# Returns the Sidekiq::Workers instance used to enumerate in-progress jobs,
# creating it on first use and reusing the same object afterwards.
#
# @return [Sidekiq::Workers]
def workers
  return @workers if @workers

  @workers = Sidekiq::Workers.new
end