class DelayQueue

Constants

LOCK_DURATION

Public Class Methods

new(redis, queue_name) click to toggle source
# File lib/delay_queue.rb, line 3
# Stores the Redis connection and the queue name, and derives the name of
# the lock key by prefixing the queue name with 'lock.'.
#
# redis::      object the queue sends its Redis commands to
# queue_name:: string naming the queue key
def initialize(redis, queue_name)
  @redis, @queue_name = redis, queue_name
  @lock_name = 'lock.' + queue_name
end

Public Instance Methods

delete(item) click to toggle source
# File lib/delay_queue.rb, line 13
# Removes +item+ from the sorted set stored under the queue name.
# Returns whatever the Redis client's +zrem+ call returns.
def delete(item)
  queue_key = @queue_name
  @redis.zrem(queue_key, item)
end
delete_all!() click to toggle source
# File lib/delay_queue.rb, line 17
# Deletes the whole queue key, discarding every item in it.
# Returns whatever the Redis client's +del+ call returns.
def delete_all!
  queue_key = @queue_name
  @redis.del(queue_key)
end
dequeue() click to toggle source
# File lib/delay_queue.rb, line 45
# Removes and returns one item whose score is between 0 and the current
# time, or returns nil when no such item exists.
#
# Blocks until the queue lock has been acquired (or broken). The lock is
# always released before returning, even when a Redis call raises, so a
# failed dequeue does not leave the lock held.
def dequeue
  # couldn't acquire or break the lock: wait and try again.
  # a small sleep value is actually faster than no sleep value, presumably because no
  # delay puts too much stress on Redis.
  # Retrying in a loop (rather than by recursion) keeps the stack flat no
  # matter how long the lock stays contended.
  sleep 0.01 until acquire_lock || break_lock

  begin
    array = @redis.zrangebyscore(@queue_name, 0, Time.now.to_i, :limit => [0, 1])
    item = array.first if array
    @redis.zrem(@queue_name, item) if item
    item
  ensure
    release_lock
  end
end
empty?() click to toggle source
# File lib/delay_queue.rb, line 21
# True when the queue currently reports a size of zero.
def empty?
  item_count = size
  item_count == 0
end
enqueue(item, options={ :delay => 0}) click to toggle source

Enqueue a unique item with an optional delay

:item

A string

:options

An optional hash containing one of the following options

:until

Time before which this item may not be dequeued

:delay

Number of seconds to wait before allowing this item to be dequeued

# File lib/delay_queue.rb, line 36
# Adds +item+ to the queue, scored with the time at which it becomes
# eligible for dequeueing.
#
# item::    the member to store
# options:: hash with at most one of:
#           :delay:: number of seconds from now before the item may be dequeued
#           :until:: time before which the item may not be dequeued
#
# When both keys are present :delay takes precedence. When neither is
# present the item is scored with the current time, the same as the default
# delay of 0, instead of falling through to a score of 0 (nil.to_i), which
# would sort it ahead of every other item.
#
# Returns whatever the Redis client's +zadd+ call returns.
def enqueue(item, options={ :delay => 0})
  time =
    if options[:delay]
      Time.now + options[:delay]
    elsif options[:until]
      options[:until]
    else
      Time.now
    end
  @redis.zadd(@queue_name, time.to_i, item)
end
include?(item) click to toggle source
# File lib/delay_queue.rb, line 9
# Looks up +item+ in the queue and returns the result of the Redis client's
# +zscore+ call for it, which callers use as a truthy/falsy membership answer.
def include?(item)
  score = @redis.zscore(@queue_name, item)
  score
end
size() click to toggle source
# File lib/delay_queue.rb, line 25
# Returns the result of the Redis client's +zcard+ call for the queue key,
# i.e. the number of items currently queued.
def size
  queue_key = @queue_name
  @redis.zcard(queue_key)
end

Private Instance Methods

new_lock_expiration() click to toggle source
# File lib/delay_queue.rb, line 77
# Returns, as an integer timestamp, the moment LOCK_DURATION seconds after
# the current time.
def new_lock_expiration
  expires_at = Time.now + LOCK_DURATION
  expires_at.to_i
end