class Rufus::Scheduler

Constants

EoTime
MAX_WORK_THREADS

MIN_WORK_THREADS = 3

VERSION

Attributes

discard_past[RW]
frequency[RW]
max_work_threads[RW]

attr_accessor :min_work_threads

mutexes[R]
paused_at[R]
started_at[R]
stderr[RW]
thread[R]
thread_key[R]
work_queue[R]

Public Class Methods

h_to_s(t=Time.now) click to toggle source

Produces an hour/min/sec/microsecond string representation of a Time instance

# File lib/rufus/scheduler/util.rb, line 187
# Formats +t+ (defaults to the current time) as "HH:MM:SS.ffffff",
# i.e. the time of day followed by a 6-digit fraction of second.
def h_to_s(t=Time.now)
  pattern = '%T.%6N'
  t.strftime(pattern)
end
ltstamp() click to toggle source
# File lib/rufus/scheduler/util.rb, line 206
# Returns the current time as a "YYYY-MM-DDTHH:MM:SS.mmm" string
# (3-digit fraction of second).
def ltstamp
  Time.now.strftime('%FT%T.%3N')
end
monow() click to toggle source
# File lib/rufus/scheduler/util.rb, line 201
# Returns the current reading of the monotonic clock
# (Process::CLOCK_MONOTONIC), suitable for measuring elapsed time.
def monow
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
new(opts={}) click to toggle source
# File lib/rufus/scheduler.rb, line 58
# Sets up a scheduler from the +opts+ hash and starts it, unless the
# scheduler lock cannot be acquired (in which case nothing is started).
#
# Options read here:
#
# * :frequency, passed through Rufus::Scheduler.parse, defaults to 0.300
# * :discard_past, defaults to true when the key is absent
# * :max_work_threads (or :max_worker_threads), defaults to MAX_WORK_THREADS
# * :lockfile, wraps the given path in a FileLock used as scheduler lock
# * :scheduler_lock, used when no :lockfile is given, defaults to a NullLock
# * :trigger_lock, defaults to a NullLock
def initialize(opts={})

  @opts = opts

  @started_at = nil
  @paused_at = nil

  @jobs = JobArray.new

  @frequency = Rufus::Scheduler.parse(opts[:frequency] || 0.300)
  @discard_past = opts.fetch(:discard_past, true)

  @mutexes = {}

  @work_queue = Queue.new
  @join_queue = Queue.new

  #@min_work_threads =
  #  opts[:min_work_threads] || opts[:min_worker_threads] ||
  #  MIN_WORK_THREADS
  @max_work_threads =
    opts[:max_work_threads] || opts[:max_worker_threads] || MAX_WORK_THREADS

  @stderr = $stderr

  @thread_key = "rufus_scheduler_#{self.object_id}"

  # a :lockfile takes precedence over an explicit :scheduler_lock
  lockfile = opts[:lockfile]
  @scheduler_lock =
    lockfile ?
    Rufus::Scheduler::FileLock.new(lockfile) :
    (opts[:scheduler_lock] || Rufus::Scheduler::NullLock.new)

  @trigger_lock = opts[:trigger_lock] || Rufus::Scheduler::NullLock.new

  # If we can't grab the @scheduler_lock, don't run.
  return unless lock

  start
end
parse(o, opts={}) click to toggle source
# File lib/rufus/scheduler/util.rb, line 10
# Parses +o+ into a schedule point, trying in turn a cron string, a
# duration ("in"/"every" strings and numbers) and a point in time.
#
# The sub-parsers are called with :no_error set so that they return nil
# instead of raising; the flag is set on a copy of +opts+, the caller's
# hash is left untouched (and may be frozen).
#
# Returns the first non-nil parse result.
# Raises ArgumentError when none of the parsers accepts +o+.
def parse(o, opts={})

  # merge instead of assigning, so the caller's hash is not mutated
  opts = opts.merge(:no_error => true)

  parse_cron(o, opts) ||
  parse_in(o, opts) || # covers 'every' schedule strings
  parse_at(o, opts) ||
  fail(ArgumentError.new("couldn't parse #{o.inspect} (#{o.class})"))
end
parse_at(o, opts={}) click to toggle source
# File lib/rufus/scheduler/util.rb, line 42
# Turns +o+ into an EoTime instance.
#
# An EoTime is returned as is, a Time goes through EoTime.make, anything
# else is handed to EoTime.parse together with +opts+.
#
# When opts[:no_error] is set, any StandardError is swallowed and nil is
# returned, else the error is re-raised.
def parse_at(o, opts={})

  case o
  when EoTime then o
  when Time then EoTime.make(o)
  else EoTime.parse(o, opts)
  end

rescue StandardError => err

  raise err unless opts[:no_error]
  nil
end
parse_cron(o, opts={}) click to toggle source
# File lib/rufus/scheduler/util.rb, line 20
# Parses +o+ as a cron string via Fugit.
#
# With opts[:no_error] set, Fugit.parse_cron is used, else
# Fugit.do_parse_cron is used.
def parse_cron(o, opts={})

  return Fugit.parse_cron(o) if opts[:no_error]

  Fugit.do_parse_cron(o)
end
parse_duration(str, opts={}) click to toggle source

Turns a string like '1m10s' into a float like '70.0'. More formally, turns a time duration expressed as a string into a Float instance (a count of seconds).

