class Redis::Semaphore

Constants

API_VERSION
EXISTS_TOKEN

Public Class Methods

new(name, opts = {}) click to toggle source

stale_client_timeout is the threshold of time before we assume that something has gone terribly wrong with a client and we invalidate it’s lock. Default is nil for which we don’t check for stale clients Redis::Semaphore.new(:my_semaphore, :stale_client_timeout => 30, :redis => myRedis) Redis::Semaphore.new(:my_semaphore, :redis => myRedis) Redis::Semaphore.new(:my_semaphore, :resources => 1, :redis => myRedis) Redis::Semaphore.new(:my_semaphore, :host => “”, :port => “”) Redis::Semaphore.new(:my_semaphore, :path => “bla”)

# File lib/redis/semaphore.rb, line 17
def initialize(name, opts = {})
  @name = name
  @expiration = opts.delete(:expiration)
  @resource_count = opts.delete(:resources) || 1
  @stale_client_timeout = opts.delete(:stale_client_timeout)
  @redis = opts.delete(:redis) || Redis.new(opts)
  @use_local_time = opts.delete(:use_local_time)
  @tokens = []
end

Public Instance Methods

all_tokens() click to toggle source
# File lib/redis/semaphore.rb, line 120
def all_tokens
  @redis.multi do
    @redis.lrange(available_key, 0, -1)
    @redis.hkeys(grabbed_key)
  end.flatten
end
available_count() click to toggle source
# File lib/redis/semaphore.rb, line 44
def available_count
  if exists?
    @redis.llen(available_key)
  else
    @resource_count
  end
end
delete!() click to toggle source
# File lib/redis/semaphore.rb, line 52
def delete!
  @redis.del(available_key)
  @redis.del(grabbed_key)
  @redis.del(exists_key)
  @redis.del(version_key)
end
exists?() click to toggle source
# File lib/redis/semaphore.rb, line 116
def exists?
  @redis.exists(exists_key)
end
exists_or_create!() click to toggle source
# File lib/redis/semaphore.rb, line 27
def exists_or_create!
  token = @redis.getset(exists_key, EXISTS_TOKEN)

  if token.nil?
    create!
  else
    # Previous versions of redis-semaphore did not set `version_key`.
    # Make sure it's set now, so we can use it in future versions.

    if token == API_VERSION && @redis.get(version_key).nil?
      @redis.set(version_key, API_VERSION)
    end

    true
  end
end
generate_unique_token() click to toggle source
# File lib/redis/semaphore.rb, line 127
def generate_unique_token
  tokens = all_tokens
  token = Random.rand.to_s

  while(tokens.include? token)
    token = Random.rand.to_s
  end
end
lock(timeout = nil) { |current_token| ... } click to toggle source
# File lib/redis/semaphore.rb, line 59
def lock(timeout = nil)
  exists_or_create!
  release_stale_locks! if check_staleness?

  if timeout.nil? || timeout > 0
    # passing timeout 0 to blpop causes it to block
    _key, current_token = @redis.blpop(available_key, timeout || 0)
  else
    current_token = @redis.lpop(available_key)
  end

  return false if current_token.nil?

  @tokens.push(current_token)
  @redis.hset(grabbed_key, current_token, current_time.to_f)
  return_value = current_token

  if block_given?
    begin
      return_value = yield current_token
    ensure
      signal(current_token)
    end
  end

  return_value
end
Also aliased as: wait
locked?(token = nil) click to toggle source
# File lib/redis/semaphore.rb, line 93
def locked?(token = nil)
  if token
    @redis.hexists(grabbed_key, token)
  else
    @tokens.each do |token|
      return true if locked?(token)
    end

    false
  end
end
release_stale_locks!() click to toggle source
# File lib/redis/semaphore.rb, line 136
def release_stale_locks!
  simple_expiring_mutex(:release_locks, 10) do
    @redis.hgetall(grabbed_key).each do |token, locked_at|
      timed_out_at = locked_at.to_f + @stale_client_timeout

      if timed_out_at < current_time.to_f
        signal(token)
      end
    end
  end
