class Rufus::Scheduler::Job
Constants
- EoTime
Attributes
callable[R]
anything with a call
(job[, timet]) method, what gets actually triggered
count[R]
handler[R]
a reference to the instance whose call method is the @callable
id[R]
job_id[R]
last_time[R]
last_work_time[R]
locals[R]
mean_work_time[R]
name[RW]
next_time[RW]
next trigger time
opts[R]
original[R]
previous_time[RW]
previous “next trigger time”
scheduled_at[R]
unscheduled_at[R]
Public Class Methods
new(scheduler, original, opts, block)
click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 46 def initialize(scheduler, original, opts, block) @scheduler = scheduler @original = original @opts = opts @handler = block @callable = if block.respond_to?(:arity) block elsif block.respond_to?(:call) block.method(:call) elsif block.is_a?(Class) @handler = block.new @handler.method(:call) rescue nil else nil end @scheduled_at = EoTime.now @unscheduled_at = nil @last_time = nil @locals = opts[:locals] || opts[:l] || {} @local_mutex = Mutex.new @id = determine_id @name = opts[:name] || opts[:n] fail( ArgumentError, 'missing block or callable to schedule', caller[2..-1] ) unless @callable @tags = Array(opts[:tag] || opts[:tags]).collect { |t| t.to_s } @count = 0 @last_work_time = 0.0 @mean_work_time = 0.0 # tidy up options if @opts[:allow_overlap] == false || @opts[:allow_overlapping] == false @opts[:overlap] = false end if m = @opts[:mutex] @opts[:mutex] = Array(m) end end
Public Instance Methods
[](key)
click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 164 def [](key) @local_mutex.synchronize { @locals[key] } end
[]=(key, value)
click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 159 def []=(key, value) @local_mutex.synchronize { @locals[key] = value } end
call(do_rescue=false)
click to toggle source
Calls the callable (usually a block) wrapped in this Job
instance.
Warning: error rescueing is the responsibity of the caller.
# File lib/rufus/scheduler/jobs_core.rb, line 197 def call(do_rescue=false) do_call(EoTime.now, do_rescue) end
check_frequency()
click to toggle source
Will fail with an ArgumentError if the job frequency is higher than the scheduler frequency.
# File lib/rufus/scheduler/jobs_core.rb, line 109 def check_frequency # this parent implementation never fails end
entries()
click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 177 def entries; @local_mutex.synchronize { @locals.entries }; end
has_key?(key)
click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 169 def has_key?(key) @local_mutex.synchronize { @locals.has_key?(key) } end
Also aliased as: key?
keys()
click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 175 def keys; @local_mutex.synchronize { @locals.keys }; end
kill()
click to toggle source
Kills all the threads this Job
currently has going on.
# File lib/rufus/scheduler/jobs_core.rb, line 144 def kill threads.each { |t| t.raise(KillSignal) } end
next_times(count)
click to toggle source
def hash
self.object_id
end def eql?(o)
o.class == self.class && o.hash == self.hash
end
might be necessary at some point
# File lib/rufus/scheduler/jobs_core.rb, line 188 def next_times(count) next_time ? [ next_time ] : [] end
resume_discard_past=(v)
click to toggle source
Default, core, implementation has no effect. Repeat jobs do override it.
# File lib/rufus/scheduler/jobs_core.rb, line 44 def resume_discard_past=(v); end
running?()
click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 149 def running? threads.any? end
scheduled?()
click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 154 def scheduled? @scheduler.scheduled?(self) end
source_location()
click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 100 def source_location @callable.source_location end
Also aliased as: location
threads()
click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 137 def threads Thread.list.select { |t| t[:rufus_scheduler_job] == self } end
trigger(time)
click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 114 def trigger(time) @previous_time = @next_time set_next_time(time) do_trigger(time) end
trigger_off_schedule(time=EoTime.now)
click to toggle source
Trigger the job right now, off of its schedule.
Done in collaboration with Piavka in github.com/jmettraux/rufus-scheduler/issues/214
# File lib/rufus/scheduler/jobs_core.rb, line 127 def trigger_off_schedule(time=EoTime.now) do_trigger(time) end
unschedule()
click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 132 def unschedule @unscheduled_at = EoTime.now end
values()
click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 176 def values; @local_mutex.synchronize { @locals.values }; end
Protected Instance Methods
callback(meth, time)
click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 204 def callback(meth, time) return true unless @scheduler.respond_to?(meth) arity = @scheduler.method(meth).arity args = [ self, time ][0, (arity < 0 ? 2 : arity)] @scheduler.send(meth, *args) end
compute_timeout()
click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 214 def compute_timeout if to = @opts[:timeout] Rufus::Scheduler.parse(to) else nil end end
do_call(time, do_rescue)
click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 228 def do_call(time, do_rescue) args = [ self, time ][0, @callable.arity] @scheduler.around_trigger(self) do @callable.call(*args) end rescue StandardError => se fail se unless do_rescue return if se.is_a?(KillSignal) # discard @scheduler.on_error(self, se) # exceptions above StandardError do pass through end
do_trigger(time)
click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 247 def do_trigger(time) return if ( opts[:overlap] == false && running? ) return if ( callback(:confirm_lock, time) && callback(:on_pre_trigger, time) ) == false @count += 1 if opts[:blocking] trigger_now(time) else trigger_queue(time) end end
mutex(m)
click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 223 def mutex(m) m.is_a?(Mutex) ? m : (@scheduler.mutexes[m.to_s] ||= Mutex.new) end
post_trigger(time)
click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 296 def post_trigger(time) set_next_time(time, true) # except IntervalJob instances, jobs will ignore this call callback(:on_post_trigger, time) end
start_work_thread()
click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 304 def start_work_thread thread = Thread.new do ct = Thread.current ct[:rufus_scheduler_job] = true # indicates that the thread is going to be assigned immediately ct[@scheduler.thread_key] = true ct[:rufus_scheduler_work_thread] = true loop do break if @scheduler.started_at == nil job, time = @scheduler.work_queue.pop break if job == :shutdown break if @scheduler.started_at == nil next if job.unscheduled_at begin (job.opts[:mutex] || []).reduce( lambda { job.trigger_now(time) } ) do |b, m| lambda { mutex(m).synchronize { b.call } } end.call rescue KillSignal # simply go on looping end end end thread[@scheduler.thread_key] = true thread[:rufus_scheduler_work_thread] = true # # same as above (in the thead block), # but since it has to be done as quickly as possible. # So, whoever is running first (scheduler thread vs job thread) # sets this information thread end
trigger_now(time)
click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 267 def trigger_now(time) ct = Thread.current t = EoTime.now # if there are mutexes, t might be really bigger than time ct[:rufus_scheduler_job] = self ct[:rufus_scheduler_time] = t ct[:rufus_scheduler_timeout] = compute_timeout @last_time = t do_call(time, true) ensure @last_work_time = EoTime.now - ct[:rufus_scheduler_time] @mean_work_time = ((@count - 1) * @mean_work_time + @last_work_time) / @count post_trigger(time) ct[:rufus_scheduler_job] = nil ct[:rufus_scheduler_time] = nil ct[:rufus_scheduler_timeout] = nil end
trigger_queue(time)
click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 354 def trigger_queue(time) threads = @scheduler.work_threads vac = threads.select { |t| t[:rufus_scheduler_job] == nil }.size que = @scheduler.work_queue.size cur = threads.size max = @scheduler.max_work_threads start_work_thread if vac - que < 1 && cur < max @scheduler.work_queue << [ self, time ] end