class RedisUniqueQueue
Constants
- VERSION
Attributes
name[R]
Public Class Methods
new(name, redis_or_options = {}, more_options = {})
click to toggle source
# File lib/redis_unique_queue.rb, line 11 def initialize(name, redis_or_options = {}, more_options = {}) name = name.to_s if name.kind_of? Symbol raise InvalidNameException.new unless name.kind_of?(String) && name.size > 0 @name = name @redis = if redis_or_options.kind_of?(Redis) redis_or_options elsif redis_or_options.kind_of? Hash ::Redis.new redis_or_options elsif defined?(ActiveSupport::Cache::RedisStore) && redis_or_options.kind_of?(ActiveSupport::Cache::RedisStore) @pooled = redis_or_options.data.kind_of?(ConnectionPool) redis_or_options.data elsif defined?(ConnectionPool) && redis_or_options.kind_of?(ConnectionPool) @pooled = true redis_or_options else raise InvalidRedisConfigException.new end if more_options.kind_of?(Hash) && more_options[:expire] expire more_options[:expire] end end
Public Instance Methods
all()
click to toggle source
# File lib/redis_unique_queue.rb, line 77 def all peek 0, size end
back()
click to toggle source
# File lib/redis_unique_queue.rb, line 61 def back with { |redis| redis.zrevrange(name, 0, 0).first } end
clear()
click to toggle source
# File lib/redis_unique_queue.rb, line 93 def clear with { |redis| redis.del name } [] end
expire(seconds)
click to toggle source
# File lib/redis_unique_queue.rb, line 98 def expire seconds with { |redis| redis.expire name, seconds } end
front()
click to toggle source
# File lib/redis_unique_queue.rb, line 57 def front with { |redis| redis.zrange(name, 0, 0).first } end
include?(data)
click to toggle source
# File lib/redis_unique_queue.rb, line 89 def include? data !with { |redis| redis.zscore(name, data).nil? } end
peek(index, amount = 1)
click to toggle source
# File lib/redis_unique_queue.rb, line 81 def peek index, amount = 1 with { |redis| redis.zrange name, index, index + amount - 1 } end
peek_reverse(index, amount = 1)
click to toggle source
# File lib/redis_unique_queue.rb, line 85 def peek_reverse index, amount = 1 with { |redis| redis.zrevrange name, index, index + amount - 1 } end
pop()
click to toggle source
# File lib/redis_unique_queue.rb, line 45 def pop block_on_atomic_attempt { attempt_atomic_pop } end
pop_all()
click to toggle source
# File lib/redis_unique_queue.rb, line 49 def pop_all block_on_atomic_attempt { attempt_atomic_pop_all } end
pop_multi(amount)
click to toggle source
# File lib/redis_unique_queue.rb, line 53 def pop_multi amount block_on_atomic_attempt { attempt_atomic_pop_multi amount } end
push(data)
click to toggle source
# File lib/redis_unique_queue.rb, line 35 def push data [block_on_atomic_attempt { attempt_atomic_push_multi(data) }].flatten.first end
push_multi(*values)
click to toggle source
# File lib/redis_unique_queue.rb, line 39 def push_multi *values if values.size > 0 block_on_atomic_attempt { attempt_atomic_push_multi(*values) } end end
remove(data)
click to toggle source
# File lib/redis_unique_queue.rb, line 65 def remove data with { |redis| redis.zrem name, data } end
remove_item_by_index(index)
click to toggle source
# File lib/redis_unique_queue.rb, line 69 def remove_item_by_index index with { |redis| redis.zremrangebyrank name, index, index } end
size()
click to toggle source
# File lib/redis_unique_queue.rb, line 73 def size with { |redis| redis.zcard name } end
Private Instance Methods
attempt_atomic_pop()
click to toggle source
# File lib/redis_unique_queue.rb, line 134 def attempt_atomic_pop read = lambda do with{|redis| redis.zrangebyscore(name, 0, max_score, :with_scores => false, :limit => [0, 1]).first} end write = lambda do |multi, read_result| multi.zrem name, read_result end attempt_atomic_read_write read, write end
attempt_atomic_pop_all()
click to toggle source
# File lib/redis_unique_queue.rb, line 130 def attempt_atomic_pop_all attempt_atomic_read_write lambda { all }, lambda { |multi, read_result| multi.del name } end
attempt_atomic_pop_multi(amount)
click to toggle source
# File lib/redis_unique_queue.rb, line 126 def attempt_atomic_pop_multi amount attempt_atomic_read_write lambda { peek 0, amount }, lambda { |multi, read_result| multi.zremrangebyrank name, 0, amount - 1 } end
attempt_atomic_push_multi(*values)
click to toggle source
# File lib/redis_unique_queue.rb, line 108 def attempt_atomic_push_multi *values with do |redis| success = redis.watch(name) do score = [Time.now.to_f, max_score].compact.max values = values.first if 1 == values.size && values.first.kind_of?(Array) scored_values = [] values.each_with_index do |value, i| scored_values << [score + i, value] end redis.multi do |multi| multi.zadd name, scored_values end end [success, values] end end
attempt_atomic_read_write(read_op, write_op)
click to toggle source
# File lib/redis_unique_queue.rb, line 153 def attempt_atomic_read_write read_op, write_op result = nil success = with do |redis| redis.watch(name) do result = read_op.call if result redis.multi do |multi| write_op.call multi, result end end end end [success, result] end
block_on_atomic_attempt() { || ... }
click to toggle source
# File lib/redis_unique_queue.rb, line 146 def block_on_atomic_attempt begin success, result = yield end while !success && result result end
max_score()
click to toggle source
# File lib/redis_unique_queue.rb, line 104 def max_score with { |redis| redis.zscore name, back } end
pooled?()
click to toggle source
# File lib/redis_unique_queue.rb, line 179 def pooled? !!@pooled end
with(&block)
click to toggle source
# File lib/redis_unique_queue.rb, line 171 def with(&block) if pooled? @redis.with(&block) else block.call(@redis) end end