end
signal(token = 1) click to toggle source
# File lib/redis/semaphore.rb, line 105
def signal(token = 1)
  token ||= generate_unique_token

  @redis.multi do
    @redis.hdel grabbed_key, token
    @redis.lpush available_key, token

    set_expiration_if_necessary
  end
end
unlock() click to toggle source
# File lib/redis/semaphore.rb, line 88
def unlock
  return false unless locked?
  signal(@tokens.pop)[1]
end
wait(timeout = nil)
Alias for: lock

Private Instance Methods

available_key() click to toggle source
# File lib/redis/semaphore.rb, line 224
def available_key
  @available_key ||= namespaced_key('AVAILABLE')
end
check_staleness?() click to toggle source
# File lib/redis/semaphore.rb, line 208
def check_staleness?
  !@stale_client_timeout.nil?
end
create!() click to toggle source
# File lib/redis/semaphore.rb, line 184
def create!
  @redis.expire(exists_key, 10)

  @redis.multi do
    @redis.del(grabbed_key)
    @redis.del(available_key)
    @resource_count.times do |index|
      @redis.rpush(available_key, index)
    end
    @redis.set(version_key, API_VERSION)
    @redis.persist(exists_key)

    set_expiration_if_necessary
  end
end
current_time() click to toggle source
# File lib/redis/semaphore.rb, line 240
def current_time
  if @use_local_time
    Time.now
  else
    begin
      instant = redis_namespace? ? @redis.redis.time : @redis.time
      Time.at(instant[0], instant[1])
    rescue
      @use_local_time = true
      current_time
    end
  end
end
exists_key() click to toggle source
# File lib/redis/semaphore.rb, line 228
def exists_key
  @exists_key ||= namespaced_key('EXISTS')
end
grabbed_key() click to toggle source
# File lib/redis/semaphore.rb, line 232
def grabbed_key
  @grabbed_key ||= namespaced_key('GRABBED')
end
namespaced_key(variable) click to toggle source
# File lib/redis/semaphore.rb, line 216
def namespaced_key(variable)
  if redis_namespace?
    "#{@name}:#{variable}"
  else
    "SEMAPHORE:#{@name}:#{variable}"
  end
end
redis_namespace?() click to toggle source
# File lib/redis/semaphore.rb, line 212
def redis_namespace?
  (defined?(Redis::Namespace) && @redis.is_a?(Redis::Namespace))
end
set_expiration_if_necessary() click to toggle source
# File lib/redis/semaphore.rb, line 200
def set_expiration_if_necessary
  if @expiration
    [available_key, exists_key, version_key].each do |key|
      @redis.expire(key, @expiration)
    end
  end
end
simple_expiring_mutex(key_name, expires_in) { || ... } click to toggle source
# File lib/redis/semaphore.rb, line 150
def simple_expiring_mutex(key_name, expires_in)
  # Using the locking mechanism as described in
  # http://redis.io/commands/setnx

  key_name = namespaced_key(key_name)
  cached_current_time = current_time.to_f
  my_lock_expires_at = cached_current_time + expires_in + 1

  got_lock = @redis.setnx(key_name, my_lock_expires_at)

  if !got_lock
    # Check if expired
    other_lock_expires_at = @redis.get(key_name).to_f

    if other_lock_expires_at < cached_current_time
      old_expires_at = @redis.getset(key_name, my_lock_expires_at).to_f

      # Check if another client started cleanup yet. If not,
      # then we now have the lock.
      got_lock = (old_expires_at == other_lock_expires_at)
    end
  end

  return false if !got_lock

  begin
    yield
  ensure
    # Make sure not to delete the lock in case someone else already expired
    # our lock, with one second in between to account for some lag.
    @redis.del(key_name) if my_lock_expires_at > (current_time.to_f - 1)
  end
end
version_key() click to toggle source
# File lib/redis/semaphore.rb, line 236
def version_key
  @version_key ||= namespaced_key('VERSION')
end