module SidekiqRepeatingJobs::Middleware::Helpers

Public Class Methods

included(base) click to toggle source
# File lib/sidekiq_repeating_jobs/middleware/helpers.rb, line 8
# Mixin hook: Ruby invokes this when the module is included into +base+.
# It extends +base+ with ClassMethods so those methods become available
# at the class level of the including class/module.
def self.included(base)
  base.extend ClassMethods
end

Public Instance Methods

log_message(args, message, level=:info) click to toggle source
# File lib/sidekiq_repeating_jobs/middleware/helpers.rb, line 12
# Writes +message+ to the Sidekiq logger, prefixed with this object's
# +log_name+ and a short name for the repeating job.
#
# The short name is the part of args['repeating_job_key'] that follows
# the "...:repeating_job:" prefix. When the key is missing or does not
# follow that layout, the raw key (possibly empty) is logged instead, so
# that logging never raises because of an unexpected key.
#
# args    - Hash of job arguments; 'repeating_job_key' is read.
# message - text to log.
# level   - logger method to invoke (defaults to :info).
def log_message(args, message, level=:info)
  job_key = args['repeating_job_key'].to_s
  match = /^([:_a-z]+\:repeating_job\:)([:_a-z]+)$/.match(job_key)
  # Fall back to the raw key instead of calling [] on a nil match.
  job_name = match ? match[2] : job_key
  Sidekiq.logger.public_send(level, "[#{self.log_name}] [#{job_name}] #{message}")
end

Private Instance Methods

active_key(args) click to toggle source
# File lib/sidekiq_repeating_jobs/middleware/helpers.rb, line 22
# Builds the Redis key name "<repeating_job_key>:active" from
# args['repeating_job_key'].
def active_key(args)
  job_key = args['repeating_job_key']
  "#{job_key}:active"
end
check_reservation(worker_class, arguments) click to toggle source
# File lib/sidekiq_repeating_jobs/middleware/helpers.rb, line 70
# Tries to claim, or extend, the reservation for the repeating job
# described by +arguments+, mutating +arguments+ to reflect the outcome.
#
# Steps, all against Redis via Sidekiq.redis:
# 1. The job must be active: the active key already holds 'true', or it
#    did not exist and is created as 'true'. Otherwise the job is logged
#    as inactive and nothing else happens.
# 2. The caller's 'repeating_job_id' (or a freshly generated one) must
#    either equal the stored owner id or be written successfully with
#    SETNX. If neither holds, 'repeating_job_id' is removed from
#    +arguments+ and the reason is logged.
# 3. On success 'repeating_job_id' is stored in +arguments+, and in one
#    MULTI the owner id expiry is refreshed, the owner name is written
#    with the same expiry, and the last-run timestamp is recorded.
#
# worker_class - passed to SidekiqRepeatingJobs::Config.expiration_interval.
# arguments    - Hash of job arguments (string keys); modified in place.
#
# NOTE(review): conn.redis presumably exposes the underlying client of a
# namespaced connection wrapper - confirm against the Sidekiq setup.
def check_reservation(worker_class, arguments)
  Sidekiq.redis do |conn|
    redis = conn.redis
    if redis.get(active_key(arguments)) == 'true' || redis.setnx(active_key(arguments), 'true')
      repeating_job_id = arguments['repeating_job_id'] || SecureRandom.hex(16).downcase
      existing_repeating_job_id = redis.get(owner_id_key(arguments))
      if repeating_job_id == existing_repeating_job_id || redis.setnx(owner_id_key(arguments), repeating_job_id)
        interval = SidekiqRepeatingJobs::Config.expiration_interval(worker_class)
        arguments['repeating_job_id'] = repeating_job_id
        # Queue the commands on the object yielded by MULTI. Older redis-rb
        # yields the client itself, newer versions yield a dedicated
        # transaction object and no longer queue calls made on the client.
        redis.multi do |transaction|
          transaction.expire(owner_id_key(arguments), interval)
          transaction.setex(owner_name_key(arguments), interval, process_name)
          transaction.set(last_run_at_key(arguments), DateTime.now.strftime("%Y-%m-%d %H:%M:%S %Z"))
        end
        if existing_repeating_job_id
          log_message(arguments, "extended reservation [#{arguments['repeating_job_id']}] by #{interval} seconds")
        else
          log_message(arguments, "reserved [#{arguments['repeating_job_id']}] for #{interval} seconds")
        end
      else
        if existing_repeating_job_id
          log_message(arguments, "existing reservation [#{existing_repeating_job_id}] for #{redis.ttl(owner_id_key(arguments))} seconds")
        else
          # No owner was visible, yet SETNX failed: the key was taken in between.
          log_message(arguments, "invalid reservation [#{arguments['repeating_job_id']}]")
        end
        arguments.delete('repeating_job_id')
      end
    else
      log_message(arguments, "is currently inactive")
    end
  end
