module SidekiqRepeatingJobs::Middleware::Helpers
Public Class Methods
included(base)
click to toggle source
# File lib/sidekiq_repeating_jobs/middleware/helpers.rb, line 8
#
# Mixin hook: when this module is included somewhere, the including
# class/module is also extended with ClassMethods.
#
# base - the class or module that is including this module.
#
# Returns base (the result of Object#extend).
def self.included(base)
  base.extend(ClassMethods)
end
Public Instance Methods
log_message(args, message, level=:info)
click to toggle source
# File lib/sidekiq_repeating_jobs/middleware/helpers.rb, line 12
#
# Writes +message+ to the Sidekiq logger, prefixed with this object's
# log_name and a short label for the repeating job.
#
# args    - Hash of job arguments; 'repeating_job_key' is read to build the
#           job label.
# message - text to log.
# level   - logger method to invoke (default :info).
#
# The label is the part of the key that follows "...:repeating_job:". When the
# key is missing or does not fit that pattern (for example it contains digits
# or upper-case letters), the raw key is used as the label instead of raising
# NoMethodError on a failed match, so a logging call can never mask the real
# validation error raised by the caller.
#
# Returns whatever the logger method returns.
def log_message(args, message, level=:info)
  job_key = args['repeating_job_key'].to_s
  # \A/\z anchor the whole string; ^/$ would also match a single line inside
  # a multi-line value.
  matched = /\A([:_a-z]+:repeating_job:)([:_a-z]+)\z/.match(job_key)
  job_label = matched ? matched[2] : job_key
  Sidekiq.logger.public_send(level, "[#{log_name}] [#{job_label}] #{message}")
end
Private Instance Methods
active_key(args)
click to toggle source
# File lib/sidekiq_repeating_jobs/middleware/helpers.rb, line 22
#
# Builds the name of the key that holds a repeating job's "active" flag.
#
# args - Hash of job arguments; 'repeating_job_key' is used as the base key.
#
# Returns a String: the base key followed by ":active".
def active_key(args)
  base_key = args['repeating_job_key']
  "#{base_key}:active"
end
check_reservation(worker_class, arguments)
click to toggle source
# File lib/sidekiq_repeating_jobs/middleware/helpers.rb, line 70
#
# Tries to take, or renew, the reservation for a repeating job and records
# the outcome in +arguments+.
#
# worker_class - passed to SidekiqRepeatingJobs::Config.expiration_interval to
#                obtain the reservation lifetime in seconds.
# arguments    - Hash of job arguments. 'repeating_job_id' is set to the
#                reserving id on success and removed when the reservation is
#                refused; it is left untouched when the job is inactive.
#
# Steps, in order:
# 1. The job counts as active when its active flag is 'true', or when the flag
#    did not exist and could be created with SETNX. Otherwise only
#    "is currently inactive" is logged.
# 2. The candidate id is the supplied 'repeating_job_id' or a fresh random hex
#    string. The reservation is granted when the candidate already owns the
#    owner-id key or that key could be created with SETNX.
# 3. On success the owner-id expiry, owner name and last-run timestamp are
#    written inside a MULTI block and the reservation is logged as "reserved"
#    or "extended".
#
# Returns the value of the last expression evaluated inside the
# Sidekiq.redis block.
def check_reservation(worker_class, arguments)
  Sidekiq.redis do |conn|
    store = conn.redis
    job_active = store.get(active_key(arguments)) == 'true' || store.setnx(active_key(arguments), 'true')

    if job_active
      candidate_id = arguments['repeating_job_id'] || SecureRandom.hex(16).downcase
      current_owner = store.get(owner_id_key(arguments))

      if candidate_id == current_owner || store.setnx(owner_id_key(arguments), candidate_id)
        arguments['repeating_job_id'] = candidate_id

        # NOTE(review): commands are issued on the client itself inside the
        # MULTI block rather than on a yielded transaction object -- confirm
        # this matches the redis client version in use.
        store.multi do
          store.expire(owner_id_key(arguments), SidekiqRepeatingJobs::Config.expiration_interval(worker_class))
          store.setex(owner_name_key(arguments), SidekiqRepeatingJobs::Config.expiration_interval(worker_class), process_name)
          store.set(last_run_at_key(arguments), DateTime.now.strftime("%Y-%m-%d %H:%M:%S %Z"))
        end

        # An owner that existed before this call means the reservation was renewed.
        if current_owner
          log_message(arguments, "extended reservation [#{arguments['repeating_job_id']}] by #{SidekiqRepeatingJobs::Config.expiration_interval(worker_class)} seconds")
        else
          log_message(arguments, "reserved [#{arguments['repeating_job_id']}] for #{SidekiqRepeatingJobs::Config.expiration_interval(worker_class)} seconds")
        end
      else
        if current_owner
          log_message(arguments, "existing reservation [#{current_owner}] for #{store.ttl(owner_id_key(arguments))} seconds")
        else
          log_message(arguments, "invalid reservation [#{arguments['repeating_job_id']}]")
        end
        # Dropping the id is how the refusal is reported back to the caller.
        arguments.delete('repeating_job_id')
      end
    else
      log_message(arguments, "is currently inactive")
    end
  end
