class ReliableQueue

Attributes

queue[R]
redis[R]
working_queue[R]

Public Class Methods

new(queue, redis) click to toggle source
# File lib/reliable_queue_rb/reliable_queue.rb, line 6
def initialize(queue, redis)
  @queue = queue
  @redis = redis
  @working_queue = "#{queue}.working_on"

  requeue_unfinished_work
end

Public Instance Methods

each() { |reply| ... } click to toggle source
# File lib/reliable_queue_rb/reliable_queue.rb, line 14
def each
  return enum_for(:each) unless block_given?

  loop do
    reply = redis.brpoplpush(queue, working_queue, 30)
    next unless reply

    yield reply
    redis.lrem(working_queue, 0, reply)
  end
end

Private Instance Methods

requeue_unfinished_work() click to toggle source
# File lib/reliable_queue_rb/reliable_queue.rb, line 28
def requeue_unfinished_work
  loop while redis.rpoplpush(working_queue, queue)
end