class MultiBackgroundJob::Middleware::UniqueJob
This middleware uses an external redis queue to control duplications. The locking key is composed of worker class and its arguments. Before enqueue new jobs it will check if have a “lock” active. The TTL of lock is 1 week as default. TTL is important to ensure locks won't last forever.
Public Class Methods
bootstrap(service:)
click to toggle source
# File lib/multi_background_job/middleware/unique_job.rb, line 12 def self.bootstrap(service:) services = Dir[File.expand_path('../unique_job/*.rb', __FILE__)].map { |f| File.basename(f, '.rb').to_sym } unless services.include?(service) msg = "UniqueJob is not supported for the `%<service>p' service. Supported options are: %<services>s." raise MultiBackgroundJob::Error, format(msg, service: service.to_sym, services: services.map { |s| "`:#{s}'" }.join(', ')) end if (require("multi_background_job/middleware/unique_job/#{service}")) class_name = service.to_s.split('_').collect!{ |w| w.capitalize }.join MultiBackgroundJob::Middleware::UniqueJob.const_get(class_name).bootstrap end MultiBackgroundJob.configure do |config| config.unique_job_active = true config.middleware.add(UniqueJob) end end
Public Instance Methods
call(worker, service) { || ... }
click to toggle source
# File lib/multi_background_job/middleware/unique_job.rb, line 29 def call(worker, service) if MultiBackgroundJob.config.unique_job_active? && (uniq_lock = unique_job_lock(worker: worker, service: service)) return false if uniq_lock.locked? # Don't push job to server # Add unique job information to the job payload worker.unique_job.lock = uniq_lock worker.payload['uniq'] = worker.unique_job.to_hash uniq_lock.lock end yield end
Protected Instance Methods
unique_job_lock(worker:, service:)
click to toggle source
# File lib/multi_background_job/middleware/unique_job.rb, line 46 def unique_job_lock(worker:, service:) return unless worker.unique_job? digest = LockDigest.new( *[service || worker.options[:service], worker.options[:queue]].compact, across: worker.unique_job.across, ) Lock.new( digest: digest.to_s, lock_id: unique_job_lock_id(worker), ttl: worker.unique_job.ttl, ) end
unique_job_lock_id(worker)
click to toggle source
# File lib/multi_background_job/middleware/unique_job.rb, line 61 def unique_job_lock_id(worker) identifier_data = [worker.worker_class, worker.payload.fetch('args'.freeze, [])] Digest::SHA256.hexdigest( MultiJson.dump(identifier_data, mode: :compat), ) end