w -> week d -> day h -> hour m -> minute s -> second M -> month y -> year 'nada' -> millisecond

Some examples:

Rufus::Scheduler.parse_duration "0.5"    # => 0.5
Rufus::Scheduler.parse_duration "500"    # => 0.5
Rufus::Scheduler.parse_duration "1000"   # => 1.0
Rufus::Scheduler.parse_duration "1h"     # => 3600.0
Rufus::Scheduler.parse_duration "1h10s"  # => 3610.0
Rufus::Scheduler.parse_duration "1w2d"   # => 777600.0

Negative time strings are OK (Thanks Danny Fullerton):

Rufus::Scheduler.parse_duration "-0.5"   # => -0.5
Rufus::Scheduler.parse_duration "-1h"    # => -3600.0
# File lib/rufus/scheduler/util.rb, line 81
# Parses the duration string +str+ via Fugit::Duration and returns the
# result of #to_sec on the parsed duration, or nil when the parser
# returned nothing.
#
# With opts[:no_error] set, Fugit::Duration.parse is used, else
# Fugit::Duration.do_parse is used.
def parse_duration(str, opts={})

  duration =
    if opts[:no_error]
      Fugit::Duration.parse(str, opts)
    else
      Fugit::Duration.do_parse(str, opts)
    end

  return nil unless duration

  duration.to_sec
end
parse_in(o, opts={}) click to toggle source
# File lib/rufus/scheduler/util.rb, line 27
# Turns +o+ into a duration: a String goes through parse_duration, a
# Numeric is returned unchanged, anything else is an ArgumentError.
#
# When opts[:no_error] is set, an ArgumentError results in nil being
# returned instead of being raised.
def parse_in(o, opts={})

  case o
  when String then parse_duration(o, opts)
  when Numeric then o
  else fail ArgumentError.new("couldn't parse time point in #{o.inspect}")
  end

rescue ArgumentError => err

  raise err unless opts[:no_error]
  nil
end
s(opts={}) click to toggle source

Alias for Rufus::Scheduler.singleton

# File lib/rufus/scheduler.rb, line 110
# Shortcut for .singleton, +opts+ is passed along unchanged.
def self.s(opts={})
  singleton(opts)
end
singleton(opts={}) click to toggle source

Returns a singleton Rufus::Scheduler instance

# File lib/rufus/scheduler.rb, line 103
# Returns the memoized scheduler, instantiating it with +opts+ on first
# call. On later calls +opts+ is ignored.
def self.singleton(opts={})

  return @singleton if @singleton

  @singleton = Rufus::Scheduler.new(opts)
end
start_new() click to toggle source

Releasing the gem would probably require redirecting .start_new to .new and emitting a simple deprecation message.

For now, let's assume the people pointing at rufus-scheduler/master on GitHub know what they do…

# File lib/rufus/scheduler.rb, line 118
# Legacy entry point, always fails with a message pointing at .new
def self.start_new

  message = 'this is rufus-scheduler 3.x, use .new instead of .start_new'

  fail message
end
to_duration(seconds, options={}) click to toggle source

Turns a number of seconds into a time string

Rufus.to_duration 0                    # => '0s'
Rufus.to_duration 60                   # => '1m'
Rufus.to_duration 3661                 # => '1h1m1s'
Rufus.to_duration 7 * 24 * 3600        # => '1w'
Rufus.to_duration 30 * 24 * 3600 + 1   # => "4w2d1s"

It goes from seconds to the year. Months are not counted (as they are of variable length). Weeks are counted.

For 30-day months to be counted, the :months option of this method can be set to true.

Rufus.to_duration 30 * 24 * 3600 + 1, :months => true   # => "1M1s"

If a Float value is passed, milliseconds will be displayed without 'marker'

Rufus.to_duration 0.051                       # => "51"
Rufus.to_duration 7.051                       # => "7s51"
Rufus.to_duration 0.120 + 30 * 24 * 3600 + 1  # => "4w2d1s120"

(this behaviour mirrors the one found for parse_time_string()).

Options are :

  • :months, if set to true, months (M) of 30 days will be taken into account when building up the result

  • :drop_seconds, if set to true, seconds and milliseconds will be trimmed from the result

# File lib/rufus/scheduler/util.rb, line 124
# Builds the duration for +seconds+ (see .to_fugit_duration for the
# options) and returns its #to_rufus_s string rendition.
def to_duration(seconds, options={})

  duration = to_fugit_duration(seconds, options)

  duration.to_rufus_s
end
to_duration_hash(seconds, options={}) click to toggle source

Turns a number of seconds (integer or Float) into a hash like in :

Rufus.to_duration_hash 0.051
  # => { :s => 0.051 }
Rufus.to_duration_hash 7.051
  # => { :s => 7.051 }
Rufus.to_duration_hash 0.120 + 30 * 24 * 3600 + 1
  # => { :w => 4, :d => 2, :s => 1.120 }

This method is used by to_duration behind the scenes.

Options are :

  • :months, if set to true, months (M) of 30 days will be taken into account when building up the result

  • :drop_seconds, if set to true, seconds and milliseconds will be trimmed from the result