end
check_scheduler(worker_class, arguments)
click to toggle source
# File lib/sidekiq_repeating_jobs/middleware/helpers.rb, line 50
#
# Reconciles the job's 'scheduler_id' argument with the scheduler id stored
# under "<scheduler_key>:owner_id".
#
# worker_class - not used by this method.
# arguments    - Hash of job arguments; 'scheduler_id' is updated in place.
#
# Outcomes:
# * no stored scheduler id: logs it and removes 'scheduler_id'.
# * stored id but none supplied: adopts the stored id.
# * supplied id differs from the stored id: logs it, removes 'scheduler_id'
#   and deletes the job's owner-id key.
# * supplied id equals the stored id: nothing is changed.
#
# Returns the value of the last expression evaluated inside the
# Sidekiq.redis block (nil when the ids already match).
def check_scheduler(worker_class, arguments)
  Sidekiq.redis do |conn|
    active_id = conn.redis.get("#{arguments['scheduler_key']}:owner_id")
    supplied_id = arguments['scheduler_id']

    if !active_id
      log_message(arguments, "has no active schedule")
      arguments.delete('scheduler_id')
    elsif !supplied_id
      arguments['scheduler_id'] = active_id
      log_message(arguments, "Using active schedule [#{arguments['scheduler_id']}]")
    elsif supplied_id != active_id
      log_message(arguments, "schedule [#{supplied_id}] no longer active")
      arguments.delete('scheduler_id')
      conn.redis.del(owner_id_key(arguments))
    end
  end
end
last_run_at_key(args)
click to toggle source
# File lib/sidekiq_repeating_jobs/middleware/helpers.rb, line 34
#
# Builds the name of the key that stores when a repeating job last ran.
#
# args - Hash of job arguments; 'repeating_job_key' is used as the base key.
#
# Returns a String: the base key followed by ":last_run_at".
def last_run_at_key(args)
  base_key = args['repeating_job_key']
  "#{base_key}:last_run_at"
end
owner_id_key(args)
click to toggle source
# File lib/sidekiq_repeating_jobs/middleware/helpers.rb, line 26
#
# Builds the name of the key that stores the id currently holding a
# repeating job's reservation.
#
# args - Hash of job arguments; 'repeating_job_key' is used as the base key.
#
# Returns a String: the base key followed by ":owner_id".
def owner_id_key(args)
  base_key = args['repeating_job_key']
  "#{base_key}:owner_id"
end
owner_name_key(args)
click to toggle source
# File lib/sidekiq_repeating_jobs/middleware/helpers.rb, line 30
#
# Builds the name of the key that stores the process name of a repeating
# job's current reservation holder.
#
# args - Hash of job arguments; 'repeating_job_key' is used as the base key.
#
# Returns a String: the base key followed by ":owner_name".
def owner_name_key(args)
  base_key = args['repeating_job_key']
  "#{base_key}:owner_name"
end
process_name()
click to toggle source
# File lib/sidekiq_repeating_jobs/middleware/helpers.rb, line 18
#
# Builds an identifier for the current host, process and thread, in the form
# "<host>-p<pid>-t<thread object id>".
#
# The host part is the canonical name the resolver reports for the local
# hostname. Socket.gethostbyname, used previously, is deprecated and raises
# SocketError when the local hostname cannot be resolved; the lookup now goes
# through Addrinfo.getaddrinfo and falls back to the plain hostname when
# resolution fails or yields no canonical name.
#
# Returns a String.
def process_name
  hostname = Socket.gethostname
  canonical =
    begin
      info = Addrinfo.getaddrinfo(hostname, nil, nil, nil, nil, Socket::AI_CANONNAME).first
      info && info.canonname
    rescue SocketError
      nil # unresolvable hostname: use the bare hostname below
    end
  "#{canonical || hostname}-p#{Process.pid}-t#{Thread.current.object_id}"
end
valid_arguments?(worker_class, arguments)
click to toggle source
# File lib/sidekiq_repeating_jobs/middleware/helpers.rb, line 38
#
# Validates the repeat-job arguments and checks the scheduler and the job
# reservation, in that order.
#
# worker_class - forwarded to check_scheduler and check_reservation.
# arguments    - Hash of job arguments. Only the keys 'scheduler_key',
#                'scheduler_id', 'repeating_job_key' and 'repeating_job_id'
#                are accepted (enforced through assert_valid_keys). The hash
#                is handed to check_scheduler and check_reservation, and the
#                ids are read back from it afterwards.
#
# Returns false when 'scheduler_id' is nil after check_scheduler, or when
# 'repeating_job_id' is nil after check_reservation; true otherwise.
#
# Raises RuntimeError when +arguments+ is not a Hash, when 'scheduler_key' is
# nil, or when 'repeating_job_key' is nil (the latter is only checked after
# the scheduler check has left a 'scheduler_id' in place).
def valid_arguments?(worker_class, arguments)
  # nil is not a Hash either, so this single test covers both cases.
  raise "Missing repeat job arguments" unless arguments.is_a?(Hash)

  arguments.assert_valid_keys('scheduler_key', 'scheduler_id', 'repeating_job_key', 'repeating_job_id')
  raise "Missing scheduler_key repeat job argument" if arguments['scheduler_key'].nil?

  check_scheduler(worker_class, arguments)
  return false if arguments['scheduler_id'].nil?

  raise "Missing repeating_job_key repeat job argument" if arguments['repeating_job_key'].nil?

  check_reservation(worker_class, arguments)
  !arguments['repeating_job_id'].nil?
end