class RedisUniqueQueue

Constants

VERSION

Attributes

name[R]

Public Class Methods

new(name, redis_or_options = {}, more_options = {}) click to toggle source
# File lib/redis_unique_queue.rb, line 11
# Creates a queue backed by the Redis sorted set stored under +name+.
#
# @param name [String, Symbol] key of the sorted set; a Symbol is converted
#   with #to_s before validation
# @param redis_or_options [Redis, Hash, ActiveSupport::Cache::RedisStore, ConnectionPool]
#   an existing client, a Hash handed to ::Redis.new, a RedisStore whose
#   #data is used, or a ConnectionPool
# @param more_options [Hash] when it carries a truthy :expire entry, that
#   value is passed to #expire
# @raise [InvalidNameException] unless name is a non-empty String
# @raise [InvalidRedisConfigException] when redis_or_options is none of the
#   supported kinds
def initialize(name, redis_or_options = {}, more_options = {})
        name = name.to_s if name.is_a?(Symbol)
        raise InvalidNameException.new unless name.is_a?(String) && !name.empty?

        @name = name
        @redis =
                if redis_or_options.is_a?(Redis)
                        redis_or_options
                elsif redis_or_options.is_a?(Hash)
                        ::Redis.new(redis_or_options)
                elsif defined?(ActiveSupport::Cache::RedisStore) && redis_or_options.is_a?(ActiveSupport::Cache::RedisStore)
                        # The store's data may be a plain client or a pool.
                        @pooled = redis_or_options.data.is_a?(ConnectionPool)
                        redis_or_options.data
                elsif defined?(ConnectionPool) && redis_or_options.is_a?(ConnectionPool)
                        @pooled = true
                        redis_or_options
                else
                        raise InvalidRedisConfigException.new
                end

        ttl = more_options[:expire] if more_options.is_a?(Hash)
        expire(ttl) if ttl
end

Public Instance Methods

all() click to toggle source
# File lib/redis_unique_queue.rb, line 77
# Returns every member of the queue, ordered front to back.
#
# The whole sorted set is read with one ZRANGE from rank 0 to rank -1 (the
# last member), so the result is a single snapshot and costs one round
# trip, instead of asking for the size first and then fetching that many.
#
# @return [Array] all members; empty when the queue has none
def all
        with { |redis| redis.zrange name, 0, -1 }
end
back() click to toggle source
# File lib/redis_unique_queue.rb, line 61
# Returns the member at the back of the queue (first entry of the reversed
# range), or nil when that range is empty.
def back
        with do |redis|
                newest_first = redis.zrevrange(name, 0, 0)
                newest_first.first
        end
end
clear() click to toggle source
# File lib/redis_unique_queue.rb, line 93
# Deletes the queue's key and returns an empty Array.
def clear
        with do |redis|
                redis.del(name)
        end
        Array.new
end
expire(seconds) click to toggle source
# File lib/redis_unique_queue.rb, line 98
# Sets a time-to-live on the queue's key.
#
# @param seconds [Integer] value forwarded to the client's expire call
# @return whatever the client's expire call returns
def expire(seconds)
        with do |redis|
                redis.expire(name, seconds)
        end
end
front() click to toggle source
# File lib/redis_unique_queue.rb, line 57
# Returns the member at the front of the queue (rank 0), or nil when the
# range is empty.
def front
        with do |redis|
                oldest_first = redis.zrange(name, 0, 0)
                oldest_first.first
        end
end
include?(data) click to toggle source
# File lib/redis_unique_queue.rb, line 89
# Tells whether +data+ is currently a member of the queue, i.e. whether the
# sorted set holds a score for it.
#
# @return [Boolean]
def include?(data)
        score = with { |redis| redis.zscore(name, data) }
        !score.nil?
end
peek(index, amount = 1) click to toggle source
# File lib/redis_unique_queue.rb, line 81
# Returns up to +amount+ members starting at rank +index+, front to back,
# without removing them.
#
# A non-positive +amount+ yields an empty Array. Without that guard the
# stop rank would be index - 1, which for index 0 is -1 ("through the last
# member"), so asking for zero items would return the entire queue.
#
# @param index [Integer] rank of the first member to return
# @param amount [Integer] maximum number of members to return
# @return [Array]
def peek(index, amount = 1)
        return [] unless amount > 0

        with { |redis| redis.zrange name, index, index + amount - 1 }