# File lib/rufus/scheduler/util.rb, line 152
# Builds the duration for +seconds+ (see .to_fugit_duration for the
# options) and returns its #to_rufus_h hash rendition.
def to_duration_hash(seconds, options={})

  duration = to_fugit_duration(seconds, options)

  duration.to_rufus_h
end
to_fugit_duration(seconds, options={}) click to toggle source

Used by both .to_duration and .to_duration_hash

# File lib/rufus/scheduler/util.rb, line 159
# Parses +seconds+ with Fugit::Duration.parse and deflates the result.
#
# * options[:drop_seconds], when set, #drop_seconds is applied
# * options[:months], when set, the duration is deflated once more with
#   :month => options[:months]
#
# Returns the resulting duration.
def to_fugit_duration(seconds, options={})

  duration = Fugit::Duration.parse(seconds, options).deflate

  if options[:drop_seconds]
    duration = duration.drop_seconds
  end
  if options[:months]
    duration = duration.deflate(:month => options[:months])
  end

  duration
end
utc_to_s(t=Time.now) click to toggle source

Produces the UTC string representation of a Time instance

like “2009-11-23 11:11:50.947109 UTC”

# File lib/rufus/scheduler/util.rb, line 181
# Renders +t+ (defaults to the current time) in UTC, as
# "YYYY-MM-DD HH:MM:SS.ffffff UTC". Works on a copy, +t+ is not altered.
def utc_to_s(t=Time.now)
  stamp = t.dup.utc.strftime('%F %T.%6N')
  "#{stamp} UTC"
end

Public Instance Methods

around_trigger(job) { || ... } click to toggle source
# File lib/rufus/scheduler.rb, line 128
# Hook wrapped around the given block; this implementation simply
# yields and returns the block's result, +job+ is not used.
def around_trigger(job)
  yield
end
at(time, callable=nil, opts={}, &block) click to toggle source
# File lib/rufus/scheduler.rb, line 184
# Schedules a :once job at +time+ by delegating to do_schedule;
# opts[:job] is passed along as the "return the job instance" flag.
def at(time, callable=nil, opts={}, &block)

  wants_job = opts[:job]

  do_schedule(:once, time, callable, opts, wants_job, block)
end
at_jobs(opts={}) click to toggle source
# File lib/rufus/scheduler.rb, line 295
# Returns the jobs selected by jobs(opts) that are AtJob instances.
def at_jobs(opts={})

  jobs(opts).grep(Rufus::Scheduler::AtJob)
end
confirm_lock() click to toggle source

Callback called when a job is triggered. If the lock cannot be acquired, the job won't run (though it'll still be scheduled to run again if necessary).

# File lib/rufus/scheduler.rb, line 363
# Asks the trigger lock for its #lock and returns the answer.
def confirm_lock
  @trigger_lock.lock
end
cron(cronline, callable=nil, opts={}, &block) click to toggle source
# File lib/rufus/scheduler.rb, line 224
# Schedules a :cron job for +cronline+ by delegating to do_schedule;
# opts[:job] is passed along as the "return the job instance" flag.
def cron(cronline, callable=nil, opts={}, &block)

  wants_job = opts[:job]

  do_schedule(:cron, cronline, callable, opts, wants_job, block)
end
cron_jobs(opts={}) click to toggle source
# File lib/rufus/scheduler.rb, line 315
# Returns the jobs selected by jobs(opts) that are CronJob instances.
def cron_jobs(opts={})

  jobs(opts).grep(Rufus::Scheduler::CronJob)
end
down?() click to toggle source
# File lib/rufus/scheduler.rb, line 152
# Returns true when @started_at is not set, false otherwise.
def down?
  @started_at ? false : true
end
every(duration, callable=nil, opts={}, &block) click to toggle source
# File lib/rufus/scheduler.rb, line 204
# Schedules an :every job for +duration+ by delegating to do_schedule;
# opts[:job] is passed along as the "return the job instance" flag.
def every(duration, callable=nil, opts={}, &block)

  wants_job = opts[:job]

  do_schedule(:every, duration, callable, opts, wants_job, block)
end
every_jobs(opts={}) click to toggle source
# File lib/rufus/scheduler.rb, line 305
# Returns the jobs selected by jobs(opts) that are EveryJob instances.
def every_jobs(opts={})

  jobs(opts).grep(Rufus::Scheduler::EveryJob)
end
in(duration, callable=nil, opts={}, &block) click to toggle source
# File lib/rufus/scheduler.rb, line 194
# Schedules a :once job +duration+ from now by delegating to
# do_schedule; opts[:job] is passed along as the "return the job
# instance" flag.
def in(duration, callable=nil, opts={}, &block)

  wants_job = opts[:job]

  do_schedule(:once, duration, callable, opts, wants_job, block)
end
in_jobs(opts={}) click to toggle source
# File lib/rufus/scheduler.rb, line 300
# Returns the jobs selected by jobs(opts) that are InJob instances.
def in_jobs(opts={})

  jobs(opts).grep(Rufus::Scheduler::InJob)
end
interval(duration, callable=nil, opts={}, &block) click to toggle source
# File lib/rufus/scheduler.rb, line 214
# Schedules an :interval job for +duration+ by delegating to
# do_schedule; opts[:job] is passed along as the "return the job
# instance" flag.
def interval(duration, callable=nil, opts={}, &block)

  wants_job = opts[:job]

  do_schedule(:interval, duration, callable, opts, wants_job, block)
