class Delayed::Backend::DataMapper::Job

Public Class Methods

clear_locks!(worker_name) click to toggle source

When a worker exits, release any jobs that are still locked by it.

# File lib/delayed/backend/data_mapper.rb, line 63
# Releases every job whose locked_by matches +worker_name+ by setting both
# lock attributes (locked_at and locked_by) back to nil.
#
# worker_name - value compared against each job's locked_by attribute.
#
# Returns whatever the collection's +update+ call returns.
def self.clear_locks!(worker_name)
  held_jobs = all(:locked_by => worker_name)
  held_jobs.update(:locked_at => nil, :locked_by => nil)
end
db_time_now() click to toggle source
# File lib/delayed/backend/data_mapper.rb, line 24
# The current time, switched to UTC and converted to a DateTime.
# Used as "now" by the time-based queries and lock timestamps in this class.
def self.db_time_now
  utc_now = Time.now.utc
  utc_now.to_datetime
end
delete_all() click to toggle source

These methods are common to the other backends, so we provide an implementation here as well.

# File lib/delayed/backend/data_mapper.rb, line 100
# Clears out the job store by calling +auto_migrate!+ on Delayed::Job.
# NOTE(review): DataMapper's auto_migrate! is documented as a destructive
# migration (the storage is rebuilt, not just emptied) -- confirm this is
# acceptable wherever delete_all is used outside of tests.
#
# Returns whatever +auto_migrate!+ returns.
def self.delete_all
  job_model = Delayed::Job
  job_model.auto_migrate!
end
expired(max_run_time = Worker.max_run_time) click to toggle source
# File lib/delayed/backend/data_mapper.rb, line 34
# Jobs whose lock is up for grabs: either locked_at is nil (never locked) or
# locked_at is earlier than db_time_now minus +max_run_time+ (lock expired).
#
# max_run_time - amount subtracted from db_time_now to obtain the cutoff;
#                defaults to Worker.max_run_time.
#
# Returns the union (|) of the two queries.
def self.expired(max_run_time = Worker.max_run_time)
  never_locked = all(:locked_at => nil)
  lock_timed_out = all(:locked_at.lt => db_time_now - max_run_time)
  never_locked | lock_timed_out
end
find(id) click to toggle source
# File lib/delayed/backend/data_mapper.rb, line 104
# Looks up a single job by +id+; simply delegates to +get+ and returns
# whatever +get+ returns.
def self.find(id)
  get(id)
end
find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time) click to toggle source
# File lib/delayed/backend/data_mapper.rb, line 53
# Finds jobs that +worker_name+ could pick up next.
#
# The candidate set comes from +lockable+; it is then narrowed and ordered:
#   * at most +limit+ records, sorted by priority then run_at (both ascending)
#   * priority >= Worker.min_priority, when that setting is truthy
#   * priority <= Worker.max_priority, when that setting is truthy
#   * queue restricted to Worker.queues, when that list has any entries
#
# worker_name  - passed through to +lockable+.
# limit        - maximum number of jobs to return (default 5).
# max_run_time - passed through to +lockable+ (default Worker.max_run_time).
def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
  filters = { :limit => limit, :order => [:priority.asc, :run_at.asc] }

  if Worker.min_priority
    filters[:priority.gte] = Worker.min_priority
  end

  if Worker.max_priority
    filters[:priority.lte] = Worker.max_priority
  end

  if Worker.queues.any?
    filters[:queue] = Worker.queues
  end

  candidates = lockable(worker_name, max_run_time)
  candidates.all(filters)
end
lockable(worker_name, max_run_time = Worker.max_run_time) click to toggle source
# File lib/delayed/backend/data_mapper.rb, line 28
# Jobs that +worker_name+ is allowed to lock. A job qualifies when it
#   * has not failed (never_failed), and
#   * is due to run (never_run), and
#   * is either already locked by this worker or has a free/expired lock.
#
# worker_name  - passed to +locked_by+.
# max_run_time - passed to +expired+ (default Worker.max_run_time).
#
# Returns the intersection (&) of those three conditions.
def self.lockable(worker_name, max_run_time = Worker.max_run_time)
  runnable = never_failed & never_run
  claimable = locked_by(worker_name) | expired(max_run_time)
  runnable & claimable