end
peek_reverse(index, amount = 1) click to toggle source
# File lib/redis_unique_queue.rb, line 85
# Returns up to +amount+ members starting at reverse rank +index+, back to
# front, without removing them.
#
# A non-positive +amount+ yields an empty Array. Without that guard the
# stop rank would be index - 1, which for index 0 is -1 ("through the last
# member"), so asking for zero items would return the entire queue.
#
# @param index [Integer] reverse rank of the first member to return
# @param amount [Integer] maximum number of members to return
# @return [Array]
def peek_reverse(index, amount = 1)
        return [] unless amount > 0

        with { |redis| redis.zrevrange name, index, index + amount - 1 }
end
pop() click to toggle source
# File lib/redis_unique_queue.rb, line 45
# Removes and returns the front member, repeating the atomic attempt for as
# long as block_on_atomic_attempt decides to retry.
def pop
        block_on_atomic_attempt do
                attempt_atomic_pop
        end
end
pop_all() click to toggle source
# File lib/redis_unique_queue.rb, line 49
# Removes and returns every member, repeating the atomic attempt for as
# long as block_on_atomic_attempt decides to retry.
def pop_all
        block_on_atomic_attempt do
                attempt_atomic_pop_all
        end
end
pop_multi(amount) click to toggle source
# File lib/redis_unique_queue.rb, line 53
# Removes and returns up to +amount+ members from the front, repeating the
# atomic attempt for as long as block_on_atomic_attempt decides to retry.
#
# @param amount [Integer] forwarded to attempt_atomic_pop_multi
def pop_multi(amount)
        block_on_atomic_attempt do
                attempt_atomic_pop_multi(amount)
        end
end
push(data) click to toggle source
# File lib/redis_unique_queue.rb, line 35
# Pushes +data+ and returns the first element of the flattened result of
# the atomic push.
def push(data)
        pushed = block_on_atomic_attempt do
                attempt_atomic_push_multi(data)
        end
        [pushed].flatten.first
end
push_multi(*values) click to toggle source
# File lib/redis_unique_queue.rb, line 39
# Pushes several values in one atomic attempt.
#
# @param values [Array] the values to push
# @return the result of block_on_atomic_attempt, or nil when called with no
#   values at all
def push_multi(*values)
        return if values.empty?

        block_on_atomic_attempt do
                attempt_atomic_push_multi(*values)
        end
end
remove(data) click to toggle source
# File lib/redis_unique_queue.rb, line 65
# Removes +data+ from the queue.
#
# @return whatever the client's zrem call returns
def remove(data)
        with do |redis|
                redis.zrem(name, data)
        end
end
remove_item_by_index(index) click to toggle source
# File lib/redis_unique_queue.rb, line 69
# Removes the single member at rank +index+.
#
# @return whatever the client's zremrangebyrank call returns
def remove_item_by_index(index)
        with do |redis|
                redis.zremrangebyrank(name, index, index)
        end
end
size() click to toggle source
# File lib/redis_unique_queue.rb, line 73
# Returns the cardinality of the queue's sorted set.
def size
        with do |redis|
                redis.zcard(name)
        end
end

Private Instance Methods

attempt_atomic_pop() click to toggle source
# File lib/redis_unique_queue.rb, line 134
# Makes one attempt at removing the front member through
# attempt_atomic_read_write.
#
# The candidate is read with #front (rank 0). This avoids first looking up
# the highest score and then range-querying by score, which on an empty
# queue passed a nil upper bound to the client and cost extra round trips.
#
# @return [Array] the [success, member] pair from attempt_atomic_read_write;
#   member is nil when the queue is empty
def attempt_atomic_pop
        read = lambda { front }

        write = lambda do |multi, member|
                multi.zrem name, member
        end

        attempt_atomic_read_write read, write
end
attempt_atomic_pop_all() click to toggle source
# File lib/redis_unique_queue.rb, line 130
# Makes one attempt at reading every member and deleting the key through
# attempt_atomic_read_write.
#
# @return [Array] the [success, members] pair from attempt_atomic_read_write
def attempt_atomic_pop_all
        read_everything = lambda { all }
        delete_key = lambda { |multi, _members| multi.del(name) }

        attempt_atomic_read_write(read_everything, delete_key)