end
interval_jobs(opts={}) click to toggle source
# File lib/rufus/scheduler.rb, line 310
# Returns the jobs selected by jobs(opts) that are IntervalJob instances.
def interval_jobs(opts={})

  jobs(opts).grep(Rufus::Scheduler::IntervalJob)
end
job(job_id) click to toggle source
# File lib/rufus/scheduler.rb, line 320
# Looks +job_id+ up in @jobs and returns whatever is found there.
def job(job_id)
  @jobs[job_id]
end
jobs(opts={}) click to toggle source

Returns all the scheduled jobs (even those right before re-schedule).

# File lib/rufus/scheduler.rb, line 277
# Lists jobs held in @jobs.
#
# +opts+ may be a Symbol (shortcut for { symbol => true }) or a Hash:
#
# * :running, only the jobs answering true to #running?
# * :all, every job, no filtering on the schedule state
# * neither, jobs with a nil #next_time or a set #unscheduled_at are
#   left out
# * :tag or :tags, only the jobs carrying all of the given tags (tags
#   are compared as strings)
def jobs(opts={})

  opts = { opts => true } if opts.is_a?(Symbol)

  candidates = @jobs.to_a

  selected =
    if opts[:running]
      candidates.select { |j| j.running? }
    elsif opts[:all]
      candidates
    else
      candidates.reject { |j| j.next_time.nil? || j.unscheduled_at }
    end

  wanted = Array(opts[:tag] || opts[:tags]).map(&:to_s)

  # a job is dropped as soon as one of the wanted tags is missing
  selected.reject { |j| wanted.any? { |t| ! j.tags.include?(t) } }
end
join(time_limit=nil) click to toggle source
# File lib/rufus/scheduler.rb, line 138
# Waits on the scheduler, with or without a time limit.
#
# Fails with a NotRunningError when there is no scheduler thread and
# with a ThreadError when called from the scheduler thread itself.
#
# Delegates to time_limit_join when +time_limit+ is given, else to
# no_time_limit_join.
def join(time_limit=nil)

  unless @thread
    fail NotRunningError.new('cannot join scheduler that is not running')
  end
  if @thread == Thread.current
    fail ThreadError.new('scheduler thread cannot join itself')
  end

  return time_limit_join(time_limit) if time_limit

  no_time_limit_join
end
lock() click to toggle source

Returns true if the scheduler has acquired the [exclusive] lock and thus may run.

Most of the time, a scheduler is run alone and this method should return true. It is useful in cases where among a group of applications only one of them should run the scheduler. For schedulers that should not run, the method should return false.

Out of the box, rufus-scheduler proposes the :lockfile => 'path/to/lock/file' scheduler start option. It makes it easy for schedulers on the same machine to determine which should run (the first to write the lockfile and lock it). It uses “man 2 flock” so it probably won't work reliably on distributed file systems.

If one needs to use a special/different locking mechanism, the scheduler accepts :scheduler_lock => lock_object. lock_object only needs to respond to lock and unlock, and both of these methods should be idempotent.

Look at rufus/scheduler/locks.rb for an example.

# File lib/rufus/scheduler.rb, line 346
# Asks the scheduler lock for its #lock and returns the answer.
def lock
  @scheduler_lock.lock
end
occurrences(time0, time1, format=:per_job) click to toggle source
# File lib/rufus/scheduler.rb, line 416
# Collects, for each job, its occurrences between +time0+ and +time1+.
#
# Returns a hash { job => occurrences } (jobs without any occurrence
# are left out), or, when +format+ is :timeline, an array of
# [ time, job ] pairs sorted by time.
def occurrences(time0, time1, format=:per_job)

  per_job =
    jobs.each_with_object({}) do |job, acc|
      times = job.occurrences(time0, time1)
      acc[job] = times if times.any?
    end

  return per_job unless format == :timeline

  per_job
    .flat_map { |job, times| times.map { |t| [ t, job ] } }
    .sort_by { |(t, _)| t }
