class MiniScheduler::Manager
Attributes
enable_stats[RW]
queue[RW]
random_ratio[RW]
redis[RW]
workers[RW]
Public Class Methods
current()
click to toggle source
# File lib/mini_scheduler/manager.rb, line 202
# Returns the class-level registry hash, creating it empty on first use.
# +initialize+ stores managers in it keyed by queue name and +stop!+
# removes them again.
def self.current
  @current = {} unless @current
  @current
end
discover_queues()
click to toggle source
# File lib/mini_scheduler/manager.rb, line 327
# Collects the +queue+ of every live object that is a
# MiniScheduler::Schedule and returns the distinct values as a Set.
def self.discover_queues
  queues = Set.new
  ObjectSpace.each_object(MiniScheduler::Schedule) do |schedule|
    queues << schedule.queue
  end
  queues
end
discover_schedules()
click to toggle source
# File lib/mini_scheduler/manager.rb, line 331
# Returns an Array of every live MiniScheduler::Schedule object whose
# +scheduled?+ is truthy, keeping only the first one seen per +to_s+ name.
#
# The de-duplication is a workaround for the development code reloader:
# several classes with the same name can be present in ObjectSpace at once.
def self.discover_schedules
  seen_names = Set.new

  ObjectSpace.each_object(MiniScheduler::Schedule).each_with_object([]) do |schedule, found|
    next unless schedule.scheduled?

    # Set#add? returns nil when the name was already recorded.
    found << schedule if seen_names.add?(schedule.to_s)
  end
end
lock_key(queue)
click to toggle source
# File lib/mini_scheduler/manager.rb, line 359
# Builds the lock name for +queue+; +lock+ passes it to
# MiniScheduler::DistributedMutex.
def self.lock_key(queue)
  suffix = "#{queue}_"
  "_scheduler_lock_#{suffix}"
end
new(options = nil)
click to toggle source
# File lib/mini_scheduler/manager.rb, line 182
# Builds a manager. +options+ may be nil or a Hash with these keys:
#
# :queue        - queue name (defaults to "default")
# :workers      - worker count (defaults to 1)
# :skip_runner  - when truthy, no Runner is created and the manager is not
#                 added to Manager.current
# :hostname     - overrides the hostname otherwise looked up by +hostname+
# :enable_stats - explicit on/off; when the key is absent, stats are
#                 enabled only if MiniScheduler::Stat is defined
def initialize(options = nil)
  @queue = (options && options[:queue]) || "default"
  @workers = (options && options[:workers]) || 1
  @redis = MiniScheduler.redis
  @random_ratio = 0.1

  # The runner is created at this point, before the remaining ivars are
  # assigned, exactly as in the original ordering.
  skip_runner = options && options[:skip_runner]
  unless skip_runner
    @runner = Runner.new(self)
    self.class.current[@queue] = self
  end

  @hostname = options && options[:hostname]
  @manager_id = SecureRandom.hex

  @enable_stats =
    if options && options.key?(:enable_stats)
      options[:enable_stats]
    else
      !!defined?(MiniScheduler::Stat)
    end
end
queue_key(queue, hostname = nil)
click to toggle source
# File lib/mini_scheduler/manager.rb, line 363
# Builds the queue key for +queue+. When +hostname+ is truthy it is
# appended, with a trailing underscore, giving a separate key per host.
def self.queue_key(queue, hostname = nil)
  base = "_scheduler_queue_#{queue}_"
  return base unless hostname

  "#{base}#{hostname}_"
end
schedule_key(klass, hostname = nil)
click to toggle source
# File lib/mini_scheduler/manager.rb, line 371
# Builds the key for +klass+ (interpolated via +to_s+). When +hostname+ is
# truthy it is appended after an underscore.
def self.schedule_key(klass, hostname = nil)
  key = "_scheduler_#{klass}"
  hostname ? "#{key}_#{hostname}" : key
end
seq()
click to toggle source
# File lib/mini_scheduler/manager.rb, line 348
# Returns the next value of a class-level counter (1 on the first call),
# incrementing it while holding @mutex.
# NOTE(review): @mutex is not assigned anywhere in the visible code; it is
# presumably set in the class body - confirm.
def self.seq
  @mutex.synchronize do
    @i = (@i || 0) + 1
  end
