class ParallelQueue

Constants

LOCK_DURATION

Public Class Methods

new(redis, queue_name, options = {}) click to toggle source
# File lib/parallel_queue.rb, line 3
def initialize(redis, queue_name, options = {})
  @redis = redis
  @queue_name = queue_name
  @maxlength = options[:maxlength] || nil
  @lock_name = 'lock.' + @queue_name
  @current_queue_index = 0
end

Public Instance Methods

delete_all!() click to toggle source
# File lib/parallel_queue.rb, line 84
def delete_all!
  if acquire_lock || break_lock
    while !empty?
      delete_queue(@redis.lpop(list_of_queue_names))
    end
    @redis.del(list_of_queue_names)
    self.current_queue_index = 0
    release_lock
  else # couldn't acquire or break the lock. wait and try again
    # a small sleep value is actually faster than no sleep value, presumably because no
    # delay puts too much stress on Redis
    sleep 0.01
    delete_all!
  end
end
delete_queue(id) click to toggle source
# File lib/parallel_queue.rb, line 12
def delete_queue(id)
  @redis.lrem(list_of_queue_names, 1, id)
  @redis.del(queue_from_id(id))
end
dequeue() click to toggle source
# File lib/parallel_queue.rb, line 35
def dequeue
  if acquire_lock || break_lock
    current_id = @redis.lindex(list_of_queue_names, current_queue_index)

    # pop from the current queue
    current_queue = queue_from_id(current_id)
    item = @redis.lpop(current_queue)
    delete_queue(current_id) if @redis.llen(current_queue) == 0

    increment_current_queue_index
    release_lock
    item
  else # couldn't acquire or break the lock. wait and try again
    # A small sleep value is actually faster than no sleep value, presumably because no
    # delay puts too much stress on Redis
    # Experiment:
    # started dequeue_demo.rb in two terminals
    # started enqueue_demo.rb in a third terminal
    # enqueue_demo.rb is set to run until it enqueues 40,000 times
    # dequeue_demo.rb times its run, starting from the first time there is data to
    # dequeue, and running through until all data have been dequeued
    # times reported are the average of both dequeue terminals (which consitently ended
    # within 0.1 second of one another)
    # Sleep Delay            Run Duration (in seconds)
    # 0.01                   22.6
    # 0.001                  22.3
    # no sleep               25.1
    # 0.001                  23.8
    # 0.001                  23.8
    # 0.01                   22.6
    # 0.01                   22.7
    # no sleep               25.1
    # no sleep               25.0
    #
    sleep 0.01
    dequeue
  end
end
dequeue_each() { |item| ... } click to toggle source
# File lib/parallel_queue.rb, line 74
def dequeue_each(&block)
  return if queue_count == 0
  self.current_queue_index = 0

  begin
    item = dequeue
    yield(item) unless item.nil?
  end while current_queue_index > 0 && current_queue_index < queue_count
end
empty?() click to toggle source
# File lib/parallel_queue.rb, line 17
def empty?
  queue_count == 0
end
enqueue(id, item) click to toggle source
:item

A string

# File lib/parallel_queue.rb, line 27
def enqueue(id, item)
  queue = queue_from_id(id)
  @redis.rpush(queue, item)
  @redis.ltrim(queue, -@maxlength, - 1) if @maxlength
  @redis.lrem(list_of_queue_names, 1, id)
  @redis.rpush(list_of_queue_names, id)
end
queue_count() click to toggle source
# File lib/parallel_queue.rb, line 21
def queue_count
  @redis.llen(list_of_queue_names)
end

Protected Instance Methods

increment_current_queue_index() click to toggle source
# File lib/parallel_queue.rb, line 116
def increment_current_queue_index
  self.current_queue_index = current_queue_index + 1
end

Private Instance Methods

list_of_queue_names() click to toggle source
# File lib/parallel_queue.rb, line 134
def list_of_queue_names
  "#{@queue_name}_qs"
end
new_lock_expiration() click to toggle source
# File lib/parallel_queue.rb, line 145
def new_lock_expiration
  (Time.now + LOCK_DURATION).to_i
end
queue_from_id(id) click to toggle source
# File lib/parallel_queue.rb, line 138
def queue_from_id(id)
  "#{@queue_name}_q_#{id}"
end