end
on_error(job, err) click to toggle source
# File lib/rufus/scheduler.rb, line 439
# Dumps a diagnostic report about +err+, intercepted while running +job+,
# to the scheduler's stderr.
#
# The report lists the job, the error (class, message, backtrace), time
# zone information and the state of the scheduler (options, uptime,
# threads, mutexes, job counts, queues). Each line is prefixed with the
# object_id of the error so that interleaved reports can be told apart.
#
# An error that was never raised has a nil backtrace; it is reported
# without backtrace lines instead of aborting the report.
#
# Should the reporting itself fail, the failure is written to stderr.
# stderr is flushed in all cases.
def on_error(job, err)

  pre = err.object_id.to_s

  ms = {}; mutexes.each { |k, v| ms[k] = v.locked? }

  stderr.puts("{ #{pre} rufus-scheduler intercepted an error:")
  stderr.puts("  #{pre}   job:")
  stderr.puts("  #{pre}     #{job.class} #{job.original.inspect} #{job.opts.inspect}")
  stderr.puts("  #{pre}     #{job.source_location.inspect}")
  # TODO: eventually use a Job#detail or something like that
  stderr.puts("  #{pre}   error:")
  stderr.puts("  #{pre}     #{err.object_id}")
  stderr.puts("  #{pre}     #{err.class}")
  stderr.puts("  #{pre}     #{err}")
  # Array() turns a nil backtrace (exception never raised) into []
  Array(err.backtrace).each do |l|
    stderr.puts("  #{pre}       #{l}")
  end
  stderr.puts("  #{pre}   tz:")
  stderr.puts("  #{pre}     ENV['TZ']: #{ENV['TZ']}")
  stderr.puts("  #{pre}     Time.now: #{Time.now}")
  stderr.puts("  #{pre}     local_tzone: #{EoTime.local_tzone.inspect}")
  stderr.puts("  #{pre}   et-orbi:")
  stderr.puts("  #{pre}     #{EoTime.platform_info}")
  stderr.puts("  #{pre}   scheduler:")
  stderr.puts("  #{pre}     object_id: #{object_id}")
  stderr.puts("  #{pre}     opts:")
  stderr.puts("  #{pre}       #{@opts.inspect}")
  stderr.puts("  #{pre}       frequency: #{self.frequency}")
  stderr.puts("  #{pre}       scheduler_lock: #{@scheduler_lock.inspect}")
  stderr.puts("  #{pre}       trigger_lock: #{@trigger_lock.inspect}")
  stderr.puts("  #{pre}     uptime: #{uptime} (#{uptime_s})")
  stderr.puts("  #{pre}     down?: #{down?}")
  stderr.puts("  #{pre}     frequency: #{frequency.inspect}")
  stderr.puts("  #{pre}     discard_past: #{discard_past.inspect}")
  stderr.puts("  #{pre}     started_at: #{started_at.inspect}")
  stderr.puts("  #{pre}     paused_at: #{paused_at.inspect}")
  stderr.puts("  #{pre}     threads: #{self.threads.size}")
  stderr.puts("  #{pre}       thread: #{self.thread}")
  stderr.puts("  #{pre}       thread_key: #{self.thread_key}")
  stderr.puts("  #{pre}       work_threads: #{work_threads.size}")
  stderr.puts("  #{pre}         active: #{work_threads(:active).size}")
  stderr.puts("  #{pre}         vacant: #{work_threads(:vacant).size}")
  stderr.puts("  #{pre}         max_work_threads: #{max_work_threads}")
  stderr.puts("  #{pre}       mutexes: #{ms.inspect}")
  stderr.puts("  #{pre}     jobs: #{jobs.size}")
  stderr.puts("  #{pre}       at_jobs: #{at_jobs.size}")
  stderr.puts("  #{pre}       in_jobs: #{in_jobs.size}")
  stderr.puts("  #{pre}       every_jobs: #{every_jobs.size}")
  stderr.puts("  #{pre}       interval_jobs: #{interval_jobs.size}")
  stderr.puts("  #{pre}       cron_jobs: #{cron_jobs.size}")
  stderr.puts("  #{pre}     running_jobs: #{running_jobs.size}")
  stderr.puts("  #{pre}     work_queue:")
  stderr.puts("  #{pre}       size: #{@work_queue.size}")
  stderr.puts("  #{pre}       num_waiting: #{@work_queue.num_waiting}")
  stderr.puts("  #{pre}     join_queue:")
  stderr.puts("  #{pre}       size: #{@join_queue.size}")
  stderr.puts("  #{pre}       num_waiting: #{@join_queue.num_waiting}")
  stderr.puts("} #{pre} .")

rescue => e

  stderr.puts("failure in #on_error itself:")
  stderr.puts(e.inspect)
  stderr.puts(e.backtrace)

ensure

  stderr.flush
end
pause() click to toggle source
# File lib/rufus/scheduler.rb, line 167
# Marks the scheduler as paused by recording the current time in
# @paused_at.
def pause
  @paused_at = EoTime.now
end
paused?() click to toggle source
# File lib/rufus/scheduler.rb, line 162
# Returns true when @paused_at is set, false otherwise.
def paused?
  @paused_at ? true : false
end
repeat(arg, callable=nil, opts={}, &block) click to toggle source
# File lib/rufus/scheduler.rb, line 248
# Schedules a repeating job: a cron job when +arg+ parses to a
# Fugit::Cron, an "every" job otherwise.
#
# A Hash given in place of +callable+ is taken as the +opts+. The opts
# are duplicated before the parse result is stored under :_t.
def repeat(arg, callable=nil, opts={}, &block)

  callable, opts = nil, callable if callable.is_a?(Hash)
  opts = opts.dup

  parsed = Rufus::Scheduler.parse(arg, opts)
  opts[:_t] = parsed

  if parsed.is_a?(::Fugit::Cron)
    schedule_cron(arg, callable, opts, &block)
  else
    schedule_every(arg, callable, opts, &block)
  end
end
resume(opts={}) click to toggle source
# File lib/rufus/scheduler.rb, line 172
# Resumes a paused scheduler: hands opts[:discard_past] to each job via
# #resume_discard_past= and then clears @paused_at.
def resume(opts={})

  discard = opts[:discard_past]

  jobs.each do |j|
    j.resume_discard_past = discard
  end

  @paused_at = nil
end
running_jobs(opts={}) click to toggle source
# File lib/rufus/scheduler.rb, line 411
# Same as #jobs, with :running => true forced into the options.
def running_jobs(opts={})

  query = opts.merge(:running => true)

  jobs(query)
