class SidekiqUniqueJobs::Locksmith

Lock manager class that handles all the various locks

@author Mikael Henriksson <mikael@mhenrixon.com>

Constants

CLOCK_DRIFT_FACTOR

@return [Float] used to take into consideration the inaccuracy of redis timestamps

Attributes

config[R]

@!attribute [r] config

@return [LockConfig] the configuration for this lock
item[R]

@!attribute [r] item

@return [Hash] a sidekiq job hash
job_id[R]

@!attribute [r] job_id

@return [String] a sidekiq JID
key[R]

@!attribute [r] key

@return [Key] the key used for locking
redis_pool[R]

Public Class Methods

new(item, redis_pool = nil) click to toggle source

Initialize a new Locksmith instance

@param [Hash] item a Sidekiq job hash
@option item [Integer] :lock_ttl the configured expiration
@option item [String] :jid the sidekiq job id
@option item [String] :unique_digest the unique digest (See: {LockDigest#lock_digest})
@param [Sidekiq::RedisConnection, ConnectionPool] redis_pool the redis connection

# File lib/sidekiq_unique_jobs/locksmith.rb, line 62
# Builds a locksmith from a Sidekiq job hash.
#
# @param [Hash] item a Sidekiq job hash; read for LOCK_DIGEST (or UNIQUE_DIGEST) and JID
# @param [Sidekiq::RedisConnection, ConnectionPool] redis_pool optional redis connection pool
def initialize(item, redis_pool = nil)
  # UNIQUE_DIGEST is only a fallback and is meant to go away once it can be removed
  digest = item[LOCK_DIGEST] || item[UNIQUE_DIGEST]

  @item       = item
  @key        = Key.new(digest)
  @job_id     = item[JID]
  @config     = LockConfig.new(item)
  @redis_pool = redis_pool
end

Public Instance Methods

==(other) click to toggle source

Compare this locksmith with another

@param [Locksmith] other the locksmith to compare with

@return [true, false]

# File lib/sidekiq_unique_jobs/locksmith.rb, line 171
# Compares this locksmith with another object.
#
# Two locksmiths are equal when both their key and their job_id are equal.
# Objects that do not expose both readers (for example nil) are never equal,
# instead of raising NoMethodError.
#
# @param [Locksmith] other the locksmith to compare with
#
# @return [true, false]
def ==(other)
  return false unless other.respond_to?(:key) && other.respond_to?(:job_id)

  key == other.key && job_id == other.job_id
end
delete() click to toggle source

Deletes the lock unless it has a pttl set

# File lib/sidekiq_unique_jobs/locksmith.rb, line 74
# Deletes the lock, but only when the configured pttl is not positive.
#
# @return [nil] when config.pttl is positive (nothing is deleted)
# @return [Object] the result of {#delete!} otherwise
def delete
  delete! unless config.pttl.positive?
end
delete!() click to toggle source

Deletes the lock regardless of if it has a pttl set

# File lib/sidekiq_unique_jobs/locksmith.rb, line 83
# Deletes the lock regardless of whether it has a pttl set.
#
# Runs the :delete script with the lock keys and the shared script
# arguments from {#argv} (job_id, pttl, type, limit).
#
# @return [true, false] true when the script result, converted to an integer, is positive
def delete!
  call_script(:delete, key.to_a, argv).to_i.positive?
end
execute(&block) click to toggle source
# File lib/sidekiq_unique_jobs/locksmith.rb, line 101
# Acquires the lock through {#lock!} using the async primed strategy and
# hands the given block to it.
#
# @raise [SidekiqUniqueJobs::InvalidArgument] when no block is given
#
# @return [Object] whatever the redis block (and therefore {#lock!}) returns
def execute(&block)
  if block.nil?
    raise SidekiqUniqueJobs::InvalidArgument, "#execute needs a block"
  end

  primed = method(:primed_async)
  redis(redis_pool) { |conn| lock!(conn, primed, &block) }
end
inspect() click to toggle source

@see to_s

# File lib/sidekiq_unique_jobs/locksmith.rb, line 160
# Delegates to {#to_s}, so inspecting a locksmith yields the same text.
#
# @return [String] the value returned by {#to_s}
#
# @see to_s
def inspect
  to_s
end
lock(wait: nil) click to toggle source

Create a lock for the Sidekiq job

@return [String] the Sidekiq job_id that was locked/queued

# File lib/sidekiq_unique_jobs/locksmith.rb, line 92
# Creates a lock for the Sidekiq job.
#
# A truthy wait selects the async primed strategy, otherwise the sync one is used.
#
# @param [Integer, nil] wait forwarded to {#lock!} and on to the primed strategy
#
# @return [String] the Sidekiq job_id when {#lock!} yields
# @return [Object] otherwise whatever the redis block returns
def lock(wait: nil)
  primed = if wait
    method(:primed_async)
  else
    method(:primed_sync)
  end

  redis(redis_pool) do |conn|
    # returning from inside the block exits #lock itself with the job id
    lock!(conn, primed, wait) { return job_id }
  end
end
locked?(conn = nil) click to toggle source

Checks if this instance is considered locked

@param [Sidekiq::RedisConnection, ConnectionPool] conn the redis connection

@return [true, false] true when the :LOCKED hash contains the job_id

# File lib/sidekiq_unique_jobs/locksmith.rb, line 141
# Checks if this instance is considered locked.
#
# When no connection is given, one is checked out with the same
# redis_pool that {#execute} and {#lock} pass to redis.
#
# @param [Sidekiq::RedisConnection, ConnectionPool] conn the redis connection
#
# @return [true, false] the result of {#taken?} for the connection used
def locked?(conn = nil)
  return taken?(conn) if conn

  redis(redis_pool) { |rcon| taken?(rcon) }
end
to_s() click to toggle source

Nicely formatted string with information about self

@return [String]

# File lib/sidekiq_unique_jobs/locksmith.rb, line 153
# Nicely formatted string with information about self: the object id,
# the lock key, the job id and the current result of {#locked?}.
#
# @return [String]
def to_s
  format("Locksmith#%s(digest=%s job_id=%s locked=%s)", object_id, key, job_id, locked?)
end
unlock(conn = nil) click to toggle source

Removes the lock keys from Redis if locked by the provided jid/token

@return [false] unless locked? @return [String] Sidekiq job_id (jid) if successful

# File lib/sidekiq_unique_jobs/locksmith.rb, line 115
# Removes the lock through {#unlock!}, but only when {#locked?} reports the lock as held.
#
# @param [Sidekiq::RedisConnection, ConnectionPool] conn optional redis connection
#
# @return [false] unless locked?
# @return [Object] the result of {#unlock!} otherwise
def unlock(conn = nil)
  locked?(conn) ? unlock!(conn) : false
end
unlock!(conn = nil) click to toggle source

Removes the lock keys from Redis

@return [false] unless locked? @return [String] Sidekiq job_id (jid) if successful

# File lib/sidekiq_unique_jobs/locksmith.rb, line 127
# Runs the :unlock script without first checking {#locked?}.
#
# The block given to call_script reflects an :unlocked debug event when the
# yielded jid equals this job_id, and evaluates to the yielded jid.
#
# @param [Sidekiq::RedisConnection, ConnectionPool] conn optional redis connection
#
# @return [Object] whatever call_script returns (presumably the block value, i.e. the jid)
def unlock!(conn = nil)
  call_script(:unlock, key.to_a, argv, conn) do |released_jid|
    if released_jid == job_id
      reflect(:debug, :unlocked, item, released_jid)
    end

    released_jid
  end
end

Private Instance Methods

add_drift(val) click to toggle source
# File lib/sidekiq_unique_jobs/locksmith.rb, line 330
# Adds the drift computed by {#drift} to the given value.
#
# @param [Numeric] val the value to pad
#
# @return [Numeric] val plus its drift
def add_drift(val)
  allowance = drift(val)
  val + allowance
end
argv() click to toggle source
# File lib/sidekiq_unique_jobs/locksmith.rb, line 345
# Arguments passed to the redis scripts, in order:
# job_id, pttl, lock type and lock limit.
#
# @return [Array] [job_id, config.pttl, config.type, config.limit]
def argv
  settings = config
  [job_id, settings.pttl, settings.type, settings.limit]
end
brpoplpush(conn, wait) click to toggle source

@api private

# File lib/sidekiq_unique_jobs/locksmith.rb, line 289
# Blocking pop from the queued list with a push onto the primed list.
#
# @api private
#
# @param [Redis] conn a redis connection
# @param [Integer] wait the timeout handed to brpoplpush
#
# @raise [InvalidArgument] when wait is not an Integer
#
# @return [Object] whatever conn.brpoplpush returns
def brpoplpush(conn, wait)
  unless wait.is_a?(Integer)
    raise InvalidArgument, "wait must be an integer"
  end

  source      = key.queued
  destination = key.primed

  # passing timeout 0 to brpoplpush causes it to block indefinitely
  conn.brpoplpush(source, destination, timeout: wait)
end
drift(val) click to toggle source

Used to combat redis imprecision with ttl/pttl

@param [Integer] val the value to compute drift for

@return [Float] a computed drift value

# File lib/sidekiq_unique_jobs/locksmith.rb, line 323
# Used to combat redis imprecision with ttl/pttl.
#
# @param [Integer] val the value to compute drift for
#
# @return [Float] (val + 2) scaled by CLOCK_DRIFT_FACTOR
def drift(val)
  # The extra 2 milliseconds cover Redis expires precision (1 millisecond)
  # plus a 1 millisecond minimum drift for small TTLs.
  padded = (val + 2).to_f
  padded * CLOCK_DRIFT_FACTOR
end
enqueue(conn) { |job_id| ... } click to toggle source

Prepares all the various lock data

@param [Redis] conn a redis connection

@return [nil] when redis was already prepared for this lock @return [yield<String>] when successfully enqueued

# File lib/sidekiq_unique_jobs/locksmith.rb, line 216
# Runs the :queue script and, when the lock is still usable, writes the lock
# info and yields this job's id.
#
# @param [Redis] conn a redis connection
#
# @yieldparam [String] job_id the sidekiq JID
#
# @return [nil] when the script result is falsy, is neither this job_id nor "1",
#   or when the computed validity is negative while pttl is non-zero
# @return [Object] whatever the given block returns otherwise
def enqueue(conn)
  queued_jid, elapsed = timed { call_script(:queue, key.to_a, argv, conn) }
  return unless queued_jid && [job_id, "1"].include?(queued_jid)

  pttl = config.pttl
  # what is left of the pttl after the elapsed value reported by timed and the drift
  remaining = pttl - elapsed - drift(pttl)
  # a pttl of zero always proceeds, regardless of the computed remainder
  still_usable = pttl.zero? || remaining >= 0
  return unless still_usable

  write_lock_info(conn)
  yield job_id
end
handle_primed(primed_jid) { |job_id| ... } click to toggle source
# File lib/sidekiq_unique_jobs/locksmith.rb, line 263
# Decides what to do with the token returned by the primed strategy.
#
# Yields this job's id when the token is the job_id or "1"; otherwise
# reflects a :timeout for the item unless the config waits for the lock.
#
# @param [String, nil] primed_jid the popped token
#
# @yieldparam [String] job_id the sidekiq JID
#
# @return [Object] the block result, the reflect result, or nil
def handle_primed(primed_jid)
  if [job_id, "1"].include?(primed_jid)
    yield job_id
  elsif !config.wait_for_lock?
    reflect(:timeout, item)
  end
end
lock!(conn, primed_method, wait = nil) { |job_id| ... } click to toggle source

Used to reduce some duplication from the two methods

@see lock @see execute

@param [Sidekiq::RedisConnection, ConnectionPool] conn the redis connection @param [Method] primed_method reference to the method to use for getting a primed token

@yieldparam [String] job_id the sidekiq JID
@yieldreturn [void] whatever the calling block returns

# File lib/sidekiq_unique_jobs/locksmith.rb, line 190
# Shared locking flow used by {#lock} and {#execute}.
#
# Yields immediately when already locked. Otherwise enqueues, obtains a primed
# token through primed_method, runs the :lock script and yields when that
# script returns a truthy jid. Each step reflects a :debug event named after
# the step (:queued, :primed, :locked).
#
# @param [Sidekiq::RedisConnection, ConnectionPool] conn the redis connection
# @param [Method] primed_method reference to the method to use for getting a primed token
# @param [Integer, nil] wait forwarded to primed_method
#
# @yieldparam [String] job_id the sidekiq JID
# @yieldreturn [void] whatever the calling block returns
#
# @return [Object] the block result when the lock is held or acquired,
#   otherwise whatever {#enqueue} returns
def lock!(conn, primed_method, wait = nil)
  return yield job_id if locked?(conn)

  enqueue(conn) do |queued_jid|
    # name the event like the other debug reflections do
    reflect(:debug, :queued, item, queued_jid)

    primed_method.call(conn, wait) do |primed_jid|
      reflect(:debug, :primed, item, primed_jid)

      locked_jid = call_script(:lock, key.to_a, argv, conn)
      if locked_jid
        reflect(:debug, :locked, item, locked_jid)
        return yield job_id
      end
    end
  end
end
lock_info() click to toggle source
# File lib/sidekiq_unique_jobs/locksmith.rb, line 349
# Memoized dump_json payload describing this lock: worker, queue, limit,
# timeout, ttl, type, lock args and the value of now_f when first built.
#
# @return [Object] whatever dump_json returns (computed once per instance)
def lock_info
  @lock_info ||= begin
    job = item

    dump_json(
      WORKER => job[CLASS],
      QUEUE => job[QUEUE],
      LIMIT => job[LOCK_LIMIT],
      TIMEOUT => job[LOCK_TIMEOUT],
      TTL => job[LOCK_TTL],
      TYPE => config.type,
      LOCK_ARGS => job[LOCK_ARGS],
      TIME => now_f,
    )
  end
end
pop_queued(conn, wait = nil) click to toggle source

Does the actual popping of the enqueued token

@param [Redis] conn a redis connection

@return [String] a previously enqueued token (now taken off the queue)

# File lib/sidekiq_unique_jobs/locksmith.rb, line 276
# Does the actual popping of the enqueued token.
#
# When the config waits for the lock, a missing wait falls back to
# config.timeout. A nil wait results in the non-blocking pop, anything
# else in the blocking one.
#
# @param [Redis] conn a redis connection
# @param [Integer, nil] wait how long the blocking pop may wait
#
# @return [Object] whatever {#rpoplpush} or {#brpoplpush} returns
def pop_queued(conn, wait = nil)
  timeout = config.wait_for_lock? ? (wait || config.timeout) : wait
  return rpoplpush(conn) if timeout.nil?

  brpoplpush(conn, timeout)
end
primed_async(conn, wait = nil, &block) click to toggle source

Pops an enqueued token

@note Used for runtime locks to avoid problems with blocking commands in current thread

@param [Redis] conn a redis connection

@return [nil] when lock was not possible @return [Object] whatever the block returns when lock was acquired

# File lib/sidekiq_unique_jobs/locksmith.rb, line 241
# Pops an enqueued token inside a Concurrent::Promises future.
#
# @note Used for runtime locks to avoid problems with blocking commands in current thread
#
# The future is awaited for the wait (or config.timeout) plus drift. When
# neither is set there is nothing to add drift to, so the future is awaited
# without a timeout instead of raising NoMethodError on nil; in that case
# {#pop_queued} uses the non-blocking pop.
#
# @param [Redis] conn a redis connection
# @param [Integer, nil] wait how long to wait for a token
#
# @return [nil] when lock was not possible
# @return [Object] whatever the block returns when lock was acquired
def primed_async(conn, wait = nil, &block)
  timeout = wait || config.timeout

  primed_jid = Concurrent::Promises
               .future(conn) { |red_con| pop_queued(red_con, wait) }
               .value(timeout && add_drift(timeout))

  handle_primed(primed_jid, &block)
end
primed_sync(conn, wait = nil, &block) click to toggle source

Pops an enqueued token @note Used for non-runtime locks

@param [Redis] conn a redis connection

@return [nil] when lock was not possible @return [Object] whatever the block returns when lock was acquired

# File lib/sidekiq_unique_jobs/locksmith.rb, line 258
# Pops an enqueued token in the calling thread and hands it to {#handle_primed}.
#
# @note Used for non-runtime locks
#
# @param [Redis] conn a redis connection
# @param [Integer, nil] wait forwarded to {#pop_queued}
#
# @return [Object] whatever {#handle_primed} returns
def primed_sync(conn, wait = nil, &block)
  handle_primed(pop_queued(conn, wait), &block)
end
rpoplpush(conn) click to toggle source

@api private

# File lib/sidekiq_unique_jobs/locksmith.rb, line 299
# Non-blocking pop from the queued list with a push onto the primed list.
#
# @api private
#
# @param [Redis] conn a redis connection
#
# @return [Object] whatever conn.rpoplpush returns
def rpoplpush(conn)
  source      = key.queued
  destination = key.primed

  conn.rpoplpush(source, destination)
end
taken?(conn) click to toggle source

Checks if the lock has been taken

@param [Redis] conn a redis connection

@return [true, false]

# File lib/sidekiq_unique_jobs/locksmith.rb, line 341
# Checks if the lock has been taken, i.e. whether the locked hash
# has a field for this job_id.
#
# @param [Redis] conn a redis connection
#
# @return [true, false] the result of conn.hexists
def taken?(conn)
  locked_hash = key.locked

  conn.hexists(locked_hash, job_id)
end
write_lock_info(conn) click to toggle source

Writes lock information to redis.

The lock information contains information about worker, queue, limit etc.

@return [void]

# File lib/sidekiq_unique_jobs/locksmith.rb, line 310
# Writes lock information to redis under the key's info entry,
# but only when the config enables lock info.
#
# @param [Redis] conn a redis connection
#
# @return [nil] when lock info is disabled
# @return [Object] whatever conn.set returns otherwise
def write_lock_info(conn)
  conn.set(key.info, lock_info) if config.lock_info?
end