end
check_scheduler(worker_class, arguments) click to toggle source
# File lib/sidekiq_repeating_jobs/middleware/helpers.rb, line 50
# Reconciles arguments['scheduler_id'] with the scheduler owner id stored
# in Redis under "<scheduler_key>:owner_id", mutating +arguments+:
#
# * no owner id stored      -> logs and removes 'scheduler_id'.
# * caller has no id yet    -> adopts the stored id and logs it.
# * caller's id is stale    -> logs, removes 'scheduler_id' and deletes
#                              the job's owner id key.
# * caller's id matches     -> nothing to do.
#
# worker_class - not used by this method.
# arguments    - Hash of job arguments (string keys); modified in place.
def check_scheduler(worker_class, arguments)
  Sidekiq.redis do |conn|
    active_id = conn.redis.get("#{arguments['scheduler_key']}:owner_id")
    claimed_id = arguments['scheduler_id']

    if !active_id
      self.log_message(arguments, "has no active schedule")
      arguments.delete('scheduler_id')
    elsif !claimed_id
      arguments['scheduler_id'] = active_id
      self.log_message(arguments, "Using active schedule [#{active_id}]")
    elsif claimed_id != active_id
      self.log_message(arguments, "schedule [#{claimed_id}] no longer active")
      arguments.delete('scheduler_id')
      conn.redis.del(owner_id_key(arguments))
    end
  end
end
last_run_at_key(args) click to toggle source
# File lib/sidekiq_repeating_jobs/middleware/helpers.rb, line 34
# Builds the Redis key name "<repeating_job_key>:last_run_at" from
# args['repeating_job_key'].
def last_run_at_key(args)
  job_key = args['repeating_job_key']
  "#{job_key}:last_run_at"
end
owner_id_key(args) click to toggle source
# File lib/sidekiq_repeating_jobs/middleware/helpers.rb, line 26
# Builds the Redis key name "<repeating_job_key>:owner_id" from
# args['repeating_job_key'].
def owner_id_key(args)
  job_key = args['repeating_job_key']
  "#{job_key}:owner_id"
end
owner_name_key(args) click to toggle source
# File lib/sidekiq_repeating_jobs/middleware/helpers.rb, line 30
# Builds the Redis key name "<repeating_job_key>:owner_name" from
# args['repeating_job_key'].
def owner_name_key(args)
  job_key = args['repeating_job_key']
  "#{job_key}:owner_name"
end
process_name() click to toggle source
# File lib/sidekiq_repeating_jobs/middleware/helpers.rb, line 18
# Returns an identifier for the current execution context in the form
# "<host>-p<pid>-t<thread object id>".
#
# <host> is the canonical name reported by the resolver for the local
# hostname. When the hostname cannot be resolved, or no canonical name is
# reported, the plain local hostname is used instead of raising.
def process_name
  hostname = Socket.gethostname
  canonical =
    begin
      # Addrinfo replaces the deprecated Socket.gethostbyname lookup.
      info = Addrinfo.getaddrinfo(hostname, nil, nil, :STREAM, nil, Socket::AI_CANONNAME).first
      info && info.canonname
    rescue SocketError
      nil
    end
  "#{canonical || hostname}-p#{Process.pid}-t#{Thread.current.object_id}"
end
valid_arguments?(worker_class, arguments) click to toggle source
# File lib/sidekiq_repeating_jobs/middleware/helpers.rb, line 38
# Validates the repeat-job +arguments+ and reports whether the job may run.
#
# Both required keys are checked before any scheduler or reservation work
# is done, because those steps already use 'repeating_job_key' for logging
# and for building Redis key names.
#
# worker_class - forwarded to check_scheduler and check_reservation.
# arguments    - Hash with string keys; may be modified by the checks.
#
# Returns true when, after check_scheduler and check_reservation have run,
# +arguments+ carries both a 'scheduler_id' and a 'repeating_job_id';
# false otherwise.
# Raises RuntimeError when +arguments+ is not a Hash or a required key is
# missing. assert_valid_keys raises for keys outside the permitted set.
def valid_arguments?(worker_class, arguments)
  raise "Missing repeat job arguments" unless arguments.is_a?(Hash)
  arguments.assert_valid_keys('scheduler_key', 'scheduler_id', 'repeating_job_key', 'repeating_job_id')
  raise "Missing scheduler_key repeat job argument" if arguments['scheduler_key'].nil?
  raise "Missing repeating_job_key repeat job argument" if arguments['repeating_job_key'].nil?

  check_scheduler(worker_class, arguments)
  return false if arguments['scheduler_id'].nil?

  check_reservation(worker_class, arguments)
  !arguments['repeating_job_id'].nil?
end