class Skiplock::Job

Attributes

activejob_error[RW]

Public Class Methods

dispatch(purge_completion: true, max_retries: 20) click to toggle source
# File lib/skiplock/job.rb, line 7
def self.dispatch(purge_completion: true, max_retries: 20)
  job = nil
  self.connection.transaction do
    job = self.find_by_sql("SELECT id, scheduled_at FROM skiplock.jobs WHERE running = FALSE AND expired_at IS NULL AND finished_at IS NULL ORDER BY scheduled_at ASC NULLS FIRST, priority ASC NULLS LAST, created_at ASC FOR UPDATE SKIP LOCKED LIMIT 1").first
    return if job.nil? || job.scheduled_at.to_f > Time.now.to_f
    job = self.find_by_sql("UPDATE skiplock.jobs SET running = TRUE, updated_at = NOW() WHERE id = '#{job.id}' RETURNING *").first
  end
  self.dispatch(purge_completion: purge_completion, max_retries: max_retries) if job.execute(purge_completion: purge_completion, max_retries: max_retries)
end
enqueue(activejob) click to toggle source
# File lib/skiplock/job.rb, line 17
def self.enqueue(activejob)
  self.enqueue_at(activejob, nil)
end
enqueue_at(activejob, timestamp) click to toggle source
# File lib/skiplock/job.rb, line 21
def self.enqueue_at(activejob, timestamp)
  options = activejob.instance_variable_get('@skiplock_options') || {}
  timestamp = Time.at(timestamp) if timestamp
  if Thread.current[:skiplock_job].try(:id) == activejob.job_id
    Thread.current[:skiplock_job].activejob_error = options[:error]
    Thread.current[:skiplock_job].data['activejob_error'] = true
    Thread.current[:skiplock_job].executions = activejob.executions
    Thread.current[:skiplock_job].exception_executions = activejob.exception_executions
    Thread.current[:skiplock_job].scheduled_at = timestamp
    Thread.current[:skiplock_job]
  else
    serialize = activejob.serialize
    self.create!(serialize.slice(*self.column_names).merge('id' => serialize['job_id'], 'data' => { 'arguments' => serialize['arguments'], 'options' => options }, 'scheduled_at' => timestamp))
  end
end
flush() click to toggle source

resynchronize jobs that could not commit to database and reset any abandoned jobs for retry

# File lib/skiplock/job.rb, line 38
def self.flush
  Dir.mkdir('tmp/skiplock') unless Dir.exist?('tmp/skiplock')
  Dir.glob('tmp/skiplock/*').each do |f|
    disposed = true
    if self.exists?(id: File.basename(f), running: true)
      job = Marshal.load(File.binread(f)) rescue nil
      disposed = job.dispose if job.is_a?(Skiplock::Job)
    end
    (File.delete(f) rescue nil) if disposed
  end
  self.where(running: true).where.not(worker_id: Worker.ids).update_all(running: false, worker_id: nil)
  true
end
reset_retry_schedules() click to toggle source
# File lib/skiplock/job.rb, line 52
def self.reset_retry_schedules
  self.where('scheduled_at > NOW() AND executions > 0 AND expired_at IS NULL AND finished_at IS NULL').update_all(scheduled_at: nil, updated_at: Time.now)
end

Public Instance Methods

