class SidekiqUniqueJobs::UpgradeLocks
Upgrades locks between gem version upgrades
@author Mikael Henriksson <mikael@mhenrixon.com>
Constants
- BATCH_SIZE
@return [Integer] the number of keys to batch upgrade
- OLD_SUFFIXES
@return [Array<String>] suffixes for old version
Attributes
conn[R]
Public Class Methods
call()
click to toggle source
Performs upgrade of old locks
@return [Integer] the number of upgraded locks
# File lib/sidekiq_unique_jobs/upgrade_locks.rb, line 31
#
# Class-level entry point: builds an upgrader around the connection yielded
# by +redis+ and runs it.
#
# @return the value of the +redis+ block, i.e. the result of the
#   instance-level +call+ (assuming +redis+ returns its block value — confirm)
def self.call
  redis { |connection| new(connection).call }
end
new(conn)
click to toggle source
# File lib/sidekiq_unique_jobs/upgrade_locks.rb, line 39
#
# Stores the connection, resets the upgrade counter and warms the memoized
# redis version.
#
# @param conn the redis connection used for every command issued by this object
def initialize(conn)
  @conn  = conn
  @count = 0

  # Avoid pipelined calling redis_version and getting a future.
  redis_version
end
Public Instance Methods
call()
click to toggle source
Performs upgrade of old locks
@return [Integer] the number of upgraded locks
# File lib/sidekiq_unique_jobs/upgrade_locks.rb, line 51
#
# Performs upgrade of old locks.
#
# The upgrade is skipped when the current version is already recorded under
# +upgraded_key+, or when the +DEAD_VERSION+ key is set. Otherwise v6 locks
# are converted, leftover v6 keys are deleted and the current version is
# recorded together with the current timestamp.
#
# @return [Integer] the number of upgraded locks (0 when the upgrade is skipped)
def call
  with_logging_context do
    # `next` leaves only the block, so the method still falls through to
    # `@count` below. A `return` here would make the method return the
    # result of `log_info` instead of the documented Integer.
    next log_info("Already upgraded to #{version}") if conn.hget(upgraded_key, version)
    # TODO: Needs handling of v7.0.0 => v7.0.1 where we don't want to
    next log_info("Skipping upgrade because #{DEAD_VERSION} has been set") if conn.get(DEAD_VERSION)

    log_info("Start - Upgrading Locks")

    upgrade_v6_locks
    delete_unused_v6_keys
    delete_supporting_v6_keys

    conn.hset(upgraded_key, version, now_f)
    log_info("Done - Upgrading Locks")
  end

  @count
end
Private Instance Methods
batch_delete(*keys)
click to toggle source
# File lib/sidekiq_unique_jobs/upgrade_locks.rb, line 114
#
# Deletes the given keys inside a pipeline. Uses +UNLINK+ when the redis
# version satisfies ">= 4.0.0" and falls back to +DEL+ otherwise.
#
# @param keys [Array<String>] the keys to remove; nothing is sent when empty
# @return [nil] when no keys were given, otherwise the result of +conn.pipelined+
def batch_delete(*keys)
  return if keys.empty?

  # Issue the command on the yielded pipeline object rather than on `conn`:
  # calling commands on the connection itself inside `pipelined` is deprecated
  # in redis-rb 4.6+ and is no longer pipelined in later versions.
  conn.pipelined do |pipeline|
    if VersionCheck.satisfied?(redis_version, ">= 4.0.0")
      pipeline.unlink(*keys)
    else
      pipeline.del(*keys)
    end
  end
end
batch_scan(match:, count:) { |values| ... }
click to toggle source
# File lib/sidekiq_unique_jobs/upgrade_locks.rb, line 126
#
# Walks the keyspace with +conn.scan+, yielding each returned batch of keys
# until the cursor comes back as "0".
#
# @param match [String] the pattern passed to +scan+
# @param count [Integer] the +count+ option passed to +scan+
# @yieldparam values the second element returned by each +scan+ call
# @return [nil]
def batch_scan(match:, count:)
  position = "0"

  loop do
    position, batch = conn.scan(position, match: match, count: count)
    yield(batch)

    # A cursor of "0" marks the end of the iteration.
    break if position == "0"
  end
end
delete_suffix(suffix)
click to toggle source
# File lib/sidekiq_unique_jobs/upgrade_locks.rb, line 108
#
# Deletes every key matching "*:<suffix>", scanning in batches of BATCH_SIZE.
#
# @param suffix [String] the key suffix to match
def delete_suffix(suffix)
  pattern = "*:#{suffix}"

  batch_scan(match: pattern, count: BATCH_SIZE) { |found| batch_delete(*found) }
