class DelayQueue
Constants
- LOCK_DURATION
Public Class Methods
new(redis, queue_name)
click to toggle source
# File lib/delay_queue.rb, line 3
# Builds a queue handle bound to a Redis client and a queue key.
#
# redis      - the Redis client object used for every queue operation
# queue_name - a String; used as the key of the queue and, prefixed with
#              'lock.', as the name of the queue's lock
def initialize(redis, queue_name)
  @redis, @queue_name = redis, queue_name
  @lock_name = 'lock.' + queue_name
end
Public Instance Methods
delete(item)
click to toggle source
# File lib/delay_queue.rb, line 13
# Removes +item+ from the queue's sorted set.
#
# Returns whatever the Redis client's zrem call returns.
def delete(item)
  key = @queue_name
  @redis.zrem(key, item)
end
delete_all!()
click to toggle source
# File lib/delay_queue.rb, line 17
# Drops the whole queue by deleting its Redis key.
#
# Returns whatever the Redis client's del call returns.
def delete_all!
  key = @queue_name
  @redis.del(key)
end
dequeue()
click to toggle source
# File lib/delay_queue.rb, line 45
# Removes and returns one item whose score is at or before the current time.
#
# Waits until the queue lock can be acquired (or broken), then reads at most
# one item with a score between 0 and now, removes it from the sorted set and
# releases the lock.
#
# Returns the item, or nil when nothing is currently due.
# Any error raised by the Redis client propagates to the caller; the lock is
# released first.
def dequeue
  # Couldn't acquire or break the lock: wait and try again.
  # A small sleep value is actually faster than no sleep value, presumably
  # because no delay puts too much stress on Redis.
  # Looping (instead of calling dequeue recursively) keeps the stack flat no
  # matter how long the lock stays contended.
  sleep 0.01 until acquire_lock || break_lock

  begin
    due = @redis.zrangebyscore(@queue_name, 0, Time.now.to_i, :limit => [0, 1])
    item = due.first if due
    @redis.zrem(@queue_name, item) if item
    item
  ensure
    # Always give the lock back, even when a Redis call above raised;
    # otherwise other consumers stay blocked until the lock can be broken.
    release_lock
  end
end
empty?()
click to toggle source
# File lib/delay_queue.rb, line 21
# Tells whether the queue currently holds no items, i.e. whether #size
# reports 0.
def empty?
  queued = size
  queued == 0
end
enqueue(item, options={ :delay => 0})
click to toggle source
Enqueue a unique item with an optional delay.
:item - A string.
:options - An optional hash containing one of the following options:
:until - Time before which this item may not be dequeued.
:delay - Number of seconds to wait before this item may be dequeued.
# File lib/delay_queue.rb, line 36
# Adds +item+ to the queue's sorted set, scored with the time (in whole
# seconds) from which it may be dequeued.
#
# item    - the member to store
# options - a hash; :delay (seconds added to the current time) is checked
#           first, then :until (a time used as-is). When neither is set the
#           score is nil.to_i, i.e. 0.
#
# Returns whatever the Redis client's zadd call returns.
def enqueue(item, options={ :delay => 0})
  ready_at =
    if options[:delay]
      Time.now + options[:delay]
    elsif options[:until]
      options[:until]
    end
  @redis.zadd(@queue_name, ready_at.to_i, item)
end
include?(item)
click to toggle source
# File lib/delay_queue.rb, line 9
# Looks +item+ up in the queue's sorted set.
#
# Returns the result of the Redis client's zscore call for the item, so the
# value is only meaningful as truthy/falsy for membership checks.
def include?(item)
  score = @redis.zscore(@queue_name, item)
  score
end
size()
click to toggle source
# File lib/delay_queue.rb, line 25
# Reports how many items the queue holds, as given by the Redis client's
# zcard call on the queue key.
def size
  key = @queue_name
  @redis.zcard(key)
end
Private Instance Methods
new_lock_expiration()
click to toggle source
# File lib/delay_queue.rb, line 77
# Computes the timestamp, in whole seconds, that lies LOCK_DURATION after
# the current time.
def new_lock_expiration
  expires_at = Time.now + LOCK_DURATION
  expires_at.to_i
end