class Sidekiq::Clutch
Constants
- TEMPORARY_KEY_EXPIRATION_DURATION
22 days - how long a
Sidekiq
job can live with exponential backoff- VERSION
Attributes
batch[R]
current_result_key[RW]
on_failure[RW]
parallel_key[R]
queue[R]
Public Class Methods
new(batch = nil)
click to toggle source
# File lib/sidekiq/clutch.rb, line 12 def initialize(batch = nil) @batch = batch || Sidekiq::Batch.new end
Public Instance Methods
clear()
click to toggle source
# File lib/sidekiq/clutch.rb, line 34 def clear @jobs = nil end
engage()
click to toggle source
# File lib/sidekiq/clutch.rb, line 42 def engage return if jobs.empty? if batch.mutable? setup_batch else batch.jobs do @batch = Sidekiq::Batch.new setup_batch end end end
jobs()
click to toggle source
# File lib/sidekiq/clutch.rb, line 30 def jobs @jobs ||= JobsCollection.new(self) end
on_complete(status, options)
click to toggle source
# File lib/sidekiq/clutch.rb, line 95 def on_complete(status, options) return if status.failures.zero? return if options['on_failure'].nil? Object.const_get(options['on_failure']).new.perform(status) end
on_success(status, options)
click to toggle source
# File lib/sidekiq/clutch.rb, line 74 def on_success(status, options) return on_success_legacy(status, options) if options['jobs'] raise 'invariant: key_base is missing!' unless options['key_base'] raise 'invariant: result_key_index is missing!' unless options['result_key_index'] # NOTE: This is a brand new instance of Sidekiq::Clutch that Sidekiq instantiates, # so we need to set @key_base again. @key_base = options['key_base'] remaining_jobs = JSON.parse(Sidekiq.redis { |r| r.get(jobs_key) }) if remaining_jobs.empty? clean_up_temporary_keys return end parent_batch = Sidekiq::Batch.new(status.parent_bid) service = self.class.new(parent_batch) service.jobs.raw = remaining_jobs service.current_result_key = "#{key_base}-#{options['result_key_index']}" service.engage end
parallel() { || ... }
click to toggle source
# File lib/sidekiq/clutch.rb, line 20 def parallel @parallel_key = SecureRandom.uuid yield @parallel_key = nil end
parallel?()
click to toggle source
# File lib/sidekiq/clutch.rb, line 26 def parallel? !!@parallel_key end
queue=(q)
click to toggle source
# File lib/sidekiq/clutch.rb, line 38 def queue=(q) @queue = q && q.to_s end
setup_batch()
click to toggle source
# File lib/sidekiq/clutch.rb, line 54 def setup_batch jobs_queue = jobs.raw.dup step = jobs_queue.shift set_jobs_data_in_redis(jobs_queue) return if step.nil? batch.callback_queue = queue if queue batch.on(:success, Sidekiq::Clutch, 'key_base' => key_base, 'result_key_index' => result_key_index(step)) on_failure_name = on_failure&.name batch.on(:complete, Sidekiq::Clutch, 'on_failure' => on_failure_name) if on_failure_name batch.jobs do if step['series'] series_step(step) elsif step['parallel'] parallel_step(step) else raise "unknown step: #{step.inspect}" end end end
Private Instance Methods
clean_up_temporary_keys()
click to toggle source
# File lib/sidekiq/clutch.rb, line 169 def clean_up_temporary_keys Sidekiq.redis do |redis| redis.del(jobs_key) result_key_index = 1 loop do result = redis.del("#{key_base}-#{result_key_index}") result_key_index += 1 break if result == 0 end end end
enqueue_job(klass, params, result_key_index)
click to toggle source
# File lib/sidekiq/clutch.rb, line 155 def enqueue_job(klass, params, result_key_index) job_options = Object.const_get(klass).sidekiq_options result_key = "#{key_base}-#{result_key_index}" options = { 'class' => JobWrapper, 'queue' => queue || job_options['queue'], 'wrapped' => klass, 'args' => [batch.bid, klass, params, current_result_key, result_key], 'retry' => job_options['retry'], 'backtrace' => job_options['backtrace'] } Sidekiq::Client.push(options) end
jobs_key()
click to toggle source
# File lib/sidekiq/clutch.rb, line 121 def jobs_key "#{key_base}-jobs" end
key_base()
click to toggle source
# File lib/sidekiq/clutch.rb, line 117 def key_base @key_base ||= SecureRandom.uuid end
on_success_legacy(status, options)
click to toggle source
accept old style of passing job data, will be removed in 3.0
# File lib/sidekiq/clutch.rb, line 104 def on_success_legacy(status, options) @key_base = options['result_key'].sub(/-\d+$/, '') if options['jobs'].empty? clean_up_temporary_keys return end parent_batch = Sidekiq::Batch.new(status.parent_bid) service = self.class.new(parent_batch) service.jobs.raw = options['jobs'] service.current_result_key = options['result_key'] service.engage end
parallel_step(step)
click to toggle source
# File lib/sidekiq/clutch.rb, line 139 def parallel_step(step) step['parallel'].each do |(klass, params)| enqueue_job(klass, params, result_key_index(step)) end end
result_key_index(step)
click to toggle source
# File lib/sidekiq/clutch.rb, line 145 def result_key_index(step) if step['result_key_index'] step['result_key_index'] elsif step['result_key'] # legacy style, will be removed in 3.0 step['result_key'].split('-').last.to_i else raise "invariant: expected result_key_index passed in step; got: #{step.inspect}" end end
series_step(step)
click to toggle source
# File lib/sidekiq/clutch.rb, line 134 def series_step(step) (klass, params) = step['series'] enqueue_job(klass, params, result_key_index(step)) end
set_jobs_data_in_redis(data)
click to toggle source
# File lib/sidekiq/clutch.rb, line 125 def set_jobs_data_in_redis(data) Sidekiq.redis do |redis| redis.multi do |multi| multi.set(jobs_key, data.to_json) multi.expire(jobs_key, TEMPORARY_KEY_EXPIRATION_DURATION) end end end