class Redis::Semaphore
Constants
- API_VERSION
- EXISTS_TOKEN
Public Class Methods
new(name, opts = {})
click to toggle source
stale_client_timeout is the threshold of time before we assume that something has gone terribly wrong with a client and we invalidate it’s lock. Default is nil for which we don’t check for stale clients Redis::Semaphore.new
(:my_semaphore, :stale_client_timeout => 30, :redis => myRedis) Redis::Semaphore.new
(:my_semaphore, :redis => myRedis) Redis::Semaphore.new
(:my_semaphore, :resources => 1, :redis => myRedis) Redis::Semaphore.new
(:my_semaphore, :host => “”, :port => “”) Redis::Semaphore.new
(:my_semaphore, :path => “bla”)
# File lib/redis/semaphore.rb, line 17 def initialize(name, opts = {}) @name = name @expiration = opts.delete(:expiration) @resource_count = opts.delete(:resources) || 1 @stale_client_timeout = opts.delete(:stale_client_timeout) @redis = opts.delete(:redis) || Redis.new(opts) @use_local_time = opts.delete(:use_local_time) @tokens = [] end
Public Instance Methods
all_tokens()
click to toggle source
# File lib/redis/semaphore.rb, line 120 def all_tokens @redis.multi do @redis.lrange(available_key, 0, -1) @redis.hkeys(grabbed_key) end.flatten end
available_count()
click to toggle source
# File lib/redis/semaphore.rb, line 44 def available_count if exists? @redis.llen(available_key) else @resource_count end end
delete!()
click to toggle source
# File lib/redis/semaphore.rb, line 52 def delete! @redis.del(available_key) @redis.del(grabbed_key) @redis.del(exists_key) @redis.del(version_key) end
exists?()
click to toggle source
# File lib/redis/semaphore.rb, line 116 def exists? @redis.exists(exists_key) end
exists_or_create!()
click to toggle source
# File lib/redis/semaphore.rb, line 27 def exists_or_create! token = @redis.getset(exists_key, EXISTS_TOKEN) if token.nil? create! else # Previous versions of redis-semaphore did not set `version_key`. # Make sure it's set now, so we can use it in future versions. if token == API_VERSION && @redis.get(version_key).nil? @redis.set(version_key, API_VERSION) end true end end
generate_unique_token()
click to toggle source
# File lib/redis/semaphore.rb, line 127 def generate_unique_token tokens = all_tokens token = Random.rand.to_s while(tokens.include? token) token = Random.rand.to_s end end
lock(timeout = nil) { |current_token| ... }
click to toggle source
# File lib/redis/semaphore.rb, line 59 def lock(timeout = nil) exists_or_create! release_stale_locks! if check_staleness? if timeout.nil? || timeout > 0 # passing timeout 0 to blpop causes it to block _key, current_token = @redis.blpop(available_key, timeout || 0) else current_token = @redis.lpop(available_key) end return false if current_token.nil? @tokens.push(current_token) @redis.hset(grabbed_key, current_token, current_time.to_f) return_value = current_token if block_given? begin return_value = yield current_token ensure signal(current_token) end end return_value end
Also aliased as: wait
locked?(token = nil)
click to toggle source
# File lib/redis/semaphore.rb, line 93 def locked?(token = nil) if token @redis.hexists(grabbed_key, token) else @tokens.each do |token| return true if locked?(token) end false end end
release_stale_locks!()
click to toggle source
# File lib/redis/semaphore.rb, line 136 def release_stale_locks! simple_expiring_mutex(:release_locks, 10) do @redis.hgetall(grabbed_key).each do |token, locked_at| timed_out_at = locked_at.to_f + @stale_client_timeout if timed_out_at < current_time.to_f signal(token) end end end end
signal(token = 1)
click to toggle source
# File lib/redis/semaphore.rb, line 105 def signal(token = 1) token ||= generate_unique_token @redis.multi do @redis.hdel grabbed_key, token @redis.lpush available_key, token set_expiration_if_necessary end end
unlock()
click to toggle source
# File lib/redis/semaphore.rb, line 88 def unlock return false unless locked? signal(@tokens.pop)[1] end
Private Instance Methods
available_key()
click to toggle source
# File lib/redis/semaphore.rb, line 224 def available_key @available_key ||= namespaced_key('AVAILABLE') end
check_staleness?()
click to toggle source
# File lib/redis/semaphore.rb, line 208 def check_staleness? !@stale_client_timeout.nil? end
create!()
click to toggle source
# File lib/redis/semaphore.rb, line 184 def create! @redis.expire(exists_key, 10) @redis.multi do @redis.del(grabbed_key) @redis.del(available_key) @resource_count.times do |index| @redis.rpush(available_key, index) end @redis.set(version_key, API_VERSION) @redis.persist(exists_key) set_expiration_if_necessary end end
current_time()
click to toggle source
# File lib/redis/semaphore.rb, line 240 def current_time if @use_local_time Time.now else begin instant = redis_namespace? ? @redis.redis.time : @redis.time Time.at(instant[0], instant[1]) rescue @use_local_time = true current_time end end end
exists_key()
click to toggle source
# File lib/redis/semaphore.rb, line 228 def exists_key @exists_key ||= namespaced_key('EXISTS') end
grabbed_key()
click to toggle source
# File lib/redis/semaphore.rb, line 232 def grabbed_key @grabbed_key ||= namespaced_key('GRABBED') end
namespaced_key(variable)
click to toggle source
# File lib/redis/semaphore.rb, line 216 def namespaced_key(variable) if redis_namespace? "#{@name}:#{variable}" else "SEMAPHORE:#{@name}:#{variable}" end end
redis_namespace?()
click to toggle source
# File lib/redis/semaphore.rb, line 212 def redis_namespace? (defined?(Redis::Namespace) && @redis.is_a?(Redis::Namespace)) end
set_expiration_if_necessary()
click to toggle source
# File lib/redis/semaphore.rb, line 200 def set_expiration_if_necessary if @expiration [available_key, exists_key, version_key].each do |key| @redis.expire(key, @expiration) end end end
simple_expiring_mutex(key_name, expires_in) { || ... }
click to toggle source
# File lib/redis/semaphore.rb, line 150 def simple_expiring_mutex(key_name, expires_in) # Using the locking mechanism as described in # http://redis.io/commands/setnx key_name = namespaced_key(key_name) cached_current_time = current_time.to_f my_lock_expires_at = cached_current_time + expires_in + 1 got_lock = @redis.setnx(key_name, my_lock_expires_at) if !got_lock # Check if expired other_lock_expires_at = @redis.get(key_name).to_f if other_lock_expires_at < cached_current_time old_expires_at = @redis.getset(key_name, my_lock_expires_at).to_f # Check if another client started cleanup yet. If not, # then we now have the lock. got_lock = (old_expires_at == other_lock_expires_at) end end return false if !got_lock begin yield ensure # Make sure not to delete the lock in case someone else already expired # our lock, with one second in between to account for some lag. @redis.del(key_name) if my_lock_expires_at > (current_time.to_f - 1) end end
version_key()
click to toggle source
# File lib/redis/semaphore.rb, line 236 def version_key @version_key ||= namespaced_key('VERSION') end