end
schedule(arg, callable=nil, opts={}, &block) click to toggle source
# File lib/rufus/scheduler.rb, line 234
# Schedules a job, picking the kind of job from what +arg+ parses to:
# a Fugit::Cron yields a cron job, an EtOrbi::EoTime or a Time yields an
# "at" job, anything else yields an "in" job.
#
# A Hash given in place of +callable+ is taken as the +opts+. The opts
# are duplicated before the parse result is stored under :_t.
def schedule(arg, callable=nil, opts={}, &block)

  callable, opts = nil, callable if callable.is_a?(Hash)
  opts = opts.dup

  parsed = Rufus::Scheduler.parse(arg, opts)
  opts[:_t] = parsed

  if parsed.is_a?(::Fugit::Cron)
    schedule_cron(arg, callable, opts, &block)
  elsif parsed.is_a?(::EtOrbi::EoTime) || parsed.is_a?(Time)
    schedule_at(arg, callable, opts, &block)
  else
    schedule_in(arg, callable, opts, &block)
  end
end
schedule_at(time, callable=nil, opts={}, &block) click to toggle source
# File lib/rufus/scheduler.rb, line 189
# Like #at, but always asks do_schedule for the job instance.
def schedule_at(time, callable=nil, opts={}, &block)

  job_type = :once

  do_schedule(job_type, time, callable, opts, true, block)
end
schedule_cron(cronline, callable=nil, opts={}, &block) click to toggle source
# File lib/rufus/scheduler.rb, line 229
# Like #cron, but always asks do_schedule for the job instance.
def schedule_cron(cronline, callable=nil, opts={}, &block)

  job_type = :cron

  do_schedule(job_type, cronline, callable, opts, true, block)
end
schedule_every(duration, callable=nil, opts={}, &block) click to toggle source
# File lib/rufus/scheduler.rb, line 209
# Like #every, but always asks do_schedule for the job instance.
def schedule_every(duration, callable=nil, opts={}, &block)

  job_type = :every

  do_schedule(job_type, duration, callable, opts, true, block)
end
schedule_in(duration, callable=nil, opts={}, &block) click to toggle source
# File lib/rufus/scheduler.rb, line 199
# Like #in, but always asks do_schedule for the job instance.
def schedule_in(duration, callable=nil, opts={}, &block)

  job_type = :once

  do_schedule(job_type, duration, callable, opts, true, block)
end
schedule_interval(duration, callable=nil, opts={}, &block) click to toggle source
# File lib/rufus/scheduler.rb, line 219
# Like #interval, but always asks do_schedule for the job instance.
def schedule_interval(duration, callable=nil, opts={}, &block)

  job_type = :interval

  do_schedule(job_type, duration, callable, opts, true, block)
end
scheduled?(job_or_job_id) click to toggle source

Returns true if this job is currently scheduled.

Takes extra care to answer true if the job is a repeat job currently firing.

# File lib/rufus/scheduler.rb, line 373
# Returns true when the job (given as a job or as a job id) is found,
# has a nil #unscheduled_at and a non-nil #next_time; false otherwise.
def scheduled?(job_or_job_id)

  job = fetch(job_or_job_id).first

  return false unless job
  return false unless job.unscheduled_at.nil?

  (job.next_time != nil) ? true : false
end
shutdown(opt=nil) click to toggle source
# File lib/rufus/scheduler.rb, line 510
# Shuts the scheduler down.
#
# +opt+ may be nil, a Symbol (shortcut for { symbol => true }) or a Hash:
#
# * :wait or :join, goes through join_shutdown
# * :kill, goes through kill_shutdown
# * anything else, goes through regular_shutdown
#
# All the jobs are unscheduled first. Once the chosen shutdown is done,
# the work queue is cleared, the locks are released and the scheduler
# thread is joined.
#
# The scheduler thread is not joined when it is missing (a scheduler
# whose lock could not be acquired is never started) or when #shutdown
# is called from the scheduler thread itself (a thread cannot join
# itself).
def shutdown(opt=nil)

  opts =
    case opt
    when Symbol then { opt => true }
    when Hash then opt
    else {}
    end

  @jobs.unschedule_all

  if opts[:wait] || opts[:join]
    join_shutdown(opts)
  elsif opts[:kill]
    kill_shutdown(opts)
  else
    regular_shutdown(opts)
  end

  @work_queue.clear

  unlock

  @thread.join if @thread && @thread != Thread.current
end
Also aliased as: stop
stop(opt=nil)
Alias for: shutdown
threads() click to toggle source

Lists all the threads associated with this scheduler.

# File lib/rufus/scheduler.rb, line 382
# Returns the threads, among Thread.list, that carry a truthy value
# under this scheduler's thread_key.
def threads

  key = thread_key

  Thread.list.select { |th| th[key] }
end
timeline(time0, time1) click to toggle source
# File lib/rufus/scheduler.rb, line 434
# Shortcut for #occurrences with the :timeline format.
def timeline(time0, time1)

  format = :timeline

  occurrences(time0, time1, format)
end
unlock() click to toggle source

Sister method to lock, is called when the scheduler shuts down.

# File lib/rufus/scheduler.rb, line 353
# Releases the trigger lock, then the scheduler lock. Returns what the
# scheduler lock's #unlock returns.
def unlock
  @trigger_lock.unlock

  @scheduler_lock.unlock
