module Delayed::RecurringJob

Public Class Methods

included(base) click to toggle source
# File lib/delayed/recurring_job.rb, line 8
# Module#included hook: runs when this module is mixed into +base+.
#
# Extends +base+ with ClassMethods, stores Delayed::Worker.logger (as it is
# at include time) in the @@logger class variable, and declares a +logger+
# reader on +base+ via cattr_reader.
def self.included(base)
  base.extend(ClassMethods)
  base.class_eval do
    # NOTE(review): class variables are resolved lexically in Ruby, so this
    # assignment probably lands on the enclosing module (and is therefore
    # shared by every including class) rather than on +base+ itself — confirm.
    @@logger = Delayed::Worker.logger
    cattr_reader :logger
  end
end

Public Instance Methods

failure() click to toggle source
# File lib/delayed/recurring_job.rb, line 16
# Re-schedules this job by calling schedule! with no arguments, so the
# options remembered from the previous scheduling are reused.
# NOTE(review): presumably invoked by Delayed::Job as its failure hook — confirm.
def failure
  schedule!
end
next_run_time() click to toggle source
# File lib/delayed/recurring_job.rb, line 57
# Computes the next time this job should run.
#
# Every entry of @schedule_options[:run_at] (a single value or an Array) is
# passed through parse_time and, when a :timezone option is present,
# converted to that zone. If none of the resulting times lies in the future,
# all of them are advanced by :run_interval until at least one does.
#
# Side effect: @schedule_options[:run_at] is replaced with the advanced
# times so later calls do not have to repeat the same catch-up steps.
#
# Returns the earliest time that is still in the future.
# Raises ArgumentError when :run_at is an empty Array, or when adding the
# interval does not move the schedule forward (zero or negative interval);
# without these guards either case would loop forever.
def next_run_time
  zone = @schedule_options[:timezone]
  run_at = @schedule_options[:run_at]

  # Deliberately not Array(run_at): Time responds to #to_a, so Array() would
  # explode a single Time into its numeric components.
  run_at = [run_at] unless run_at.is_a?(Array)
  raise ArgumentError, 'run_at must contain at least one time' if run_at.empty?

  times = run_at.map do |time|
    parsed = parse_time(time, zone)
    zone ? parsed.in_time_zone(zone) : parsed
  end

  interval = deserialize_duration(@schedule_options[:run_interval])

  until (next_time = next_future_time(times))
    advanced = times.map { |time| time + interval }
    # An interval that does not push the latest time forward can never reach
    # the future, so bail out instead of spinning.
    unless advanced.max > times.max
      raise ArgumentError, "run_interval #{interval.inspect} does not advance the schedule"
    end
    times = advanced
  end

  # Update @schedule_options to avoid growing number of calculations each time
  @schedule_options[:run_at] = times

  next_time
end
schedule!(options = {}) click to toggle source

Schedules this recurring job: any previously enqueued copies are removed and a new one is enqueued for the next run time.

