class Sidekiq::BaseReliableFetch
Constants
- DEFAULT_CLEANUP_INTERVAL
- DEFAULT_LEASE_INTERVAL
Minimum number of seconds between attempts to take the cleanup lease, so that we don't flood the Redis server with SET requests
- DEFAULT_MAX_RETRIES_AFTER_INTERRUPTION
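Default maximum number of times a job may be interrupted before it is moved to the interrupted set instead of being requeued (a negative value means no limit)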
- HEARTBEAT_INTERVAL
- HEARTBEAT_LIFESPAN
- HEARTBEAT_RETRY_DELAY
- LEASE_KEY
- LEGACY_WORKING_QUEUE_REGEX
- SCAN_COUNT
Defines the COUNT parameter passed to the Redis SCAN command
- UnitOfWork
- WORKING_QUEUE_PREFIX
- WORKING_QUEUE_REGEX
Regexes for matching working queue keys
Attributes
cleanup_interval[R]
last_try_to_take_lease_at[R]
lease_interval[R]
queues[R]
strictly_ordered_queues[R]
use_semi_reliable_fetch[R]
Public Class Methods
heartbeat()
click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 89
# Refreshes this process's heartbeat key in Redis. The key is written with an
# expiry of HEARTBEAT_LIFESPAN seconds, so once a process stops calling this
# the key disappears and worker_dead? starts reporting it as dead.
def self.heartbeat
  Sidekiq.redis do |redis|
    key = heartbeat_key(identity)
    redis.set(key, 1, ex: HEARTBEAT_LIFESPAN)
  end

  Sidekiq.logger.debug("Heartbeat for #{identity}")
end
heartbeat_key(identity)
click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 101
# Redis key under which the heartbeat for +identity+ is stored. Every colon in
# the identity is replaced by a dash, e.g.
#   "host:42:abc" => "reliable-fetcher-heartbeat-host-42-abc"
def self.heartbeat_key(identity)
  dashed = identity.tr(':', '-')
  "reliable-fetcher-heartbeat-#{dashed}"
end
hostname()
click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 77
# Host name of the machine running this process; used as the first segment
# of identity.
def self.hostname
  Socket.gethostname
end
identity()
click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 85
# Identity of this process in the form "{hostname}:{pid}:{nonce}".
# It is computed on first use and memoized in a class variable, so subclasses
# share the same value.
# NOTE(review): because of the memoization, a process forked after the first
# call would keep its parent's pid here — confirm that cannot happen.
def self.identity
  @@identity ||= "#{hostname}:#{Process.pid}:#{process_nonce}"
end
new(options)
click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 113
# Builds a fetcher from Sidekiq's options hash.
#
# options - Hash with:
#   :queues           - (required) queue names; each is stored as "queue:<name>"
#   :cleanup_interval - expiry in seconds of the cleanup lease
#                       (default DEFAULT_CLEANUP_INTERVAL)
#   :lease_interval   - minimum seconds between lease attempts
#                       (default DEFAULT_LEASE_INTERVAL)
#   :strict           - truthy to enable strictly ordered queues
#
# Raises ArgumentError when :queues is missing.
def initialize(options)
  queue_names = options[:queues]
  raise ArgumentError, 'missing queue list' unless queue_names

  @cleanup_interval = options.fetch(:cleanup_interval, DEFAULT_CLEANUP_INTERVAL)
  @lease_interval = options.fetch(:lease_interval, DEFAULT_LEASE_INTERVAL)
  # 0 guarantees the very first allowed_to_take_a_lease? check passes.
  @last_try_to_take_lease_at = 0
  @strictly_ordered_queues = options[:strict] ? true : false
  @queues = queue_names.map { |name| "queue:#{name}" }
end
process_nonce()
click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 81
# Random 12-character hex string, generated once per process and memoized in
# a class variable. It forms the last segment of identity, presumably so that
# two processes that happen to share hostname and pid still differ.
def self.process_nonce
  return @@process_nonce if defined?(@@process_nonce) && @@process_nonce

  @@process_nonce = SecureRandom.hex(6)
end
setup_reliable_fetch!(config)
click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 47
# Installs the reliable fetcher into Sidekiq and starts the heartbeat thread.
# SemiReliableFetch is used when config.options[:semi_reliable_fetch] is
# truthy, ReliableFetch otherwise; the instance is stored in
# config.options[:fetch].
#
# Returns the heartbeat Thread.
def self.setup_reliable_fetch!(config)
  options = config.options
  strategy_class =
    if options[:semi_reliable_fetch]
      Sidekiq::SemiReliableFetch
    else
      Sidekiq::ReliableFetch
    end

  options[:fetch] = strategy_class.new(options)
  Sidekiq.logger.info('GitLab reliable fetch activated!')

  start_heartbeat_thread
end
start_heartbeat_thread()
click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 61
# Spawns a background thread that calls heartbeat forever, sleeping
# HEARTBEAT_INTERVAL seconds between beats. Any StandardError is logged and
# the loop resumes after HEARTBEAT_RETRY_DELAY seconds, so the thread does
# not die on transient failures.
#
# Returns the Thread.
def self.start_heartbeat_thread
  Thread.new do
    loop do
      begin
        heartbeat
        sleep HEARTBEAT_INTERVAL
      rescue StandardError => error
        Sidekiq.logger.error("Heartbeat thread error: #{error.message}")
        sleep HEARTBEAT_RETRY_DELAY
      end
    end
  end
end
worker_dead?(identity, conn)
click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 97
# True when the process identified by +identity+ has no heartbeat key in
# Redis (it expired or was never written).
#
# identity - process identity string
# conn     - an already checked-out Redis connection
def self.worker_dead?(identity, conn)
  last_beat = conn.get(heartbeat_key(identity))
  !last_beat
end
working_queue_name(queue)
click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 105
# Name of this process's private working queue for +queue+:
#   "{WORKING_QUEUE_PREFIX}:{queue}:{identity}"
def self.working_queue_name(queue)
  prefix = WORKING_QUEUE_PREFIX
  "#{prefix}:#{queue}:#{identity}"
end
Public Instance Methods
bulk_requeue(inprogress, _options)
click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 134
# Hands in-progress jobs back to Redis (presumably called by Sidekiq on
# shutdown). For each unit of work, a single MULTI transaction:
#   * bumps interrupted_count and either requeues or quarantines the job
#     (preprocess_interrupted_job), and
#   * removes one copy of the job from this process's working queue.
# Any error stops the remaining requeues and is logged as a warning; jobs
# still in a working queue can later be recovered by clean_working_queues!.
#
# inprogress - collection of units of work responding to #job and #queue
# _options   - unused
def bulk_requeue(inprogress, _options)
  return if inprogress.empty?

  Sidekiq.redis do |conn|
    inprogress.each do |uow|
      job = uow.job
      queue = uow.queue

      conn.multi do |tx|
        preprocess_interrupted_job(job, queue, tx)
        tx.lrem(self.class.working_queue_name(queue), 1, job)
      end
    end
  end
rescue => e
  Sidekiq.logger.warn("Failed to requeue #{inprogress.size} jobs: #{e.message}")
end
retrieve_unit_of_work()
click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 129
# Abstract hook: subclasses must override this; its result is returned by
# retrieve_work (presumably the next UnitOfWork, or nil when there is none).
#
# Raises NotImplementedError naming the class that failed to override it.
def retrieve_unit_of_work
  message = "#{self.class} does not implement #{__method__}"
  raise NotImplementedError, message
end
retrieve_work()
click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 123
# Returns the next unit of work. When this process manages to take the
# cleanup lease, it first requeues jobs orphaned by dead workers.
def retrieve_work
  lease_taken = take_lease
  clean_working_queues! if lease_taken

  retrieve_unit_of_work
end
Private Instance Methods
allowed_to_take_a_lease?()
click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 261
# True once strictly more than +lease_interval+ seconds (wall clock) have
# passed since the last attempt to take the lease.
def allowed_to_take_a_lease?
  elapsed = Time.now.to_f - last_try_to_take_lease_at
  elapsed > lease_interval
end
clean_working_queue!(original_queue, working_queue)
click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 206
# Drains +working_queue+ by RPOP, handing each job to
# preprocess_interrupted_job so it is requeued onto +original_queue+ (or
# quarantined once its interruption budget is used up).
# NOTE(review): the RPOP and the later push are separate commands, so a
# crash or exception in between loses that job.
def clean_working_queue!(original_queue, working_queue)
  Sidekiq.redis do |redis|
    while (payload = redis.rpop(working_queue))
      preprocess_interrupted_job(payload, original_queue)
    end
  end
end
clean_working_queues!()
click to toggle source
Detects jobs left behind in the working queues of workers that no longer have a live heartbeat, and requeues them, because the worker they were assigned to most likely died before finishing them.
# File lib/sidekiq/base_reliable_fetch.rb, line 192
# Scans Redis for working-queue keys matching
# "#{WORKING_QUEUE_PREFIX}:queue:*" (SCAN_COUNT keys per SCAN call). For each
# key whose owning process is dead (no heartbeat, see worker_dead?), its jobs
# are moved back to the original queue. Keys that cannot be parsed into a
# queue and an identity are skipped.
def clean_working_queues!
  Sidekiq.logger.info('Cleaning working queues')

  Sidekiq.redis do |conn|
    pattern = "#{WORKING_QUEUE_PREFIX}:queue:*"

    conn.scan_each(match: pattern, count: SCAN_COUNT) do |key|
      original_queue, identity = extract_queue_and_identity(key)
      next if original_queue.nil? || identity.nil?
      next unless self.class.worker_dead?(identity, conn)

      clean_working_queue!(original_queue, key)
    end
  end
end
extract_queue_and_identity(key)
click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 177
# Splits a working-queue key into [original_queue, identity].
#
# The new identity format is "{hostname}:{pid}:{randomhex}"; the old one is
# "{hostname}:{pid}". Queue names may themselves contain colons (namespaced),
# and expressing both formats in a single regex would be unreadable. So the
# newer format is tried first, and the legacy regex is consulted only when
# it does not yield both parts.
#
# Returns the two captures; when the key matches neither format, the
# (possibly empty) legacy scan result is returned as-is.
def extract_queue_and_identity(key)
  original_queue, identity = key.scan(WORKING_QUEUE_REGEX).flatten
  both_found = !original_queue.nil? && !identity.nil?
  return [original_queue, identity] if both_found

  # Fall back to the legacy "{hostname}:{pid}" identity format.
  key.scan(LEGACY_WORKING_QUEUE_REGEX).flatten
end
interruption_exhausted?(msg)
click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 214
# Whether +msg+ has used up its interruption budget.
#
# msg - parsed job Hash; reads 'class' and 'interrupted_count'
#
# Returns false when the configured limit is negative (no limit); otherwise
# true once interrupted_count (missing counts as 0) reaches the limit.
def interruption_exhausted?(msg)
  # Resolve the limit once: each lookup does a constant resolution on the
  # worker class name.
  max_retries = max_retries_after_interruption(msg['class'])
  return false if max_retries < 0

  msg['interrupted_count'].to_i >= max_retries
end
max_retries_after_interruption(worker_class)
click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 220
# Maximum number of interruptions allowed for jobs of +worker_class+ before
# they are quarantined; a negative value means no limit (see
# interruption_exhausted?).
#
# The first non-nil value wins, in this order:
#   1. the worker's sidekiq_options[:max_retries_after_interruption]
#   2. Sidekiq.options[:max_retries_after_interruption]
#   3. DEFAULT_MAX_RETRIES_AFTER_INTERRUPTION
#
# Step 1 is skipped in three cases:
#   * the class name cannot be resolved (NameError);
#   * the class has no sidekiq_options (NoMethodError, a NameError subclass);
#   * the job carried no class name at all (const_get(nil) raises TypeError).
# In the last case the job would otherwise crash the cleanup after it has
# already been popped from its working queue.
def max_retries_after_interruption(worker_class)
  worker_limit =
    begin
      Object.const_get(worker_class).sidekiq_options[:max_retries_after_interruption]
    rescue NameError, TypeError
      nil
    end

  worker_limit ||
    Sidekiq.options[:max_retries_after_interruption] ||
    DEFAULT_MAX_RETRIES_AFTER_INTERRUPTION
end
preprocess_interrupted_job(job, queue, conn = nil)
click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 152
# Increments the job's interrupted_count and routes it:
#   * jobs that exhausted their interruption budget go to send_to_quarantine;
#   * all others are pushed back onto +queue+ via requeue_job.
#
# job   - raw JSON payload
# queue - destination queue name (e.g. "queue:default")
# conn  - optional connection (e.g. a MULTI) to run the write on; with nil,
#         requeue_job checks out its own connection, and send_to_quarantine
#         forwards nil to InterruptedSet#put (behaviour defined there).
def preprocess_interrupted_job(job, queue, conn = nil)
  payload = Sidekiq.load_json(job)
  payload['interrupted_count'] = payload['interrupted_count'].to_i.succ

  return send_to_quarantine(payload, conn) if interruption_exhausted?(payload)

  requeue_job(queue, payload, conn)
end
requeue_job(queue, msg, conn)
click to toggle source
To run this method inside a MULTI transaction, pass the multi connection as `conn`.
# File lib/sidekiq/base_reliable_fetch.rb, line 165
# LPUSHes +msg+ (re-serialized as JSON) onto +queue+ and logs the requeue.
#
# queue - destination queue name
# msg   - job Hash; 'jid' is used for logging
# conn  - connection to use; to run inside a MULTI transaction pass the multi
#         connection here. With nil, with_connection checks one out.
def requeue_job(queue, msg, conn)
  # Distinct block variable so it does not shadow the +conn+ parameter.
  with_connection(conn) do |redis|
    redis.lpush(queue, Sidekiq.dump_json(msg))
  end

  Sidekiq.logger.info(
    message: "Pushed job #{msg['jid']} back to queue #{queue}",
    jid: msg['jid'],
    queue: queue
  )
end
send_to_quarantine(msg, multi_connection = nil)
click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 233
# Logs a warning and stores +msg+ (re-serialized as JSON) in
# Sidekiq::InterruptedSet instead of requeueing it.
#
# msg              - job Hash; 'class' and 'jid' are used for logging
# multi_connection - optional connection forwarded to InterruptedSet#put
#
# Returns whatever InterruptedSet#put returns.
def send_to_quarantine(msg, multi_connection = nil)
  worker_name = msg['class']
  jid = msg['jid']

  Sidekiq.logger.warn(
    class: worker_name,
    jid: jid,
    message: %(Reliable Fetcher: adding dead #{worker_name} job #{jid} to interrupted queue)
  )

  payload = Sidekiq.dump_json(msg)
  Sidekiq::InterruptedSet.new.put(payload, connection: multi_connection)
end
take_lease()
click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 251
# Tries to acquire the cleanup lease: SET LEASE_KEY with NX and an expiry of
# cleanup_interval seconds. Attempts are rate-limited by
# allowed_to_take_a_lease?; the attempt time is recorded even when the SET
# does not succeed.
#
# Returns nil when rate-limited, otherwise the result of the SET.
def take_lease
  return nil unless allowed_to_take_a_lease?

  @last_try_to_take_lease_at = Time.now.to_f
  Sidekiq.redis { |redis| redis.set(LEASE_KEY, 1, nx: true, ex: cleanup_interval) }
end
with_connection(conn) { |conn| ... }
click to toggle source
Yields the given connection to the block, or checks out a new one via `Sidekiq.redis` when none is given.
# File lib/sidekiq/base_reliable_fetch.rb, line 245
# Yields +conn+ to the block when one is given; otherwise checks out a
# connection with Sidekiq.redis and yields that. Returns the block's value.
def with_connection(conn)
  if conn
    yield(conn)
  else
    Sidekiq.redis { |pooled| yield(pooled) }
  end
end