end
locked_by(worker_name) click to toggle source
# File lib/delayed/backend/data_mapper.rb, line 41
# Jobs whose locked_by attribute equals +worker_name+.
def self.locked_by(worker_name)
  owner_filter = { :locked_by => worker_name }
  all(owner_filter)
end
never_failed() click to toggle source
# File lib/delayed/backend/data_mapper.rb, line 49
# Jobs that have no failed_at value, i.e. that are not marked as failed.
def self.never_failed
  unfailed_filter = { :failed_at => nil }
  all(unfailed_filter)
end
never_run() click to toggle source
# File lib/delayed/backend/data_mapper.rb, line 45
# Jobs that are due: run_at is either unset or not later than db_time_now.
#
# Returns the union (|) of the two queries.
def self.never_run
  unscheduled = all(:run_at => nil)
  already_due = all(:run_at.lte => db_time_now)
  unscheduled | already_due
end

Public Instance Methods

==(other) click to toggle source
# File lib/delayed/backend/data_mapper.rb, line 120
# Two jobs are considered equal when their ids match.
#
# other - any object. Objects that do not expose an +id+ (nil, strings, ...)
#         are simply unequal; previously such a comparison raised
#         NoMethodError.
#
# Returns true or false.
def ==(other)
  other.respond_to?(:id) && id == other.id
end
lock_exclusively!(max_run_time, worker = worker_name) click to toggle source

Lock this job for this worker. Returns true if we have the lock, false otherwise.

# File lib/delayed/backend/data_mapper.rb, line 69
# Tries to take the lock on this job for +worker+.
#
# max_run_time - passed to the class-level +expired+ query when the job is
#                not already locked by +worker+.
# worker       - name written into locked_by (defaults to +worker_name+).
#
# Returns true when exactly one row was updated (the record is reloaded so
# it reflects the new lock), false otherwise.
def lock_exclusively!(max_run_time, worker = worker_name)
  lock_time = self.class.db_time_now

  # FIXME - this is a bit gross
  # DM doesn't give us the number of rows affected by a collection update,
  # so the update is issued through the repository directly instead of
  # through DM::Collection.
  target =
    if locked_by == worker
      # Already ours: just refresh the lock on this record.
      self.class.locked_by(worker).all(:id => id)
    else
      # Not ours: only take it if the lock is free/expired and the job is due.
      self.class.expired(max_run_time).never_run.all(:id => id)
    end

  lock_attributes = target.model.new(:locked_at => lock_time, :locked_by => worker).dirty_attributes
  rows_changed = repository.update(lock_attributes, target)

  # Any count other than 1 is treated as "no lock" (it is unclear from here
  # whether that could mean zero rows or more than one).
  return false unless rows_changed == 1

  reload # pick up the lock columns written above
  true
end
reload(*args) click to toggle source
Calls superclass method
# File lib/delayed/backend/data_mapper.rb, line 115
# Calls +reset+ on this job and then defers to the superclass +reload+,
# forwarding any arguments unchanged.
# NOTE(review): +reset+ is defined outside this view; presumably it drops
# cached state so it is rebuilt from the reloaded record -- confirm.
def reload(*args)
  reset
  super(*args)
end
reschedule_at() click to toggle source
# File lib/delayed/backend/data_mapper.rb, line 93
# Works out when this job should next be attempted.
#
# If the payload object responds to +reschedule_at+, the decision is
# delegated to it (it receives the current db time and the attempt count).
# Otherwise the default back-off is used: now + (attempts ** 4 + 5) seconds.
def reschedule_at
  if payload_object.respond_to?(:reschedule_at)
    payload_object.reschedule_at(self.class.db_time_now, attempts)
  else
    current_time = self.class.db_time_now
    current_time + ((attempts ** 4) + 5).seconds
  end
end
update_attributes(attributes) click to toggle source
# File lib/delayed/backend/data_mapper.rb, line 108
# Assigns each key/value pair in +attributes+ through []= and then saves
# the record.
#
# attributes - enumerable of attribute name / value pairs (e.g. a Hash).
#
# Returns the result of +save+.
def update_attributes(attributes)
  attributes.each { |name, value| self[name] = value }
  save
end