end
attempt_atomic_pop_multi(amount) click to toggle source
# File lib/redis_unique_queue.rb, line 126
# Makes one attempt at removing up to +amount+ members from the front
# through attempt_atomic_read_write.
#
# A non-positive +amount+ is answered with [true, []] without touching the
# queue. Otherwise the removal's stop rank, amount - 1, would be -1 for
# amount 0, which means "through the last member" and would delete the
# whole queue.
#
# @param amount [Integer] maximum number of members to remove
# @return [Array] the [success, members] pair
def attempt_atomic_pop_multi(amount)
        return [true, []] unless amount > 0

        read = lambda { peek 0, amount }
        write = lambda { |multi, _members| multi.zremrangebyrank name, 0, amount - 1 }

        attempt_atomic_read_write read, write
end
attempt_atomic_push_multi(*values) click to toggle source
# File lib/redis_unique_queue.rb, line 108
# Makes one attempt at appending +values+ inside a WATCH/MULTI transaction.
#
# A single Array argument is treated as the list of values. An empty list
# is answered with [true, []] without contacting Redis, so ZADD is never
# issued without members.
#
# Scores start at the current time, or one past the highest existing score
# when that is later, and grow by 1 per value. Starting strictly above the
# current back member matters because members with equal scores are ordered
# lexicographically, which could place a new value ahead of the old back.
#
# @param values [Array] values to push, or one Array holding them
# @return [Array] [success, values], where success is the result of the
#   client's watch block
def attempt_atomic_push_multi(*values)
        values = values.first if values.size == 1 && values.first.kind_of?(Array)
        return [true, values] if values.empty?

        with do |redis|
                success = redis.watch(name) do
                        now = Time.now.to_f
                        highest = max_score
                        base = highest ? [now, highest + 1].max : now
                        scored_values = values.each_with_index.map { |value, i| [base + i, value] }
                        redis.multi do |multi|
                                multi.zadd name, scored_values
                        end
                end

                [success, values]
        end
end
attempt_atomic_read_write(read_op, write_op) click to toggle source
# File lib/redis_unique_queue.rb, line 153
# Runs one optimistic read-then-write cycle on the queue's key.
#
# The key is WATCHed, +read_op+ is called, and when it returns a truthy
# value +write_op+ is called with the MULTI handle and that value. When
# there is nothing to write the watch is released explicitly, because no
# MULTI/EXEC follows to clear it and the connection may be reused.
#
# @param read_op [#call] takes no arguments and returns the value read
# @param write_op [#call] takes (multi, read_result) and queues the writes
# @return [Array] [success, result]: success is what the client's multi call
#   returned, or nil when no write was attempted; result is the value read
def attempt_atomic_read_write(read_op, write_op)
        result = nil
        success = with do |redis|
                redis.watch(name) do
                        result = read_op.call
                        if result
                                redis.multi do |multi|
                                        write_op.call multi, result
                                end
                        else
                                redis.unwatch
                                nil # keep reporting "no write attempted" as nil
                        end
                end
        end

        [success, result]
end
block_on_atomic_attempt() { || ... } click to toggle source
# File lib/redis_unique_queue.rb, line 146
# Calls the block, which must return a [success, result] pair, at least
# once and again for as long as success is falsy while result is truthy.
#
# @return the result element of the last pair the block returned
def block_on_atomic_attempt
        success, result = yield
        success, result = yield until success || !result
        result
end
max_score() click to toggle source
# File lib/redis_unique_queue.rb, line 104
# Returns the score of the member at the back of the queue.
#
# When #back reports no member the method returns nil straight away
# instead of asking Redis for the score of a nil member.
#
# @return the score reported by the client's zscore call, or nil when the
#   queue has no back member
def max_score
        last = back
        return nil if last.nil?

        with { |redis| redis.zscore name, last }
end
pooled?() click to toggle source
# File lib/redis_unique_queue.rb, line 179
# Tells whether the constructor marked the connection as pooled.
#
# @return [Boolean] true when @pooled is truthy, false otherwise
def pooled?
        @pooled ? true : false
end
with(&block) click to toggle source
# File lib/redis_unique_queue.rb, line 171
# Yields a Redis connection to the block and returns the block's value.
#
# When not pooled the block is called with @redis itself; when pooled the
# block is forwarded to @redis.with.
def with(&block)
        return block.call(@redis) unless pooled?

        @redis.with(&block)
end