class Sidekiq::QueueMetrics::UpgradeManager

Public Class Methods

acquire_lock(&block) click to toggle source
# File lib/sidekiq_queue_metrics/upgrade_manager.rb, line 63
def self.acquire_lock(&block)
  Sidekiq.redis_pool.with do |conn|
    lock_manager = Redlock::Client.new([conn], {
      retry_count:   5,
      retry_delay:   500,
      retry_jitter:  150,  # milliseconds
      redis_timeout: 0.1   # seconds
    })
    lock_manager.lock!('sidekiq_queue_metrics:upgrade_lock', 10000, &block)
  end
end
logger() click to toggle source
# File lib/sidekiq_queue_metrics/upgrade_manager.rb, line 6
def self.logger
  @@logger ||= Logger.new(STDOUT)
end
upgrade_if_needed() click to toggle source

Check if an upgrade is needed and it's not already in progress. If it's in progress, it will block during that time waiting for the upgrade to complete.

In case the lock is not released because the upgrade is taking too long it will raise an exception

@raises [Redlock::LockError]

# File lib/sidekiq_queue_metrics/upgrade_manager.rb, line 18
def self.upgrade_if_needed
  acquire_lock do
    return unless upgrade_needed?

    v2_to_v3_upgrade
  end
rescue Redlock::LockError
  fail 'A long running upgrade is in progress. Try restarting the application once finished'
end
upgrade_needed?() click to toggle source
# File lib/sidekiq_queue_metrics/upgrade_manager.rb, line 59
def self.upgrade_needed?
  Sidekiq.redis_pool.with { |conn| conn.exists(Helpers.stats_key) }
end
v2_to_v3_upgrade() click to toggle source
# File lib/sidekiq_queue_metrics/upgrade_manager.rb, line 28
def self.v2_to_v3_upgrade
  logger.info('Starting sidekiq_queue_metrics v3 upgrade')

  Sidekiq.redis_pool.with do |conn|
    old_collected_metrics = JSON.load(conn.get(Helpers.stats_key))
    old_collected_metrics.each do |(queue, stats)|
      logger.info("Upgrading #{queue} statistics")

      stats.each { |(stat, value)| Sidekiq::QueueMetrics::Storage.increment_stat(queue, stat, value) }

      failed_jobs_key = Helpers.build_failed_jobs_key(queue)

      if conn.exists(failed_jobs_key) && conn.type(failed_jobs_key) == 'string'
        temporal_failed_key = "_#{failed_jobs_key}"

        failed_jobs = JSON.parse(conn.get(Helpers.build_failed_jobs_key(queue)) || '[]')

        conn.rename(failed_jobs_key, temporal_failed_key)

        failed_jobs.each { |job| Sidekiq::QueueMetrics::Storage::add_failed_job(job) }

        conn.del(temporal_failed_key)
      end
    end

    conn.del(Helpers.stats_key)
  end

  logger.info("Sucessfully upgraded")
end