module ActiveJobLock::Core
If you want only one instance of your job running at a time, include this module:
  class UpdateNetworkGraph < ActiveJob::Base
    include ActiveJobLock::Core
    queue_as :network_graph

    def perform(repo_id)
      heavy_lifting
    end
  end
To limit how long a lock may be held, set the `timeout` option of `lock` or override `lock_timeout`, e.g.

  class UpdateNetworkGraph < ActiveJob::Base
    include ActiveJobLock::Core
    queue_as :network_graph

    # lock may be held for up to an hour.
    lock timeout: 3600

    def perform(repo_id)
      heavy_lifting
    end
  end
To allow only one instance of a job (as defined by `identifier`) to be enqueued or running at a time, set the `loner` option of `lock` or override `loner`, e.g.

  class PdfExport < ActiveJob::Base
    include ActiveJobLock::Core
    queue_as :exports

    # only one job can be running/enqueued at a time. For instance a button
    # to run a PDF export. If the user clicks several times on it, enqueue
    # the job if and only if
    #   - the same export is not currently running
    #   - the same export is not currently queued.
    # ('same' being defined by `identifier`)
    lock loner: true

    def perform(repo_id)
      heavy_lifting
    end
  end
Public Class Methods
  # File lib/active_job_lock/core.rb, line 49
  def self.included(base)
    base.extend(ClassMethods)
  end

  # File lib/active_job_lock/core.rb, line 62
  def initialize(*args)
    super(*args)
    self.extend(OverriddenMethods)
  end
Private Instance Methods
Try to acquire a lock for running the job.
@return [Boolean, Fixnum]

  # File lib/active_job_lock/core.rb, line 226
  def acquire_lock!(*args)
    acquire_lock_impl!(:redis_lock_key, :lock_failed, *args)
  end
Attempts to acquire the lock using a timeout / deadlock algorithm.
Locking algorithm: code.google.com/p/redis/wiki/SetnxCommand
@param [String] lock_key redis lock key
@param [Array] args job arguments

  # File lib/active_job_lock/core.rb, line 269
  def acquire_lock_algorithm!(lock_key, *args)
    now = Time.now.to_i
    lock_until = now + lock_timeout
    acquired = false

    return [true, lock_until] if lock_redis.setnx(lock_key, lock_until)
    # Can't acquire the lock, see if it has expired.
    lock_expiration = lock_redis.get(lock_key)
    if lock_expiration && lock_expiration.to_i < now
      # expired, try to acquire.
      lock_expiration = lock_redis.getset(lock_key, lock_until)
      if lock_expiration.nil? || lock_expiration.to_i < now
        acquired = true
      end
    else
      # Try once more...
      acquired = true if lock_redis.setnx(lock_key, lock_until)
    end

    [acquired, lock_until]
  end
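The timeout algorithm above can be traced without a Redis server. The following is a minimal sketch, assuming a hypothetical in-memory `FakeRedis` that imitates only the `setnx`, `get` and `getset` commands, and a hypothetical `try_lock` helper that takes the clock as a parameter so the three cases (free, held, expired) are deterministic. Neither name is part of ActiveJobLock.

```ruby
# Hypothetical in-memory stand-in for the three Redis commands the algorithm uses.
class FakeRedis
  def initialize
    @data = {}
  end

  # SETNX: set only if the key is absent; returns true when the value was set.
  def setnx(key, value)
    return false if @data.key?(key)
    @data[key] = value.to_s
    true
  end

  def get(key)
    @data[key]
  end

  # GETSET: store the new value and return the previous one.
  def getset(key, value)
    old = @data[key]
    @data[key] = value.to_s
    old
  end
end

# Follows the same steps as acquire_lock_algorithm!, with `now` and `timeout`
# passed in instead of read from Time.now and lock_timeout.
def try_lock(redis, key, timeout, now)
  lock_until = now + timeout
  return [true, lock_until] if redis.setnx(key, lock_until)

  expiration = redis.get(key)
  if expiration && expiration.to_i < now
    # Expired: only the caller that reads back an expired value wins.
    previous = redis.getset(key, lock_until)
    acquired = previous.nil? || previous.to_i < now
  else
    # Still held (or released in the meantime): try once more.
    acquired = redis.setnx(key, lock_until)
  end
  [acquired, lock_until]
end

redis  = FakeRedis.new
first  = try_lock(redis, 'lock:Job:1', 60, 1_000) # key is free
second = try_lock(redis, 'lock:Job:1', 60, 1_030) # held until 1_060
third  = try_lock(redis, 'lock:Job:1', 60, 1_061) # previous lock has expired
```

Here `first` and `third` report the lock as acquired, while `second` does not, because the stored expiry is still in the future.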
Generic implementation of the locking logic
- Returns false when unable to acquire the lock.
- Returns true when the lock is acquired without a timeout.
- Returns a timestamp when the lock is acquired with a timeout; the timestamp is when the lock timeout expires.
@param [Symbol] lock_key_method the method returning redis key to lock
@param [Symbol] failed_hook the method called if lock failed
@param [Array] args job arguments
@return [Boolean, Fixnum]

  # File lib/active_job_lock/core.rb, line 247
  def acquire_lock_impl!(lock_key_method, failed_hook, *args)
    acquired = false
    lock_key = self.send(lock_key_method, *args)

    unless lock_timeout > 0
      # Acquire without using a timeout.
      acquired = true if lock_redis.setnx(lock_key, true)
    else
      # Acquire using the timeout algorithm.
      acquired, lock_until = acquire_lock_algorithm!(lock_key, *args)
    end

    self.send(failed_hook, *args) if !acquired
    lock_until && acquired ? lock_until : acquired
  end
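Because the result is either a boolean or a timestamp, a caller has to distinguish three cases. The sketch below shows one way to do that; `describe_lock_result` is a hypothetical helper written for illustration and does not exist in the library. It matches on `Integer`, since `Fixnum` was merged into `Integer` in Ruby 2.4.

```ruby
# Hypothetical helper: maps the three documented return values to a description.
def describe_lock_result(result)
  case result
  when false   then 'not acquired'
  when true    then 'acquired without a timeout'
  when Integer then "acquired until #{result}"
  end
end

no_lock    = describe_lock_result(false)
no_timeout = describe_lock_result(true)
with_until = describe_lock_result(1_060)
```

Note that a plain truthiness check (`if result`) treats both the `true` and the timestamp case as success, which may be all a caller needs.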
Try to acquire a lock to enqueue a loner job.
@return [Boolean, Fixnum]

  # File lib/active_job_lock/core.rb, line 232
  def acquire_loner_lock!(*args)
    acquire_lock_impl!(:redis_loner_lock_key, :loner_enqueue_failed, *args)
  end
Convenience method to check if a loner job is queued and lock did not expire.
@return [Boolean] true if the job is already queued
  # File lib/active_job_lock/core.rb, line 188
  def enqueued?(*args)
    inspect_lock(:redis_loner_lock_key, *args)
  end
@abstract You may override to implement a custom identifier.
Consider doing this if your job arguments are many or long, or may not convert cleanly to strings.
Builds an identifier using the job arguments. This identifier is used as part of the redis lock key.
@param [Array] args job arguments
@return [String, nil] job identifier

  # File lib/active_job_lock/core.rb, line 123
  def identifier(*args)
    args.join('-')
  end
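One possible custom identifier is a digest of the serialized arguments, which keeps the key short and avoids ambiguity in the joined string. This is a sketch under assumptions: `DigestIdentifier` and `SketchExport` are hypothetical names, the arguments are assumed to be JSON-serializable, and the stand-in class is used only so the example runs without ActiveJob. In a real job the override would be defined in the job class itself.

```ruby
require 'digest/sha2'
require 'json'

# Hypothetical mixin (not part of ActiveJobLock): replaces the default
# args.join('-') identifier with a fixed-length digest of the arguments.
module DigestIdentifier
  private

  def identifier(*args)
    Digest::SHA256.hexdigest(JSON.generate(args))
  end
end

# Stand-in class so the sketch runs without ActiveJob.
class SketchExport
  include DigestIdentifier

  def key_for(*args)
    "lock:#{self.class.name}:#{identifier(*args)}"
  end
end

# The default join cannot tell these two argument lists apart:
default_a = ['a-b', 'c'].join('-')
default_b = ['a', 'b-c'].join('-')

# The digest-based identifier keeps them distinct.
job = SketchExport.new
digest_a = job.key_for('a-b', 'c')
digest_b = job.key_for('a', 'b-c')
```

The trade-off is that digest keys are no longer human-readable when inspecting Redis.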
Check for existence of given key.
@param [Array] args job arguments
@param [Symbol] lock_key_method the method returning redis key to lock
@return [Boolean] true if the lock exists

  # File lib/active_job_lock/core.rb, line 197
  def inspect_lock(lock_key_method, *args)
    lock_until = lock_redis.get(self.send(lock_key_method, *args))
    return (lock_until.to_i > Time.now.to_i) if lock_timeout > 0
    !lock_until.nil?
  end
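The check behaves differently depending on whether a timeout is configured. A minimal sketch of the same decision, assuming a hypothetical `lock_active?` function that receives the stored value, the timeout and the current time as plain arguments:

```ruby
# Hypothetical pure-function version of the inspect_lock decision.
# stored_value is what Redis returned for the key (a String or nil).
def lock_active?(stored_value, timeout, now)
  # With a timeout, the stored value is an expiry timestamp.
  return stored_value.to_i > now if timeout > 0
  # Without a timeout, any stored value means the lock exists.
  !stored_value.nil?
end

held_with_timeout    = lock_active?('1060', 60, 1_030) # expiry in the future
expired_with_timeout = lock_active?('1060', 60, 1_061) # expiry in the past
missing_with_timeout = lock_active?(nil, 60, 1_030)    # nil.to_i is 0
held_without_timeout = lock_active?('true', 0, 1_030)  # key present
```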
@abstract Hook method; called when the lock expired before we released it.
@param [Array] args job arguments
  # File lib/active_job_lock/core.rb, line 221
  def lock_expired_before_release(*args)
  end
@abstract Hook method; called when unable to acquire the lock.
@param [Array] args job arguments
  # File lib/active_job_lock/core.rb, line 207
  def lock_failed(*args)
  end
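The hooks are empty by default, so a failed lock passes silently unless a job overrides them. The sketch below illustrates the dispatch pattern with a hypothetical `SketchJob` class that uses neither ActiveJob nor Redis; its `acquire` method only imitates how `acquire_lock_impl!` calls the named hook when acquisition fails.

```ruby
# Hypothetical stand-in for a job class; it does not use ActiveJob or Redis.
class SketchJob
  attr_reader :events

  def initialize(lock_free)
    @lock_free = lock_free
    @events = []
  end

  # Same shape as acquire_lock_impl!: call the named hook only on failure.
  def acquire(failed_hook, *args)
    acquired = @lock_free
    send(failed_hook, *args) unless acquired
    acquired
  end

  private

  # Overridden hook: record the failure instead of doing nothing.
  def lock_failed(*args)
    @events << "lock_failed for #{args.inspect}"
  end
end

busy = SketchJob.new(false)
busy_result = busy.acquire(:lock_failed, 42)

free = SketchJob.new(true)
free_result = free.acquire(:lock_failed, 42)
```

In a real job the override might log the arguments or schedule a retry; what is appropriate depends on the application.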
Override to fully control the redis object used for storing the locks.
@return [Redis] redis object
  # File lib/active_job_lock/core.rb, line 131
  def lock_redis
    @lock_redis ||= ActiveJobLock::Config.redis
  end
Number of seconds the lock may be held for. A value of 0 or below will lock without a timeout.
@return [Fixnum]
  # File lib/active_job_lock/core.rb, line 160
  def lock_timeout
    @lock_timeout ||= self.class.lock_timeout || 0
  end
Convenience method to check if job is locked and lock did not expire.
@return [Boolean] true if the job is locked by someone
  # File lib/active_job_lock/core.rb, line 181
  def locked?(*args)
    inspect_lock(:redis_lock_key, *args)
  end
Whether one instance of the job should be running or enqueued.
@return [TrueClass || FalseClass]
  # File lib/active_job_lock/core.rb, line 167
  def loner
    @loner ||= self.class.loner || false
  end
@abstract Hook method; called when unable to enqueue loner job.
@param [Array] args job arguments
  # File lib/active_job_lock/core.rb, line 214
  def loner_enqueue_failed(*args)
  end
Checks if job is locked or loner locked (if applicable).
@return [Boolean] true if the job is locked by someone
  # File lib/active_job_lock/core.rb, line 174
  def loner_locked?(*args)
    locked?(*args) || (loner && enqueued?(*args))
  end
Override to fully control the lock key used. It is passed the job arguments.
The default looks like this: `lock:<class name>:<identifier>`
@param [Array] args job arguments
@return [String] redis key

  # File lib/active_job_lock/core.rb, line 142
  def redis_lock_key(*args)
    ['lock', self.class.name, identifier(*args)].compact.join(':')
  end
Builds the lock key used by the `loner` option. It is passed the job arguments.
The default looks like this: `loner:lock:<class name>:<identifier>`
@param [Array] args job arguments
@return [String] redis key

  # File lib/active_job_lock/core.rb, line 152
  def redis_loner_lock_key(*args)
    ['loner', redis_lock_key(*args)].compact.join(':')
  end
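To make the default key formats concrete, the sketch below copies the three default builders into stand-alone functions. The `sketch_` names are hypothetical, and the class name is passed as a string argument here rather than read from `self.class.name`.

```ruby
# Stand-alone copies of the default builders, for illustration only.
def sketch_identifier(*args)
  args.join('-')
end

def sketch_lock_key(class_name, *args)
  ['lock', class_name, sketch_identifier(*args)].compact.join(':')
end

def sketch_loner_lock_key(class_name, *args)
  ['loner', sketch_lock_key(class_name, *args)].compact.join(':')
end

run_key   = sketch_lock_key('PdfExport', 7, 'a4')
loner_key = sketch_loner_lock_key('PdfExport', 7, 'a4')

# With no arguments the identifier is an empty string, not nil,
# so compact keeps it and the key ends with a trailing colon.
bare_key = sketch_lock_key('PdfExport')
```

With these inputs the run key is `lock:PdfExport:7-a4`, the loner key is `loner:lock:PdfExport:7-a4`, and the argument-free key is `lock:PdfExport:`.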
Refresh the lock.
@param [Array] args job arguments
  # File lib/active_job_lock/core.rb, line 308
  def refresh_lock!(*args)
    now = Time.now.to_i
    lock_until = now + lock_timeout
    lock_redis.set(redis_lock_key(*args), lock_until)
  end
Release the lock.
@param [Array] args job arguments
  # File lib/active_job_lock/core.rb, line 294
  def release_lock!(*args)
    lock_redis.del(redis_lock_key(*args))
  end
Release the enqueue lock for loner jobs.
@param [Array] args job arguments

  # File lib/active_job_lock/core.rb, line 301
  def release_loner_lock!(*args)
    lock_redis.del(redis_loner_lock_key(*args))
  end