end
without_runner()
click to toggle source
# File lib/mini_scheduler/manager.rb, line 178
# Convenience constructor: builds a manager with <tt>skip_runner: true</tt>,
# so +initialize+ neither creates a Runner nor registers the manager in
# Manager.current.
def self.without_runner
  new(skip_runner: true)
end
Public Instance Methods
blocking_tick()
click to toggle source
# File lib/mini_scheduler/manager.rb, line 303
# Runs one +tick+ and then calls +wait_till_done+ on the runner, returning
# whatever that call returns.
# Requires a runner: a manager built with <tt>skip_runner: true</tt> has
# @runner == nil.
def blocking_tick
  tick
  runner = @runner
  runner.wait_till_done
end
ensure_schedule!(klass)
click to toggle source
# File lib/mini_scheduler/manager.rb, line 222
# Calls +schedule!+ on the ScheduleInfo for +klass+ while holding the
# queue lock.
def ensure_schedule!(klass)
  lock { schedule_info(klass).schedule! }
end
get_klass(name)
click to toggle source
# File lib/mini_scheduler/manager.rb, line 256
# Resolves +name+ to a constant via +constantize+, returning nil instead of
# raising when that raises NameError (e.g. the job class no longer exists).
# NOTE(review): +constantize+ is not core Ruby; presumably it comes from
# ActiveSupport - confirm.
def get_klass(name)
  begin
    name.constantize
  rescue NameError
    nil
  end
end
hostname()
click to toggle source
# File lib/mini_scheduler/manager.rb, line 206
# Returns this manager's hostname. If none was supplied to +initialize+, it
# is read once by running the +hostname+ command (whitespace stripped) and
# memoized; if running the command raises, "unknown" is used.
def hostname
  return @hostname if @hostname

  @hostname =
    begin
      `hostname`.strip
    rescue
      "unknown"
    end
end
identity_key()
click to toggle source
# File lib/mini_scheduler/manager.rb, line 355
# Returns a memoized identifier for this manager, built from the hostname,
# the process id, the class-level sequence number and a random hex string.
# +keep_alive+ writes it to redis and +schedule_next_job+ records it as a
# job's current owner.
def identity_key
  return @identity_key if @identity_key

  host = hostname
  pid = Process.pid
  sequence = self.class.seq
  nonce = SecureRandom.hex
  @identity_key = "_scheduler_#{host}:#{pid}:#{sequence}:#{nonce}"
end
keep_alive()
click to toggle source
# File lib/mini_scheduler/manager.rb, line 317
# Writes this manager's +identity_key+ to redis with an empty value and an
# expiry of +keep_alive_duration+ seconds. +reschedule_orphans_on!+ treats
# a job whose owner key is missing from redis as orphaned.
def keep_alive
  redis.setex(identity_key, keep_alive_duration, "")
end
keep_alive_duration()
click to toggle source
# File lib/mini_scheduler/manager.rb, line 313
# Expiry, in seconds, that +keep_alive+ passes to redis SETEX for the
# identity key.
def keep_alive_duration
  60
end
lock() { || ... }
click to toggle source
# File lib/mini_scheduler/manager.rb, line 321
# Runs the given block inside MiniScheduler::DistributedMutex.synchronize,
# keyed by this manager's queue lock name.
# Note that the mutex is given MiniScheduler.redis, not this manager's
# +redis+ attribute.
def lock
  key = Manager.lock_key(queue)
  MiniScheduler::DistributedMutex.synchronize(key, MiniScheduler.redis) { yield }
end
next_run(klass)
click to toggle source
# File lib/mini_scheduler/manager.rb, line 218
# Returns +next_run+ from the ScheduleInfo for +klass+.
def next_run(klass)
  info = schedule_info(klass)
  info.next_run
end
remove(klass)
click to toggle source
# File lib/mini_scheduler/manager.rb, line 228
# Calls +del!+ on the ScheduleInfo for +klass+ while holding the queue lock.
def remove(klass)
  lock { schedule_info(klass).del! }
end
repair_queue()
click to toggle source
# File lib/mini_scheduler/manager.rb, line 262
# Re-creates schedule entries when neither the shared queue key nor this
# host's queue key exists in redis. In that case every discovered schedule
# whose +queue+ matches this manager's queue is passed to +ensure_schedule!+.
# Does nothing if either key exists.
def repair_queue
  return if redis.exists?(self.class.queue_key(queue))
  return if redis.exists?(self.class.queue_key(queue, hostname))

  matching = self.class.discover_schedules.select { |schedule| schedule.queue == queue }
  matching.each { |schedule| ensure_schedule!(schedule) }
