module Resque::Durable::QueueAuditMethods

Constants

JobCollision

Public Instance Methods

cleanup(date) click to toggle source
# File lib/resque/durable/queue_audit_methods.rb, line 54
def cleanup(date)
  older_than(date).delete_all
end
complete!() click to toggle source
# File lib/resque/durable/queue_audit_methods.rb, line 114
def complete!
  self.completed_at = Time.now.utc
  save!
end
complete?() click to toggle source
# File lib/resque/durable/queue_audit_methods.rb, line 119
def complete?
  completed_at.present?
end
delay() click to toggle source

1, 8, 27, 64, 125, 216, etc. minutes.

# File lib/resque/durable/queue_audit_methods.rb, line 128
def delay
  (enqueue_count ** 3).minutes
end
duration() click to toggle source
# File lib/resque/durable/queue_audit_methods.rb, line 83
def duration
  job_klass.job_timeout
end
enqueue() click to toggle source
# File lib/resque/durable/queue_audit_methods.rb, line 79
def enqueue
  job_klass.enqueue(*(payload.push(becomes(job_klass.auditor))))
end
enqueued!() click to toggle source
# File lib/resque/durable/queue_audit_methods.rb, line 107
def enqueued!
  self.enqueued_at    = Time.now.utc
  self.timeout_at     = enqueued_at + duration
  self.enqueue_count += 1
  save!
end
fail!() click to toggle source
# File lib/resque/durable/queue_audit_methods.rb, line 103
def fail!
  update_attribute(:timeout_at, Time.now.utc)
end
heartbeat!() click to toggle source
# File lib/resque/durable/queue_audit_methods.rb, line 87
def heartbeat!
  update_attribute(:timeout_at, Time.now.utc + duration)
end
initialize_by_klass_and_args(job_klass, args) click to toggle source
# File lib/resque/durable/queue_audit_methods.rb, line 39
def initialize_by_klass_and_args(job_klass, args)
  new(:job_klass => job_klass, :payload => args, :enqueued_id => GUID.generate)
end
job_klass() click to toggle source
# File lib/resque/durable/queue_audit_methods.rb, line 59
def job_klass
  read_attribute(:job_klass).constantize
end
job_klass=(klass) click to toggle source
# File lib/resque/durable/queue_audit_methods.rb, line 63
def job_klass=(klass)
  write_attribute(:job_klass, klass.to_s)
end
optimistic_heartbeat!(last_timeout_at) click to toggle source

Bumps the ‘timeout_at` column, but raises a `JobCollision` exception if another process has changed the value, indicating we may have multiple workers processing the same job.

# File lib/resque/durable/queue_audit_methods.rb, line 94
def optimistic_heartbeat!(last_timeout_at)
  next_timeout_at = Time.now.utc + duration
  nrows = self.class.
    where(id: id, timeout_at: last_timeout_at).
    update_all(timeout_at: next_timeout_at)
  raise JobCollision.new unless nrows == 1
  next_timeout_at
end
payload() click to toggle source
Calls superclass method
# File lib/resque/durable/queue_audit_methods.rb, line 67
def payload
  ActiveSupport::JSON.decode(super)
end
payload=(value) click to toggle source
Calls superclass method
# File lib/resque/durable/queue_audit_methods.rb, line 71
def payload=(value)
  super value.to_json
end
queue() click to toggle source
# File lib/resque/durable/queue_audit_methods.rb, line 75
def queue
  Resque.queue_from_class(job_klass)
end
recover() click to toggle source
# File lib/resque/durable/queue_audit_methods.rb, line 43
def recover
  failed.each do |audit|
    begin
      audit.enqueue if audit.retryable?
    rescue => e
      message = "#{e.class.name}: #{e.message}\n#{(e.backtrace || []).join("\n")}"
      logger && logger.error("Failed to retry audit #{audit.enqueued_id}: #{message}")
    end
  end
end
reset_backoff!(timeout_at = Time.now.utc) click to toggle source
# File lib/resque/durable/queue_audit_methods.rb, line 132
def reset_backoff!(timeout_at = Time.now.utc)
  # Set timeout_at = Time.now and enqueue_count = 1 so
  # the job can be picked up by the Durable Monitor asap.
  self.timeout_at = timeout_at
  self.enqueue_count = 1
  save!
end
retryable?() click to toggle source
# File lib/resque/durable/queue_audit_methods.rb, line 123
def retryable?
  Time.now.utc > (timeout_at + delay)
end