end
unschedule(job_or_job_id) click to toggle source
# File lib/rufus/scheduler.rb, line 261
# Unschedules the job given directly or by its job id.
#
# Fails with an ArgumentError when no job could be found. Returns
# what the job's #unschedule returns.
def unschedule(job_or_job_id)

  job, job_id = fetch(job_or_job_id)

  unless job
    fail ArgumentError.new("no job found with id '#{job_id}'")
  end

  job.unschedule
end
up?() click to toggle source
# File lib/rufus/scheduler.rb, line 157
# Returns true when @started_at is set, false otherwise.
def up?
  @started_at ? true : false
end
uptime() click to toggle source
# File lib/rufus/scheduler.rb, line 123
# Returns the difference between the current time and @started_at, or
# nil when @started_at is not set.
def uptime

  return nil unless @started_at

  EoTime.now - @started_at
end
uptime_s() click to toggle source
# File lib/rufus/scheduler.rb, line 133
# Returns the uptime rendered by the class-level to_duration, or an
# empty string when there is no uptime.
def uptime_s

  return '' unless uptime

  self.class.to_duration(uptime)
end
work_threads(query=:all) click to toggle source

Lists all the work threads (the ones actually running the scheduled block code)

Accepts a query option, which can be set to:

  • :all (default), returns all the threads that are work threads or are currently running a job

  • :active, returns all threads that are currently running a job

  • :vacant, returns the threads that are not running a job

If, thanks to :blocking => true, a job is scheduled to monopolize the main scheduler thread, that thread will get returned when :active or :all.

# File lib/rufus/scheduler.rb, line 400
# Returns the scheduler threads flagged with :rufus_scheduler_work_thread.
#
# +query+ narrows the result down:
#
# * :active, only the threads with a :rufus_scheduler_job set
# * :vacant, only the threads without a :rufus_scheduler_job
# * anything else, all of them
def work_threads(query=:all)

  workers = threads.select { |t| t[:rufus_scheduler_work_thread] }

  return workers.select { |t| t[:rufus_scheduler_job] } if query == :active
  return workers.reject { |t| t[:rufus_scheduler_job] } if query == :vacant

  workers
end

Protected Instance Methods

do_schedule(job_type, t, callable, opts, return_job_instance, block) click to toggle source
# File lib/rufus/scheduler.rb, line 688
# Common entry point behind the #at, #in, #every, #interval, #cron and
# #schedule_xxx methods: instantiates the job, checks its frequency and
# registers it in @jobs.
#
# * job_type, one of :once, :every, :interval or :cron
# * t, the schedule (time, duration or cron string) as given by the caller
# * callable, the object to trigger, or the opts Hash when the caller
#   passed the options in second position
# * opts, the job options
# * return_job_instance, when truthy (or when opts[:job] is truthy) the
#   job itself is returned, else its job_id is returned
# * block, the block to trigger, takes precedence over callable
#
# Fails with a NotRunningError when @started_at is nil.
def do_schedule(job_type, t, callable, opts, return_job_instance, block)

  fail NotRunningError.new(
    'cannot schedule, scheduler is down or shutting down'
  ) if @started_at.nil?

  # the options may have been passed in place of the callable
  callable, opts = nil, callable if callable.is_a?(Hash)
  # opts already carrying :_t are used as is, else a copy is made
  # so that the :_t set below does not end up in the caller's hash
  opts = opts.dup unless opts.has_key?(:_t)

  return_job_instance ||= opts[:job]

  job_class =
    case job_type
    when :once
      # a numeric parse result means "in", anything else means "at"
      opts[:_t] ||= Rufus::Scheduler.parse(t, opts)
      opts[:_t].is_a?(Numeric) ? InJob : AtJob
    when :every
      EveryJob
    when :interval
      IntervalJob
    when :cron
      CronJob
    end

  job = job_class.new(self, t, opts, block || callable)
  job.check_frequency

  @jobs.push(job)

  return_job_instance ? job : job.job_id
end
fetch(job_or_job_id) click to toggle source

Returns [ job, job_id ]

# File lib/rufus/scheduler.rb, line 604
# Resolves its argument into a [ job, job_id ] pair.
#
# An argument responding to #job_id is considered a job, anything else
# is considered a job id and looked up via #job (the job in the pair is
# then whatever #job returned, possibly nil).
def fetch(job_or_job_id)

  unless job_or_job_id.respond_to?(:job_id)
    return [ job(job_or_job_id), job_or_job_id ]
  end

  [ job_or_job_id, job_or_job_id.job_id ]
end
join_shutdown(opts) click to toggle source
# File lib/rufus/scheduler.rb, line 539
# Shutdown flavour used for :wait / :join: pauses the scheduler, signals
# the work threads to shut down, waits for them and only then marks the
# scheduler as stopped.
#
# The value of opts[:wait] (or opts[:join]) is used as the join limit
# for each work thread when it is a Numeric; else there is no limit.
def join_shutdown(opts)

  limit = opts[:wait] || opts[:join]
  limit = limit.is_a?(Numeric) ? limit : nil

  #@started_at = nil
    #
    # when @started_at is nil, the scheduler thread exits, here
    # we want it to exit when all the work threads have been joined
    # hence it's set to nil later on
    #
  @paused_at = EoTime.now

  # queue more :shutdown messages than there are work threads
  (work_threads.size * 2 + 1).times { @work_queue << :shutdown }

  # one helper thread per work thread: it joins the work thread (up to
  # limit) and then kills it; the current thread is skipped
  work_threads
    .collect { |wt|
      wt == Thread.current ? nil : Thread.new { wt.join(limit); wt.kill } }
    .each { |st|
      st.join if st }

  @started_at = nil
