class ChunkedReliableQueue
Constants
- DEFAULT_SIZE
Attributes
name[R]
queue[R]
redis[R]
size[R]
working_queue[R]
Public Class Methods
new(name, queue, redis)
click to toggle source
# File lib/reliable_queue_rb/chunked_reliable_queue.rb, line 6 def initialize(name, queue, redis) @name = name @queue = queue @redis = redis @working_queue = "#{queue}.working_on.#{name}" requeue_unfinished_work end
Public Instance Methods
each_slice(size = DEFAULT_SIZE) { |replies| ... }
click to toggle source
# File lib/reliable_queue_rb/chunked_reliable_queue.rb, line 15 def each_slice(size = DEFAULT_SIZE) return enum_for(:each_slice, size) unless block_given? loop do blocking_reply = redis.brpoplpush(queue, working_queue, 30) next unless blocking_reply replies = [blocking_reply] replies += redis.multi { |multi| (size - 1).times do multi.rpoplpush(queue, working_queue) end }.compact yield replies redis.multi do |multi| replies.each do |reply| multi.lrem(working_queue, 0, reply) end end end end
Private Instance Methods
requeue_unfinished_work()
click to toggle source
# File lib/reliable_queue_rb/chunked_reliable_queue.rb, line 40 def requeue_unfinished_work loop while redis.rpoplpush(working_queue, queue) end