module Resque::Durable::QueueAuditMethods
Constants
- JobCollision
Public Instance Methods
cleanup(date)
click to toggle source
# File lib/resque/durable/queue_audit_methods.rb, line 54
# Deletes every record returned by +older_than(date)+ with a single
# +delete_all+ call and returns whatever +delete_all+ returns.
def cleanup(date)
  expired = older_than(date)
  expired.delete_all
end
complete!()
click to toggle source
# File lib/resque/durable/queue_audit_methods.rb, line 114
# Stamps +completed_at+ with the current UTC time and persists the record
# with +save!+.
def complete!
  finished_at = Time.now.utc
  self.completed_at = finished_at
  save!
end
complete?()
click to toggle source
# File lib/resque/durable/queue_audit_methods.rb, line 119
# True when +completed_at+ holds a value (checked with +present?+).
def complete?
  finished_at = completed_at
  finished_at.present?
end
delay()
click to toggle source
Cubic backoff: the delay is the enqueue count cubed, i.e. 1, 8, 27, 64, 125, 216, etc. minutes.
# File lib/resque/durable/queue_audit_methods.rb, line 128
# Cubes +enqueue_count+ and converts the result with +minutes+.
def delay
  attempts = enqueue_count
  cubed = attempts ** 3
  cubed.minutes
end
duration()
click to toggle source
# File lib/resque/durable/queue_audit_methods.rb, line 83
# Returns the +job_timeout+ reported by this audit's job class.
def duration
  klass = job_klass
  klass.job_timeout
end
enqueue()
click to toggle source
# File lib/resque/durable/queue_audit_methods.rb, line 79
# Enqueues the job through its job class, passing the decoded payload
# arguments followed by this record converted (via +becomes+) to the job
# class's +auditor+.
def enqueue
  klass = job_klass
  arguments = payload
  # Look the job class up again for the auditor, exactly as the one-line
  # form did, so the sequence of attribute reads is unchanged.
  audit_record = becomes(job_klass.auditor)
  arguments.push(audit_record)
  klass.enqueue(*arguments)
end
enqueued!()
click to toggle source
# File lib/resque/durable/queue_audit_methods.rb, line 107
# Records an enqueue: sets +enqueued_at+ to the current UTC time, sets
# +timeout_at+ to +enqueued_at+ plus +duration+, increments
# +enqueue_count+ by one, then persists with +save!+.
def enqueued!
  self.enqueued_at = Time.now.utc
  # Read +enqueued_at+ back through its accessor rather than reusing the
  # local time value, matching the original computation.
  deadline = enqueued_at + duration
  self.timeout_at = deadline
  self.enqueue_count = enqueue_count + 1
  save!
end
fail!()
click to toggle source
# File lib/resque/durable/queue_audit_methods.rb, line 103
# Writes the current UTC time into +timeout_at+ using +update_attribute+.
def fail!
  now = Time.now.utc
  update_attribute(:timeout_at, now)
end
heartbeat!()
click to toggle source
# File lib/resque/durable/queue_audit_methods.rb, line 87
# Pushes +timeout_at+ out to the current UTC time plus +duration+ using
# +update_attribute+.
def heartbeat!
  next_deadline = Time.now.utc + duration
  update_attribute(:timeout_at, next_deadline)
end
initialize_by_klass_and_args(job_klass, args)
click to toggle source
# File lib/resque/durable/queue_audit_methods.rb, line 39
# Builds (via +new+) an audit for +job_klass+ with +args+ as its payload
# and a freshly generated GUID as its +enqueued_id+.
def initialize_by_klass_and_args(job_klass, args)
  new(
    job_klass: job_klass,
    payload: args,
    enqueued_id: GUID.generate
  )
end
job_klass()
click to toggle source
# File lib/resque/durable/queue_audit_methods.rb, line 59
# Reads the stored +job_klass+ attribute and resolves it with
# +constantize+.
def job_klass
  klass_name = read_attribute(:job_klass)
  klass_name.constantize
end
job_klass=(klass)
click to toggle source
# File lib/resque/durable/queue_audit_methods.rb, line 63
# Stores +klass+ in the +job_klass+ attribute as its +to_s+ string form.
def job_klass=(klass)
  klass_name = klass.to_s
  write_attribute(:job_klass, klass_name)
end
optimistic_heartbeat!(last_timeout_at)
click to toggle source
Bumps the `timeout_at` column, but raises a `JobCollision` exception if another process has changed the value, indicating we may have multiple workers processing the same job.
# File lib/resque/durable/queue_audit_methods.rb, line 94
# Moves +timeout_at+ to the current UTC time plus +duration+, but only if
# the stored value still equals +last_timeout_at+.
#
# Returns the new timeout value. Raises +JobCollision+ when the
# conditional update did not touch exactly one row.
def optimistic_heartbeat!(last_timeout_at)
  bumped_deadline = Time.now.utc + duration
  # Match on both id and the previously seen timeout so the update only
  # applies when the row still holds the value the caller last observed.
  unchanged_row = self.class.where(id: id, timeout_at: last_timeout_at)
  updated = unchanged_row.update_all(timeout_at: bumped_deadline)
  raise JobCollision if updated != 1
  bumped_deadline
end
payload()
click to toggle source
Calls superclass method
# File lib/resque/durable/queue_audit_methods.rb, line 67
# Returns the superclass's +payload+ value decoded with
# +ActiveSupport::JSON.decode+.
def payload
  raw_json = super
  ActiveSupport::JSON.decode(raw_json)
end
payload=(value)
click to toggle source
Calls superclass method
# File lib/resque/durable/queue_audit_methods.rb, line 71
# Converts +value+ with +to_json+ and hands the result to the
# superclass's +payload=+.
def payload=(value)
  encoded = value.to_json
  super(encoded)
end
queue()
click to toggle source
# File lib/resque/durable/queue_audit_methods.rb, line 75
# Returns the result of +Resque.queue_from_class+ for this audit's job
# class.
def queue
  klass = job_klass
  Resque.queue_from_class(klass)
end
recover()
click to toggle source
# File lib/resque/durable/queue_audit_methods.rb, line 43
# Walks the +failed+ collection and re-enqueues each audit that reports
# +retryable?+. An error raised while checking or enqueueing one audit is
# logged (when a logger is available) and does not stop the remaining
# audits from being processed.
def recover
  failed.each do |audit|
    begin
      next unless audit.retryable?
      audit.enqueue
    rescue => error
      headline = "#{error.class.name}: #{error.message}"
      # backtrace can be nil, so fall back to an empty list before joining.
      trace = (error.backtrace || []).join("\n")
      message = "#{headline}\n#{trace}"
      log = logger
      log.error("Failed to retry audit #{audit.enqueued_id}: #{message}") if log
    end
  end
end
reset_backoff!(timeout_at = Time.now.utc)
click to toggle source
# File lib/resque/durable/queue_audit_methods.rb, line 132
# Resets the backoff state and persists it with +save!+.
#
# +timeout_at+ defaults to the current UTC time when no argument is given.
def reset_backoff!(timeout_at = Time.now.utc)
  # With timeout_at at (by default) the current time and enqueue_count
  # back at 1, the job can be picked up by the Durable Monitor asap.
  self.timeout_at = timeout_at
  self.enqueue_count = 1
  save!
end
retryable?()
click to toggle source
# File lib/resque/durable/queue_audit_methods.rb, line 123
# True once the current UTC time is strictly later than +timeout_at+ plus
# +delay+.
def retryable?
  now = Time.now.utc
  earliest_retry = timeout_at + delay
  now > earliest_retry
end