dispose() click to toggle source
# File lib/skiplock/job.rb, line 56
def dispose
  return unless @max_retries
  dump = Marshal.dump(self)
  purging = false
  self.running = false
  self.worker_id = nil
  self.updated_at = Time.now > self.updated_at ? Time.now : self.updated_at + 1 # in case of clock drifting
  if @exception
    self.exception_executions["[#{@exception.class.name}]"] = self.exception_executions["[#{@exception.class.name}]"].to_i + 1 unless self.data.key?('activejob_error')
    if (self.executions.to_i >= @max_retries + 1) || self.data.key?('activejob_error') || @exception.is_a?(Skiplock::Extension::ProxyError)
      self.expired_at = Time.now
    else
      self.scheduled_at = Time.now + (5 * 2**self.executions.to_i)
    end
  elsif self.finished_at
    if self.cron
      self.data['cron'] ||= {}
      self.data['cron']['executions'] = self.data['cron']['executions'].to_i + 1
      self.data['cron']['last_finished_at'] = self.finished_at.utc.to_s
      self.data['cron']['last_result'] = self.data['result']
      next_cron_at = Cron.next_schedule_at(self.cron)
      if next_cron_at
        # update job to record completions counter before resetting finished_at to nil
        self.update_columns(self.attributes.slice(*self.changes.keys))
        self.finished_at = nil
        self.executions = nil
        self.exception_executions = nil
        self.data.delete('result')
        self.scheduled_at = Time.at(next_cron_at)
      else
        Skiplock.logger.error("[Skiplock] ERROR: Invalid CRON '#{self.cron}' for Job #{self.job_class}") if Skiplock.logger
        purging = true
      end
    elsif @purge == true
      purging = true
    end
  end
  purging ? self.delete : self.update_columns(self.attributes.slice(*self.changes.keys))
rescue Exception => e
  File.binwrite("tmp/skiplock/#{self.id}", dump) rescue nil
  if Skiplock.logger
    Skiplock.logger.error(e.to_s)
    Skiplock.logger.error(e.backtrace.join("\n"))
  end
  Skiplock.on_errors.each { |p| p.call(e) }
  nil
end
execute(purge_completion: true, max_retries: 20) click to toggle source
# File lib/skiplock/job.rb, line 104
def execute(purge_completion: true, max_retries: 20)
  raise 'Job has already been completed' if self.finished_at
  self.update_columns(running: true, updated_at: Time.now) unless self.running
  Skiplock.logger.info("[Skiplock] Performing #{self.job_class} (#{self.id}) from queue '#{self.queue_name || 'default'}'...") if Skiplock.logger
  self.data ||= {}
  self.data.delete('result')
  self.exception_executions ||= {}
  self.activejob_error = nil
  @max_retries = (self.data['options'].key?('max_retries') ? self.data['options']['max_retries'].to_i : max_retries) rescue max_retries
  @max_retries = 20 if @max_retries < 0 || @max_retries > 20
  @purge = (self.data['options'].key?('purge') ? self.data['options']['purge'] : purge_completion) rescue purge_completion
  job_data = self.attributes.slice('job_class', 'queue_name', 'locale', 'timezone', 'priority', 'executions', 'exception_executions').merge('job_id' => self.id, 'enqueued_at' => self.updated_at, 'arguments' => (self.data['arguments'] || []))
  self.executions = self.executions.to_i + 1
  Thread.current[:skiplock_job] = self
  start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  begin
    self.data['result'] = ActiveJob::Base.execute(job_data)
  rescue Exception => ex
    @exception = ex
    Skiplock.on_errors.each { |p| p.call(@exception) }
  end
  if Skiplock.logger
    if @exception || self.activejob_error
      Skiplock.logger.error("[Skiplock] Job #{self.job_class} (#{self.id}) was interrupted by an exception#{ ' (rescued and retried by ActiveJob)' if self.activejob_error }")
      if @exception
        Skiplock.logger.error(@exception.to_s)
        Skiplock.logger.error(@exception.backtrace.join("\n"))
      end
    else
      end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      job_name = self.job_class
      if self.job_class == 'Skiplock::Extension::ProxyJob'
        target, method_name = ::YAML.load(self.data['arguments'].first)
        job_name = "'#{target.name}.#{method_name}'"
      end
      Skiplock.logger.info "[Skiplock] Performed #{job_name} (#{self.id}) from queue '#{self.queue_name || 'default'}' in #{end_time - start_time} seconds"
    end
  end
  @exception || self.activejob_error || self.data['result']
ensure
  self.finished_at ||= Time.now if self.data.key?('result') && !self.activejob_error
  self.dispose
end