class ChunkedReliableQueue

Constants

DEFAULT_SIZE

Attributes

name[R]
queue[R]
redis[R]
size[R]
working_queue[R]

Public Class Methods

new(name, queue, redis) click to toggle source
# File lib/reliable_queue_rb/chunked_reliable_queue.rb, line 6
def initialize(name, queue, redis)
  @name = name
  @queue = queue
  @redis = redis
  @working_queue = "#{queue}.working_on.#{name}"

  requeue_unfinished_work
end

Public Instance Methods

each_slice(size = DEFAULT_SIZE) { |replies| ... } click to toggle source
# File lib/reliable_queue_rb/chunked_reliable_queue.rb, line 15
def each_slice(size = DEFAULT_SIZE)
  return enum_for(:each_slice, size) unless block_given?

  loop do
    blocking_reply = redis.brpoplpush(queue, working_queue, 30)
    next unless blocking_reply

    replies = [blocking_reply]
    replies += redis.multi { |multi|
      (size - 1).times do
        multi.rpoplpush(queue, working_queue)
      end
    }.compact

    yield replies
    redis.multi do |multi|
      replies.each do |reply|
        multi.lrem(working_queue, 0, reply)
      end
    end
  end
end

Private Instance Methods

requeue_unfinished_work() click to toggle source
# File lib/reliable_queue_rb/chunked_reliable_queue.rb, line 40
def requeue_unfinished_work
  loop while redis.rpoplpush(working_queue, queue)
end