end
reschedule_orphans!()
click to toggle source
# File lib/mini_scheduler/manager.rb, line 234
# Under the queue lock, reschedules orphaned jobs first on the shared queue
# (no hostname) and then on this host's queue.
def reschedule_orphans!
  lock do
    reschedule_orphans_on!
    reschedule_orphans_on!(hostname)
  end
end
reschedule_orphans_on!(hostname = nil)
click to toggle source
# File lib/mini_scheduler/manager.rb, line 241
# Scans every entry of the queue's sorted set (the per-host one when
# +hostname+ is given) and marks orphaned jobs to run immediately.
#
# A job counts as orphaned when its previous result is 'QUEUED' or
# 'RUNNING' and its recorded owner is blank or has no key in redis.
# Owner keys are the identity keys written, with an expiry, by +keep_alive+.
# Entries whose class cannot be resolved are skipped.
def reschedule_orphans_on!(hostname = nil)
  queued_keys = redis.zrange(Manager.queue_key(queue, hostname), 0, -1)

  queued_keys.each do |key|
    klass = get_klass(key)
    next unless klass

    info = schedule_info(klass)
    next unless %w[QUEUED RUNNING].include?(info.prev_result)

    owner_gone = info.current_owner.blank? || !redis.get(info.current_owner)
    next unless owner_gone

    info.prev_result = 'ORPHAN'
    info.next_run = Time.now.to_i
    info.write!
  end
end
schedule_info(klass)
click to toggle source
# File lib/mini_scheduler/manager.rb, line 214
# Builds a new MiniScheduler::ScheduleInfo for +klass+ bound to this manager.
# A fresh object is created on every call.
def schedule_info(klass)
  manager = self
  MiniScheduler::ScheduleInfo.new(klass, manager)
end
schedule_next_job(hostname = nil)
click to toggle source
# File lib/mini_scheduler/manager.rb, line 278
# Looks at the first entry of the queue's sorted set (the per-host one when
# +hostname+ is given) and, if its score is not in the future, marks it
# QUEUED and hands it to the runner.
#
# Returns nil without enqueuing when the set is empty, the entry is not due
# yet, or the entry is invalid. An entry is invalid when its class cannot be
# resolved or when the class's +is_per_host+ flag disagrees with whether a
# hostname was given; invalid entries are removed from the set.
def schedule_next_job(hostname = nil)
  queue_name = Manager.queue_key(queue, hostname)
  (key, due), _rest = redis.zrange(queue_name, 0, 0, withscores: true)
  return unless key
  return if due.to_i > Time.now.to_i

  klass = get_klass(key)
  # Per-host jobs belong only on a host queue, global jobs only on the
  # shared queue.
  valid = klass && (klass.is_per_host ? hostname : !hostname)
  unless valid
    # corrupt key, nuke it (renamed job or something)
    redis.zrem(queue_name, key)
    return
  end

  info = schedule_info(klass)
  info.prev_run = Time.now.to_i
  info.prev_result = "QUEUED"
  info.prev_duration = -1
  info.next_run = nil
  info.current_owner = identity_key
  info.schedule!

  @runner.enq(klass)
end
stop!()
click to toggle source
# File lib/mini_scheduler/manager.rb, line 308
# Stops this manager's runner, if it has one, and removes the manager from
# the class-level registry (Manager.current).
#
# Managers built with <tt>skip_runner: true</tt> (see +without_runner+) have
# no runner and are never registered by +initialize+. For them this is a
# no-op: it does not raise on the nil runner, and it leaves in place any
# other manager registered for the same queue.
#
# Returns the manager removed from the registry, or nil if none was removed.
def stop!
  @runner.stop! if @runner

  # Only unregister ourselves; another manager may own this queue's slot.
  registry = self.class.current
  registry.delete(@queue) if registry[@queue].equal?(self)
end
tick()
click to toggle source
# File lib/mini_scheduler/manager.rb, line 271
# One scheduling pass: under the queue lock, tries to enqueue the next due
# job from the shared queue and then the next due job from this host's queue.
def tick
  lock do
    schedule_next_job
    schedule_next_job(hostname)
  end
end