class Rufus::Scheduler
Constants
- EoTime
- MAX_WORK_THREADS
MIN_WORK_THREADS = 3
- VERSION
Attributes
attr_accessor :min_work_threads
Public Class Methods
Produces a hour/min/sec/milli string representation of Time instance
# File lib/rufus/scheduler/util.rb, line 187 def h_to_s(t=Time.now) t.strftime('%T.%6N') end
# File lib/rufus/scheduler/util.rb, line 206 def ltstamp; Time.now.strftime('%FT%T.%3N'); end
# File lib/rufus/scheduler/util.rb, line 201 def monow; Process.clock_gettime(Process::CLOCK_MONOTONIC); end
# File lib/rufus/scheduler.rb, line 58 def initialize(opts={}) @opts = opts @started_at = nil @paused_at = nil @jobs = JobArray.new @frequency = Rufus::Scheduler.parse(opts[:frequency] || 0.300) @discard_past = opts.has_key?(:discard_past) ? opts[:discard_past] : true @mutexes = {} @work_queue = Queue.new @join_queue = Queue.new #@min_work_threads = # opts[:min_work_threads] || opts[:min_worker_threads] || # MIN_WORK_THREADS @max_work_threads = opts[:max_work_threads] || opts[:max_worker_threads] || MAX_WORK_THREADS @stderr = $stderr @thread_key = "rufus_scheduler_#{self.object_id}" @scheduler_lock = if lockfile = opts[:lockfile] Rufus::Scheduler::FileLock.new(lockfile) else opts[:scheduler_lock] || Rufus::Scheduler::NullLock.new end @trigger_lock = opts[:trigger_lock] || Rufus::Scheduler::NullLock.new # If we can't grab the @scheduler_lock, don't run. lock || return start end
# File lib/rufus/scheduler/util.rb, line 10 def parse(o, opts={}) opts[:no_error] = true parse_cron(o, opts) || parse_in(o, opts) || # covers 'every' schedule strings parse_at(o, opts) || fail(ArgumentError.new("couldn't parse #{o.inspect} (#{o.class})")) end
# File lib/rufus/scheduler/util.rb, line 42 def parse_at(o, opts={}) return o if o.is_a?(EoTime) return EoTime.make(o) if o.is_a?(Time) EoTime.parse(o, opts) rescue StandardError => se return nil if opts[:no_error] fail se end
# File lib/rufus/scheduler/util.rb, line 20 def parse_cron(o, opts={}) opts[:no_error] ? Fugit.parse_cron(o) : Fugit.do_parse_cron(o) end
Turns a string like '1m10s' into a float like '70.0', more formally, turns a time duration expressed as a string into a Float instance (millisecond count).
w -> week d -> day h -> hour m -> minute s -> second M -> month y -> year 'nada' -> millisecond
Some examples:
Rufus::Scheduler.parse_duration "0.5" # => 0.5 Rufus::Scheduler.parse_duration "500" # => 0.5 Rufus::Scheduler.parse_duration "1000" # => 1.0 Rufus::Scheduler.parse_duration "1h" # => 3600.0 Rufus::Scheduler.parse_duration "1h10s" # => 3610.0 Rufus::Scheduler.parse_duration "1w2d" # => 777600.0
Negative time strings are OK (Thanks Danny Fullerton):
Rufus::Scheduler.parse_duration "-0.5" # => -0.5 Rufus::Scheduler.parse_duration "-1h" # => -3600.0
# File lib/rufus/scheduler/util.rb, line 81 def parse_duration(str, opts={}) d = opts[:no_error] ? Fugit::Duration.parse(str, opts) : Fugit::Duration.do_parse(str, opts) d ? d.to_sec : nil end
# File lib/rufus/scheduler/util.rb, line 27 def parse_in(o, opts={}) #o.is_a?(String) ? parse_duration(o, opts) : o return parse_duration(o, opts) if o.is_a?(String) return o if o.is_a?(Numeric) fail ArgumentError.new("couldn't parse time point in #{o.inspect}") rescue ArgumentError => ae return nil if opts[:no_error] fail ae end
Alias for Rufus::Scheduler.singleton
# File lib/rufus/scheduler.rb, line 110 def self.s(opts={}); singleton(opts); end
Returns a singleton Rufus::Scheduler
instance
# File lib/rufus/scheduler.rb, line 103 def self.singleton(opts={}) @singleton ||= Rufus::Scheduler.new(opts) end
Releasing the gem would probably require redirecting .start_new to .new and emit a simple deprecation message.
For now, let's assume the people pointing at rufus-scheduler/master on GitHub know what they do…
# File lib/rufus/scheduler.rb, line 118 def self.start_new fail 'this is rufus-scheduler 3.x, use .new instead of .start_new' end
Turns a number of seconds into a a time string
Rufus.to_duration 0 # => '0s' Rufus.to_duration 60 # => '1m' Rufus.to_duration 3661 # => '1h1m1s' Rufus.to_duration 7 * 24 * 3600 # => '1w' Rufus.to_duration 30 * 24 * 3600 + 1 # => "4w2d1s"
It goes from seconds to the year. Months are not counted (as they are of variable length). Weeks are counted.
For 30 days months to be counted, the second parameter of this method can be set to true.
Rufus.to_duration 30 * 24 * 3600 + 1, true # => "1M1s"
If a Float value is passed, milliseconds will be displayed without 'marker'
Rufus.to_duration 0.051 # => "51" Rufus.to_duration 7.051 # => "7s51" Rufus.to_duration 0.120 + 30 * 24 * 3600 + 1 # => "4w2d1s120"
(this behaviour mirrors the one found for parse_time_string()).
Options are :
-
:months, if set to true, months (M) of 30 days will be taken into account when building up the result
-
:drop_seconds, if set to true, seconds and milliseconds will be trimmed from the result
# File lib/rufus/scheduler/util.rb, line 124 def to_duration(seconds, options={}) #d = Fugit::Duration.parse(seconds, options).deflate #d = d.drop_seconds if options[:drop_seconds] #d = d.deflate(:month => options[:months]) if options[:months] #d.to_rufus_s to_fugit_duration(seconds, options).to_rufus_s end
Turns a number of seconds (integer or Float) into a hash like in :
Rufus.to_duration_hash 0.051 # => { :s => 0.051 } Rufus.to_duration_hash 7.051 # => { :s => 7.051 } Rufus.to_duration_hash 0.120 + 30 * 24 * 3600 + 1 # => { :w => 4, :d => 2, :s => 1.120 }
This method is used by to_duration
behind the scenes.
Options are :
-
:months, if set to true, months (M) of 30 days will be taken into account when building up the result
-
:drop_seconds, if set to true, seconds and milliseconds will be trimmed from the result
# File lib/rufus/scheduler/util.rb, line 152 def to_duration_hash(seconds, options={}) to_fugit_duration(seconds, options).to_rufus_h end
Used by both .to_duration and .to_duration_hash
# File lib/rufus/scheduler/util.rb, line 159 def to_fugit_duration(seconds, options={}) d = Fugit::Duration .parse(seconds, options) .deflate d = d.drop_seconds if options[:drop_seconds] d = d.deflate(:month => options[:months]) if options[:months] d end
Produces the UTC string representation of a Time instance
like “2009/11/23 11:11:50.947109 UTC”
# File lib/rufus/scheduler/util.rb, line 181 def utc_to_s(t=Time.now) "#{t.dup.utc.strftime('%F %T.%6N')} UTC" end
Public Instance Methods
# File lib/rufus/scheduler.rb, line 128 def around_trigger(job) yield end
# File lib/rufus/scheduler.rb, line 184 def at(time, callable=nil, opts={}, &block) do_schedule(:once, time, callable, opts, opts[:job], block) end
# File lib/rufus/scheduler.rb, line 295 def at_jobs(opts={}) jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::AtJob) } end
Callback called when a job is triggered. If the lock cannot be acquired, the job won't run (though it'll still be scheduled to run again if necessary).
# File lib/rufus/scheduler.rb, line 363 def confirm_lock @trigger_lock.lock end
# File lib/rufus/scheduler.rb, line 224 def cron(cronline, callable=nil, opts={}, &block) do_schedule(:cron, cronline, callable, opts, opts[:job], block) end
# File lib/rufus/scheduler.rb, line 315 def cron_jobs(opts={}) jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::CronJob) } end
# File lib/rufus/scheduler.rb, line 152 def down? ! @started_at end
# File lib/rufus/scheduler.rb, line 204 def every(duration, callable=nil, opts={}, &block) do_schedule(:every, duration, callable, opts, opts[:job], block) end
# File lib/rufus/scheduler.rb, line 305 def every_jobs(opts={}) jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::EveryJob) } end
# File lib/rufus/scheduler.rb, line 194 def in(duration, callable=nil, opts={}, &block) do_schedule(:once, duration, callable, opts, opts[:job], block) end
# File lib/rufus/scheduler.rb, line 300 def in_jobs(opts={}) jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::InJob) } end
# File lib/rufus/scheduler.rb, line 214 def interval(duration, callable=nil, opts={}, &block) do_schedule(:interval, duration, callable, opts, opts[:job], block) end
# File lib/rufus/scheduler.rb, line 310 def interval_jobs(opts={}) jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::IntervalJob) } end
# File lib/rufus/scheduler.rb, line 320 def job(job_id) @jobs[job_id] end
Returns all the scheduled jobs (even those right before re-schedule).
# File lib/rufus/scheduler.rb, line 277 def jobs(opts={}) opts = { opts => true } if opts.is_a?(Symbol) jobs = @jobs.to_a if opts[:running] jobs = jobs.select { |j| j.running? } elsif ! opts[:all] jobs = jobs.reject { |j| j.next_time.nil? || j.unscheduled_at } end tags = Array(opts[:tag] || opts[:tags]).collect(&:to_s) jobs = jobs.reject { |j| tags.find { |t| ! j.tags.include?(t) } } jobs end
# File lib/rufus/scheduler.rb, line 138 def join(time_limit=nil) fail NotRunningError.new('cannot join scheduler that is not running') \ unless @thread fail ThreadError.new('scheduler thread cannot join itself') \ if @thread == Thread.current if time_limit time_limit_join(time_limit) else no_time_limit_join end end
Returns true if the scheduler has acquired the [exclusive] lock and thus may run.
Most of the time, a scheduler is run alone and this method should return true. It is useful in cases where among a group of applications only one of them should run the scheduler. For schedulers that should not run, the method should return false.
Out of the box, rufus-scheduler proposes the :lockfile => 'path/to/lock/file' scheduler start option. It makes it easy for schedulers on the same machine to determine which should run (the first to write the lockfile and lock it). It uses “man 2 flock” so it probably won't work reliably on distributed file systems.
If one needs to use a special/different locking mechanism, the scheduler accepts :scheduler_lock => lock_object. lock_object only needs to respond to lock
and unlock
, and both of these methods should be idempotent.
Look at rufus/scheduler/locks.rb for an example.
# File lib/rufus/scheduler.rb, line 346 def lock @scheduler_lock.lock end
# File lib/rufus/scheduler.rb, line 416 def occurrences(time0, time1, format=:per_job) h = {} jobs.each do |j| os = j.occurrences(time0, time1) h[j] = os if os.any? end if format == :timeline a = [] h.each { |j, ts| ts.each { |t| a << [ t, j ] } } a.sort_by { |(t, _)| t } else h end end
# File lib/rufus/scheduler.rb, line 439 def on_error(job, err) pre = err.object_id.to_s ms = {}; mutexes.each { |k, v| ms[k] = v.locked? } stderr.puts("{ #{pre} rufus-scheduler intercepted an error:") stderr.puts(" #{pre} job:") stderr.puts(" #{pre} #{job.class} #{job.original.inspect} #{job.opts.inspect}") stderr.puts(" #{pre} #{job.source_location.inspect}") # TODO: eventually use a Job#detail or something like that stderr.puts(" #{pre} error:") stderr.puts(" #{pre} #{err.object_id}") stderr.puts(" #{pre} #{err.class}") stderr.puts(" #{pre} #{err}") err.backtrace.each do |l| stderr.puts(" #{pre} #{l}") end stderr.puts(" #{pre} tz:") stderr.puts(" #{pre} ENV['TZ']: #{ENV['TZ']}") stderr.puts(" #{pre} Time.now: #{Time.now}") stderr.puts(" #{pre} local_tzone: #{EoTime.local_tzone.inspect}") stderr.puts(" #{pre} et-orbi:") stderr.puts(" #{pre} #{EoTime.platform_info}") stderr.puts(" #{pre} scheduler:") stderr.puts(" #{pre} object_id: #{object_id}") stderr.puts(" #{pre} opts:") stderr.puts(" #{pre} #{@opts.inspect}") stderr.puts(" #{pre} frequency: #{self.frequency}") stderr.puts(" #{pre} scheduler_lock: #{@scheduler_lock.inspect}") stderr.puts(" #{pre} trigger_lock: #{@trigger_lock.inspect}") stderr.puts(" #{pre} uptime: #{uptime} (#{uptime_s})") stderr.puts(" #{pre} down?: #{down?}") stderr.puts(" #{pre} frequency: #{frequency.inspect}") stderr.puts(" #{pre} discard_past: #{discard_past.inspect}") stderr.puts(" #{pre} started_at: #{started_at.inspect}") stderr.puts(" #{pre} paused_at: #{paused_at.inspect}") stderr.puts(" #{pre} threads: #{self.threads.size}") stderr.puts(" #{pre} thread: #{self.thread}") stderr.puts(" #{pre} thread_key: #{self.thread_key}") stderr.puts(" #{pre} work_threads: #{work_threads.size}") stderr.puts(" #{pre} active: #{work_threads(:active).size}") stderr.puts(" #{pre} vacant: #{work_threads(:vacant).size}") stderr.puts(" #{pre} max_work_threads: #{max_work_threads}") stderr.puts(" #{pre} mutexes: #{ms.inspect}") stderr.puts(" #{pre} jobs: #{jobs.size}") stderr.puts(" #{pre} at_jobs: #{at_jobs.size}") stderr.puts(" #{pre} in_jobs: #{in_jobs.size}") stderr.puts(" #{pre} every_jobs: #{every_jobs.size}") stderr.puts(" #{pre} interval_jobs: #{interval_jobs.size}") stderr.puts(" #{pre} cron_jobs: #{cron_jobs.size}") stderr.puts(" #{pre} running_jobs: #{running_jobs.size}") stderr.puts(" #{pre} work_queue:") stderr.puts(" #{pre} size: #{@work_queue.size}") stderr.puts(" #{pre} num_waiting: #{@work_queue.num_waiting}") stderr.puts(" #{pre} join_queue:") stderr.puts(" #{pre} size: #{@join_queue.size}") stderr.puts(" #{pre} num_waiting: #{@join_queue.num_waiting}") stderr.puts("} #{pre} .") rescue => e stderr.puts("failure in #on_error itself:") stderr.puts(e.inspect) stderr.puts(e.backtrace) ensure stderr.flush end
# File lib/rufus/scheduler.rb, line 167 def pause @paused_at = EoTime.now end
# File lib/rufus/scheduler.rb, line 162 def paused? !! @paused_at end
# File lib/rufus/scheduler.rb, line 248 def repeat(arg, callable=nil, opts={}, &block) callable, opts = nil, callable if callable.is_a?(Hash) opts = opts.dup opts[:_t] = Rufus::Scheduler.parse(arg, opts) case opts[:_t] when ::Fugit::Cron then schedule_cron(arg, callable, opts, &block) else schedule_every(arg, callable, opts, &block) end end
# File lib/rufus/scheduler.rb, line 172 def resume(opts={}) dp = opts[:discard_past] jobs.each { |job| job.resume_discard_past = dp } @paused_at = nil end
# File lib/rufus/scheduler.rb, line 411 def running_jobs(opts={}) jobs(opts.merge(:running => true)) end
# File lib/rufus/scheduler.rb, line 234 def schedule(arg, callable=nil, opts={}, &block) callable, opts = nil, callable if callable.is_a?(Hash) opts = opts.dup opts[:_t] = Rufus::Scheduler.parse(arg, opts) case opts[:_t] when ::Fugit::Cron then schedule_cron(arg, callable, opts, &block) when ::EtOrbi::EoTime, Time then schedule_at(arg, callable, opts, &block) else schedule_in(arg, callable, opts, &block) end end
# File lib/rufus/scheduler.rb, line 189 def schedule_at(time, callable=nil, opts={}, &block) do_schedule(:once, time, callable, opts, true, block) end
# File lib/rufus/scheduler.rb, line 229 def schedule_cron(cronline, callable=nil, opts={}, &block) do_schedule(:cron, cronline, callable, opts, true, block) end
# File lib/rufus/scheduler.rb, line 209 def schedule_every(duration, callable=nil, opts={}, &block) do_schedule(:every, duration, callable, opts, true, block) end
# File lib/rufus/scheduler.rb, line 199 def schedule_in(duration, callable=nil, opts={}, &block) do_schedule(:once, duration, callable, opts, true, block) end
# File lib/rufus/scheduler.rb, line 219 def schedule_interval(duration, callable=nil, opts={}, &block) do_schedule(:interval, duration, callable, opts, true, block) end
Returns true if this job is currently scheduled.
Takes extra care to answer true if the job is a repeat job currently firing.
# File lib/rufus/scheduler.rb, line 373 def scheduled?(job_or_job_id) job, _ = fetch(job_or_job_id) !! (job && job.unscheduled_at.nil? && job.next_time != nil) end
# File lib/rufus/scheduler.rb, line 510 def shutdown(opt=nil) opts = case opt when Symbol then { opt => true } when Hash then opt else {} end @jobs.unschedule_all if opts[:wait] || opts[:join] join_shutdown(opts) elsif opts[:kill] kill_shutdown(opts) else regular_shutdown(opts) end @work_queue.clear unlock @thread.join end
Lists all the threads associated with this scheduler.
# File lib/rufus/scheduler.rb, line 382 def threads Thread.list.select { |t| t[thread_key] } end
# File lib/rufus/scheduler.rb, line 434 def timeline(time0, time1) occurrences(time0, time1, :timeline) end
Sister method to lock
, is called when the scheduler shuts down.
# File lib/rufus/scheduler.rb, line 353 def unlock @trigger_lock.unlock @scheduler_lock.unlock end
# File lib/rufus/scheduler.rb, line 261 def unschedule(job_or_job_id) job, job_id = fetch(job_or_job_id) fail ArgumentError.new("no job found with id '#{job_id}'") unless job job.unschedule if job end
# File lib/rufus/scheduler.rb, line 157 def up? !! @started_at end
# File lib/rufus/scheduler.rb, line 123 def uptime @started_at ? EoTime.now - @started_at : nil end
# File lib/rufus/scheduler.rb, line 133 def uptime_s uptime ? self.class.to_duration(uptime) : '' end
Lists all the work threads (the ones actually running the scheduled block code)
Accepts a query option, which can be set to:
-
:all (default), returns all the threads that are work threads or are currently running a job
-
:active, returns all threads that are currently running a job
-
:vacant, returns the threads that are not running a job
If, thanks to :blocking => true, a job is scheduled to monopolize the main scheduler thread, that thread will get returned when :active or :all.
# File lib/rufus/scheduler.rb, line 400 def work_threads(query=:all) ts = threads.select { |t| t[:rufus_scheduler_work_thread] } case query when :active then ts.select { |t| t[:rufus_scheduler_job] } when :vacant then ts.reject { |t| t[:rufus_scheduler_job] } else ts end end
Protected Instance Methods
# File lib/rufus/scheduler.rb, line 688 def do_schedule(job_type, t, callable, opts, return_job_instance, block) fail NotRunningError.new( 'cannot schedule, scheduler is down or shutting down' ) if @started_at.nil? callable, opts = nil, callable if callable.is_a?(Hash) opts = opts.dup unless opts.has_key?(:_t) return_job_instance ||= opts[:job] job_class = case job_type when :once opts[:_t] ||= Rufus::Scheduler.parse(t, opts) opts[:_t].is_a?(Numeric) ? InJob : AtJob when :every EveryJob when :interval IntervalJob when :cron CronJob end job = job_class.new(self, t, opts, block || callable) job.check_frequency @jobs.push(job) return_job_instance ? job : job.job_id end
Returns [ job, job_id ]
# File lib/rufus/scheduler.rb, line 604 def fetch(job_or_job_id) if job_or_job_id.respond_to?(:job_id) [ job_or_job_id, job_or_job_id.job_id ] else [ job(job_or_job_id), job_or_job_id ] end end
# File lib/rufus/scheduler.rb, line 539 def join_shutdown(opts) limit = opts[:wait] || opts[:join] limit = limit.is_a?(Numeric) ? limit : nil #@started_at = nil # # when @started_at is nil, the scheduler thread exits, here # we want it to exit when all the work threads have been joined # hence it's set to nil later on # @paused_at = EoTime.now (work_threads.size * 2 + 1).times { @work_queue << :shutdown } work_threads .collect { |wt| wt == Thread.current ? nil : Thread.new { wt.join(limit); wt.kill } } .each { |st| st.join if st } @started_at = nil end
# File lib/rufus/scheduler.rb, line 563 def kill_shutdown(opts) @started_at = nil work_threads.each(&:kill) end
# File lib/rufus/scheduler.rb, line 721 def ltstamp; self.class.ltstamp; end
# File lib/rufus/scheduler.rb, line 720 def monow; self.class.monow; end
# File lib/rufus/scheduler.rb, line 597 def no_time_limit_join @join_queue.pop end
# File lib/rufus/scheduler.rb, line 569 def regular_shutdown(opts) @started_at = nil end
# File lib/rufus/scheduler.rb, line 683 def rejoin (@join_queue.num_waiting * 2 + 1).times { @join_queue << @thread } end
def free_all_work_threads
work_threads.each { |t| t.raise(KillSignal) }
end
# File lib/rufus/scheduler.rb, line 625 def start @started_at = EoTime.now @thread = Thread.new do while @started_at do unschedule_jobs trigger_jobs unless @paused_at timeout_jobs sleep(@frequency) end rejoin end @thread[@thread_key] = true @thread[:rufus_scheduler] = self @thread[:name] = @opts[:thread_name] || "#{@thread_key}_scheduler" end
# File lib/rufus/scheduler.rb, line 613 def terminate_all_jobs jobs.each { |j| j.unschedule } sleep 0.01 while running_jobs.size > 0 end
# File lib/rufus/scheduler.rb, line 574 def time_limit_join(limit) fail ArgumentError.new("limit #{limit.inspect} should be > 0") \ unless limit.is_a?(Numeric) && limit > 0 t0 = monow f = [ limit.to_f / 20, 0.100 ].min while monow - t0 < limit r = begin @join_queue.pop(true) rescue ThreadError => e # #<ThreadError: queue empty> false end return r if r sleep(f) end nil end
# File lib/rufus/scheduler.rb, line 664 def timeout_jobs work_threads(:active).each do |t| job = t[:rufus_scheduler_job] to = t[:rufus_scheduler_timeout] ts = t[:rufus_scheduler_time] next unless job && to && ts # thread might just have become inactive (job -> nil) to = ts + to unless to.is_a?(EoTime) next if to > EoTime.now t.raise(Rufus::Scheduler::TimeoutError) end end
# File lib/rufus/scheduler.rb, line 654 def trigger_jobs now = EoTime.now @jobs.each(now) do |job| job.trigger(now) end end
# File lib/rufus/scheduler.rb, line 649 def unschedule_jobs @jobs.delete_unscheduled end