end
delete_supporting_v6_keys()
click to toggle source
# File lib/sidekiq_unique_jobs/upgrade_locks.rb, line 104
#
# Removes the "unique:keys" key via +batch_delete+.
def delete_supporting_v6_keys
  supporting_key = "unique:keys"
  batch_delete(supporting_key)
end
delete_unused_v6_keys()
click to toggle source
# File lib/sidekiq_unique_jobs/upgrade_locks.rb, line 96
#
# Deletes all keys ending in any of OLD_SUFFIXES, logging before and after.
def delete_unused_v6_keys
  log_info("Start - Deleting v6 keys")

  OLD_SUFFIXES.each { |old_suffix| delete_suffix(old_suffix) }

  log_info("Done - Deleting v6 keys")
end
logging_context()
click to toggle source
# File lib/sidekiq_unique_jobs/upgrade_locks.rb, line 147
#
# The logging context for this upgrader.
#
# @return [Hash, String] a hash when +logger_context_hash?+ is truthy,
#   otherwise a plain string
def logging_context
  logger_context_hash? ? { "uniquejobs" => :upgrade_locks } : "uniquejobs-upgrade_locks"
end
now_f()
click to toggle source
# File lib/sidekiq_unique_jobs/upgrade_locks.rb, line 139
#
# Delegates to +SidekiqUniqueJobs.now_f+.
#
# @return the value of +SidekiqUniqueJobs.now_f+ (presumably the current
#   time as a Float — confirm against that method)
def now_f
  SidekiqUniqueJobs.now_f
end
redis_version()
click to toggle source
# File lib/sidekiq_unique_jobs/upgrade_locks.rb, line 143
#
# The redis version taken from +SidekiqUniqueJobs.config+, memoized in
# +@redis_version+ so the config is consulted only until a truthy value is
# obtained.
def redis_version
  @redis_version ||= begin
    settings = SidekiqUniqueJobs.config
    settings.redis_version
  end
end
upgrade_v6_lock(grabbed_key)
click to toggle source
# File lib/sidekiq_unique_jobs/upgrade_locks.rb, line 85
#
# Converts one v6 lock: copies the hash stored at "<digest>:GRABBED" to
# "<digest>:LOCKED" and adds the digest to the DIGESTS sorted set, scored
# with the first value of the copied hash (presumably a timestamp — confirm).
#
# @param grabbed_key [String] a key ending in ":GRABBED"
# @return [nil] when the hash at +grabbed_key+ is empty, otherwise the
#   result of +conn.pipelined+
def upgrade_v6_lock(grabbed_key)
  locks = conn.hgetall(grabbed_key)

  # The key may be empty or gone by the time it is read (e.g. it expired
  # after being scanned). HMSET without fields and ZADD with a nil score are
  # both rejected, so there is nothing to convert.
  return if locks.empty?

  # Strip only the trailing suffix so that a ":GRABBED" occurring elsewhere
  # in the key is left untouched.
  digest     = grabbed_key.sub(/:GRABBED\z/, "")
  locked_key = "#{digest}:LOCKED"

  # Issue the commands on the yielded pipeline object rather than on `conn`:
  # calling commands on the connection itself inside `pipelined` is deprecated
  # in redis-rb 4.6+ and is no longer pipelined in later versions.
  conn.pipelined do |pipeline|
    pipeline.hmset(locked_key, *locks.to_a)
    pipeline.zadd(DIGESTS, locks.values.first, digest)
  end
end
upgrade_v6_locks()
click to toggle source
# File lib/sidekiq_unique_jobs/upgrade_locks.rb, line 76
#
# Iterates over every key matching "*:GRABBED", converts each one with
# +upgrade_v6_lock+ and increments the upgrade counter per key.
def upgrade_v6_locks
  log_info("Start - Converting v6 locks to v7")

  conn.scan_each(match: "*:GRABBED", count: BATCH_SIZE) do |v6_key|
    upgrade_v6_lock(v6_key)
    @count += 1
  end

  log_info("Done - Converting v6 locks to v7")
end
upgraded_key()
click to toggle source
# File lib/sidekiq_unique_jobs/upgrade_locks.rb, line 72
#
# The key under which completed upgrades are recorded: LIVE_VERSION followed
# by ":UPGRADED". Memoized in +@upgraded_key+.
#
# @return [String]
def upgraded_key
  return @upgraded_key if @upgraded_key

  @upgraded_key = "#{LIVE_VERSION}:UPGRADED"
end
version()
click to toggle source
# File lib/sidekiq_unique_jobs/upgrade_locks.rb, line 135
#
# Delegates to +SidekiqUniqueJobs.version+.
#
# @return the value of +SidekiqUniqueJobs.version+ (presumably the gem
#   version string — confirm against that method)
def version
  SidekiqUniqueJobs.version
end