class Redis::Queue
Constants
- VERSION
Public Class Methods
new(queue_name, process_queue_name, options = {})
click to toggle source
# File lib/redis/queue.rb, line 11 def initialize(queue_name, process_queue_name, options = {}) raise ArgumentError, 'First argument must be a non empty string' if !queue_name.is_a?(String) || queue_name.empty? raise ArgumentError, 'Second argument must be a non empty string' if !process_queue_name.is_a?(String) || process_queue_name.empty? raise ArgumentError, 'Queue and Process queue have the same name' if process_queue_name == queue_name @redis = options[:redis] || Redis.current @queue_name = queue_name @process_queue_name = process_queue_name @last_message = nil @timeout = options[:timeout] ||= 0 end
version()
click to toggle source
# File lib/redis/queue.rb, line 7 def self.version "redis-queue version #{VERSION}" end
Public Instance Methods
clear(clear_process_queue = false)
click to toggle source
# File lib/redis/queue.rb, line 27 def clear(clear_process_queue = false) @redis.del @queue_name @redis.del @process_queue_name if clear_process_queue end
commit()
click to toggle source
# File lib/redis/queue.rb, line 49 def commit @redis.lrem(@process_queue_name, 0, @last_message) end
empty?()
click to toggle source
# File lib/redis/queue.rb, line 32 def empty? length <= 0 end
length()
click to toggle source
# File lib/redis/queue.rb, line 23 def length @redis.llen @queue_name end
Also aliased as: size
pop(non_block = false)
click to toggle source
# File lib/redis/queue.rb, line 40 def pop(non_block = false) @last_message = if non_block @redis.rpoplpush(@queue_name, @process_queue_name) else @redis.brpoplpush(@queue_name, @process_queue_name, @timeout) end @last_message end
process(non_block = false, timeout = nil) { |message| ... }
click to toggle source
# File lib/redis/queue.rb, line 53 def process(non_block = false, timeout = nil) @timeout = timeout unless timeout.nil? loop do message = pop(non_block) ret = yield message if block_given? commit if ret break if message.nil? || (non_block && empty?) end end
push(obj)
click to toggle source
# File lib/redis/queue.rb, line 36 def push(obj) @redis.lpush(@queue_name, obj) end
refill()
click to toggle source
# File lib/redis/queue.rb, line 63 def refill while (message = @redis.lpop(@process_queue_name)) @redis.rpush(@queue_name, message) end true end