end
kill_shutdown(opts) click to toggle source
# File lib/rufus/scheduler.rb, line 563
# Shutdown flavour used for :kill: marks the scheduler as stopped and
# kills every work thread. +opts+ is not used.
def kill_shutdown(opts)

  @started_at = nil

  work_threads.each { |wt| wt.kill }
end
ltstamp() click to toggle source
# File lib/rufus/scheduler.rb, line 721
# Instance-level shortcut for the class-level ltstamp.
def ltstamp
  self.class.ltstamp
end
monow() click to toggle source
# File lib/rufus/scheduler.rb, line 720
# Instance-level shortcut for the class-level monow.
def monow
  self.class.monow
end
no_time_limit_join() click to toggle source
# File lib/rufus/scheduler.rb, line 597
# Blocks until something can be popped from @join_queue and returns it.
def no_time_limit_join
  @join_queue.pop
end
regular_shutdown(opts) click to toggle source
# File lib/rufus/scheduler.rb, line 569
# Default shutdown flavour: simply marks the scheduler as stopped by
# clearing @started_at. +opts+ is not used.
def regular_shutdown(opts)
  @started_at = nil
end
rejoin() click to toggle source
# File lib/rufus/scheduler.rb, line 683
# Pushes @thread onto @join_queue, twice the number of threads waiting
# on the queue plus one times.
def rejoin

  count = @join_queue.num_waiting * 2 + 1

  count.times { @join_queue.push(@thread) }
end
start() click to toggle source

def free_all_work_threads

work_threads.each { |t| t.raise(KillSignal) }

end

# File lib/rufus/scheduler.rb, line 625
# Records the start time and spawns the scheduler thread.
#
# As long as @started_at is set, the thread loops over: unscheduling
# jobs, triggering jobs (skipped while paused), timing jobs out and
# sleeping for @frequency. When the loop exits, #rejoin is called.
def start

  @started_at = EoTime.now

  @thread =
    Thread.new do

      # clearing @started_at is what ends this loop
      while @started_at do

        unschedule_jobs
        trigger_jobs unless @paused_at
        timeout_jobs

        sleep(@frequency)
      end

      rejoin
    end

  # thread-local markers: the thread_key flag is what #threads selects on
  @thread[@thread_key] = true
  @thread[:rufus_scheduler] = self
  @thread[:name] = @opts[:thread_name] || "#{@thread_key}_scheduler"
end
terminate_all_jobs() click to toggle source
# File lib/rufus/scheduler.rb, line 613
# Unschedules every job, then polls (every 0.01s) until no job is
# running anymore.
def terminate_all_jobs

  jobs.each do |j|
    j.unschedule
  end

  while running_jobs.size > 0
    sleep 0.01
  end
end
time_limit_join(limit) click to toggle source
# File lib/rufus/scheduler.rb, line 574
# Polls @join_queue for at most +limit+ (as measured by #monow).
#
# Returns what was popped from the queue, or nil when the limit is
# reached with nothing to pop. Between two attempts, sleeps for a
# twentieth of the limit, capped at 0.100.
#
# Fails with an ArgumentError unless +limit+ is a Numeric greater than 0.
def time_limit_join(limit)

  unless limit.is_a?(Numeric) && limit > 0
    fail ArgumentError.new("limit #{limit.inspect} should be > 0")
  end

  started = monow
  nap = [ limit.to_f / 20, 0.100 ].min

  while monow - started < limit
    popped =
      begin
        @join_queue.pop(true)
      rescue ThreadError
        # #<ThreadError: queue empty>
        false
      end
    return popped if popped
    sleep(nap)
  end

  nil
end
timeout_jobs() click to toggle source
# File lib/rufus/scheduler.rb, line 664
# Goes over the active work threads and raises a
# Rufus::Scheduler::TimeoutError in those whose job has timed out.
#
# The deadline is the thread's :rufus_scheduler_timeout when it is an
# EoTime, else it is :rufus_scheduler_time plus that timeout.
def timeout_jobs

  work_threads(:active).each do |thread|

    job = thread[:rufus_scheduler_job]
    timeout = thread[:rufus_scheduler_timeout]
    started = thread[:rufus_scheduler_time]

    # thread might just have become inactive (job -> nil)
    next unless job && timeout && started

    deadline = timeout.is_a?(EoTime) ? timeout : started + timeout

    thread.raise(Rufus::Scheduler::TimeoutError) unless deadline > EoTime.now
  end
end
trigger_jobs() click to toggle source
# File lib/rufus/scheduler.rb, line 654
# Takes the current time once and triggers, with that time, each job
# that @jobs.each yields for it.
def trigger_jobs

  moment = EoTime.now

  @jobs.each(moment) { |j| j.trigger(moment) }
end
unschedule_jobs() click to toggle source
# File lib/rufus/scheduler.rb, line 649
# Asks @jobs to delete its unscheduled jobs.
def unschedule_jobs
  @jobs.delete_unscheduled
end