# File lib/delayed/recurring_job.rb, line 25
# Schedules (or re-schedules) this recurring job.
#
# +options+ is merged over the options remembered from the previous call
# (@schedule_options), which in turn fall back to the class-level settings
# (run_at, timezone, run_every, priority, queue). Recognised keys:
#
#   :run_at, :timezone, :priority, :queue - override the class-level values
#   :run_every    - interval; stored as :run_interval via serialize_duration
#   :new_instance - when truthy, scheduling is delegated to a fresh instance
#                   of the same class (:reentry marks the delegated call so
#                   it does not recurse again)
#
# Inside a Delayed::Job transaction, the jobs returned by
# self.class.jobs(@schedule_options) are destroyed and this object is
# enqueued to run at next_run_time.
#
# Returns whatever the Delayed::Job.transaction block returns.
def schedule!(options = {})
  # reverse_merge already returns a new Hash, so the caller's Hash is never
  # mutated by the deletes below.
  options = options.reverse_merge(@schedule_options || {})

  if options[:new_instance] && !options.delete(:reentry)
    return self.class.new.schedule!(options.merge(reentry: true))
  end

  if (run_every = options.delete(:run_every))
    options[:run_interval] = serialize_duration(run_every)
  end

  @schedule_options = options.reverse_merge(
    run_at: self.class.run_at,
    timezone: self.class.timezone,
    run_interval: serialize_duration(self.class.run_every),
    priority: self.class.priority,
    queue: self.class.queue
  )

  enqueue_opts = { priority: @schedule_options[:priority], run_at: next_run_time }
  enqueue_opts[:queue] = @schedule_options[:queue] if @schedule_options[:queue]

  Delayed::Job.transaction do
    self.class.jobs(@schedule_options).destroy_all

    # Compare the numeric major version rather than the first character of
    # the version string, which would misread "10.x" as major version 1.
    # A missing gem spec is treated as a modern delayed_job.
    spec = Gem.loaded_specs['delayed_job']
    if spec && spec.version.segments.first.to_i < 3
      # delayed_job < 3 takes priority and run_at as positional arguments.
      Delayed::Job.enqueue(self, enqueue_opts[:priority], enqueue_opts[:run_at])
    else
      Delayed::Job.enqueue(self, enqueue_opts)
    end
  end
end
success() click to toggle source
# File lib/delayed/recurring_job.rb, line 20
# Re-schedules this job by calling schedule! with no arguments, so the
# options remembered from the previous scheduling are reused.
# NOTE(review): presumably invoked by Delayed::Job as its success hook — confirm.
def success
  schedule!
end

Private Instance Methods

deserialize_duration(serialized) click to toggle source
# File lib/delayed/recurring_job.rb, line 87
# Inverse of serialize_duration: turns a Hash carrying :value and :parts back
# into an ActiveSupport::Duration. Anything that is not a Hash is returned
# unchanged.
def deserialize_duration(serialized)
  return serialized unless serialized.is_a?(Hash)

  ActiveSupport::Duration.new(serialized[:value], serialized[:parts])
end
get_timezone(zone) click to toggle source
# File lib/delayed/recurring_job.rb, line 108
# Resolves the time zone to use: Time.zone when +zone+ is nil or false,
# otherwise ActiveSupport::TimeZone.new(zone).
def get_timezone(zone)
  return Time.zone unless zone

  ActiveSupport::TimeZone.new(zone)
end
next_future_time(times) click to toggle source
# File lib/delayed/recurring_job.rb, line 116
# Returns the earliest entry of +times+ that is later than the current time,
# or nil when no entry is in the future.
def next_future_time(times)
  upcoming = times.select { |candidate| candidate > Time.now }
  upcoming.min
end
parse_time(time, timezone) click to toggle source
# File lib/delayed/recurring_job.rb, line 96
# Normalises one :run_at entry.
#
# A String is parsed in the zone returned by get_timezone(timezone). If the
# string names a weekday (as detected by Date._parse), the parsed time is
# shifted by whole days so it falls on that weekday; the shift can be
# negative. Any non-String value is returned unchanged.
def parse_time(time, timezone)
  return time unless time.is_a?(String)

  parsed = get_timezone(timezone).parse(time)
  requested_wday = Date._parse(time, false).fetch(:wday, parsed.wday)
  parsed + (requested_wday - parsed.wday).days
end
serialize_duration(duration) click to toggle source

We don't want run_interval to be serialized as a plain number of seconds: 1.day is not the same as 86400 seconds, because not all days are 86400 seconds long.

# File lib/delayed/recurring_job.rb, line 78
# Converts an ActiveSupport::Duration into a Hash holding its :value and
# :parts, so the duration is not flattened to a bare number of seconds.
# Any other value is returned unchanged.
def serialize_duration(duration)
  return duration unless duration.is_a?(ActiveSupport::Duration)

  { value: duration.value, parts: duration.parts }
end