class Redis::Deque
Constants
- VERSION
Public Class Methods
new(queue_name, options = {})
click to toggle source
# File lib/redis/deque.rb, line 11 def initialize(queue_name, options = {}) raise ArgumentError, 'queue_name must be a non-empty string' if !queue_name.is_a?(String) || queue_name.empty? raise ArgumentError, 'process_queue_name must be a non-empty string' if options.key?(:process_queue_name) && (!options[:process_queue_name].is_a?(String) || options[:process_queue_name].empty?) raise ArgumentError, 'queue_name and process_queue_name must be different' if options[:process_queue_name] == queue_name @redis = options[:redis] || Redis.current @queue_name = queue_name @process_queue_name = options[:process_queue_name] || "#{queue_name}_process" @last_message = nil @timeout = options[:timeout] ||= 0 end
version()
click to toggle source
# File lib/redis/deque.rb, line 7 def self.version "redis-deque version #{VERSION}" end
Public Instance Methods
clear(clear_process_queue = false)
click to toggle source
# File lib/redis/deque.rb, line 27 def clear(clear_process_queue = false) @redis.del @queue_name @redis.del @process_queue_name if clear_process_queue end
commit(message)
click to toggle source
# File lib/redis/deque.rb, line 53 def commit(message) @redis.lrem(@process_queue_name, 0, message) end
commit_all()
click to toggle source
# File lib/redis/deque.rb, line 61 def commit_all @redis.del @process_queue_name end
commit_last()
click to toggle source
# File lib/redis/deque.rb, line 57 def commit_last commit @last_message end
empty?()
click to toggle source
# File lib/redis/deque.rb, line 32 def empty? length <= 0 end
length()
click to toggle source
# File lib/redis/deque.rb, line 23 def length @redis.llen @queue_name end
Also aliased as: size
pop(non_block = false)
click to toggle source
# File lib/redis/deque.rb, line 44 def pop(non_block = false) @last_message = if non_block @redis.rpoplpush(@queue_name, @process_queue_name) else @redis.brpoplpush(@queue_name, @process_queue_name, @timeout) end @last_message end
process(non_block = false, timeout = nil) { |message| ... }
click to toggle source
# File lib/redis/deque.rb, line 65 def process(non_block = false, timeout = nil) @timeout = timeout unless timeout.nil? loop do message = pop(non_block) ret = yield message if block_given? commit_last if ret break if message.nil? || (non_block && empty?) end end
push(obj)
click to toggle source
# File lib/redis/deque.rb, line 36 def push(obj) @redis.lpush(@queue_name, obj) end
refill()
click to toggle source
# File lib/redis/deque.rb, line 75 def refill while (message = @redis.lpop(@process_queue_name)) unshift(message) end true end
unshift(obj)
click to toggle source
# File lib/redis/deque.rb, line 40 def unshift(obj) @redis.rpush(@queue_name, obj) end