class Redis::Deque

Constants

VERSION

Public Class Methods

new(queue_name, options = {}) click to toggle source
# File lib/redis/deque.rb, line 11
# Sets up a deque backed by two Redis lists: the main queue and a
# "process" queue that receives every message handed out by #pop.
#
# The options hash is only read; it is never modified, so callers may
# pass a frozen or shared hash.
#
# @param queue_name [String] non-empty key of the main list
# @param options [Hash] optional settings
#   :redis              - client object to use (falls back to Redis.current)
#   :process_queue_name - non-empty String key of the process list; must differ
#                         from queue_name (defaults to "<queue_name>_process")
#   :timeout            - timeout passed to the blocking pop (defaults to 0)
# @raise [ArgumentError] if a queue name is not a non-empty String, or if
#   both names are equal
def initialize(queue_name, options = {})
  unless queue_name.is_a?(String) && !queue_name.empty?
    raise ArgumentError, 'queue_name must be a non-empty string'
  end

  # The process queue name is validated only when the key is present,
  # so an explicit nil is rejected while an absent key gets the default.
  if options.key?(:process_queue_name)
    process_queue_name = options[:process_queue_name]
    unless process_queue_name.is_a?(String) && !process_queue_name.empty?
      raise ArgumentError, 'process_queue_name must be a non-empty string'
    end
    if process_queue_name == queue_name
      raise ArgumentError, 'queue_name and process_queue_name must be different'
    end
  end

  @redis = options[:redis] || Redis.current
  @queue_name = queue_name
  @process_queue_name = options[:process_queue_name] || "#{queue_name}_process"
  @last_message = nil
  # Plain `||` instead of `||=`: the default must not be written back
  # into the caller's hash.
  @timeout = options[:timeout] || 0
end
version() click to toggle source
# File lib/redis/deque.rb, line 7
# Human-readable version banner built from the VERSION constant.
#
# @return [String] "redis-deque version " followed by VERSION
def self.version
  'redis-deque version ' + VERSION.to_s
end

Public Instance Methods

<<(obj)
Alias for: push
clear(clear_process_queue = false) click to toggle source
# File lib/redis/deque.rb, line 27
# Deletes the main queue key and, on request, the process queue key too.
#
# @param clear_process_queue [Boolean] also delete the process queue when truthy
# @return [Object, nil] the result of deleting the process queue, or nil
#   when only the main queue was deleted
def clear(clear_process_queue = false)
  @redis.del(@queue_name)
  return unless clear_process_queue

  @redis.del(@process_queue_name)
end
commit(message) click to toggle source
# File lib/redis/deque.rb, line 53
# Acknowledges a message by removing it from the process queue.
# LREM is called with a count of 0, so every entry equal to the
# message is removed, not just the first one.
#
# @param message [Object] value to remove from the process queue
# @return [Object] whatever the client's lrem call returns
def commit(message)
  process_queue = @process_queue_name
  @redis.lrem(process_queue, 0, message)
end
commit_all() click to toggle source
# File lib/redis/deque.rb, line 61
# Acknowledges everything in flight by deleting the process queue key.
#
# @return [Object] whatever the client's del call returns
def commit_all
  @redis.del(@process_queue_name)
end
commit_last() click to toggle source
# File lib/redis/deque.rb, line 57
# Acknowledges the message most recently returned by #pop.
#
# When there is no last message (nothing popped yet, or the last pop
# returned nil) there is nothing to acknowledge, so Redis is not
# contacted and 0 is returned instead of issuing an LREM for nil.
#
# @return [Object] the result of #commit, or 0 when there is no last message
def commit_last
  return 0 if @last_message.nil?

  commit(@last_message)
end
dec(non_block = false)
Alias for: pop
empty?() click to toggle source
# File lib/redis/deque.rb, line 32
# Tells whether the main queue currently holds no messages.
#
# @return [Boolean] true when #length is zero or less
def empty?
  !length.positive?
end
enc(obj)
Alias for: push
length() click to toggle source
# File lib/redis/deque.rb, line 23
# Number of messages waiting in the main queue, as reported by LLEN.
# The process queue is not counted.
#
# @return [Object] whatever the client's llen call returns
def length
  @redis.llen(@queue_name)
end
Also aliased as: size
pop(non_block = false) click to toggle source
# File lib/redis/deque.rb, line 44
# Moves one message from the main queue to the process queue and
# remembers it as the last message, so #commit_last can acknowledge it.
#
# @param non_block [Boolean] when truthy use RPOPLPUSH; otherwise use
#   BRPOPLPUSH with the configured timeout
# @return [Object, nil] the value returned by the client call
def pop(non_block = false)
  source = @queue_name
  target = @process_queue_name

  if non_block
    @last_message = @redis.rpoplpush(source, target)
  else
    @last_message = @redis.brpoplpush(source, target, @timeout)
  end
end
Also aliased as: dec, shift
process(non_block = false, timeout = nil) { |message| ... } click to toggle source
# File lib/redis/deque.rb, line 65
# Pops messages in a loop, yielding each one to the block and committing
# it when the block returns a truthy value.
#
# The loop ends once a pop returns nil, or, in non-blocking mode, once
# the main queue is empty after handling a message. A nil pop result is
# still yielded to the block, but it is never committed: there is no
# message to acknowledge.
#
# @param non_block [Boolean] passed through to #pop
# @param timeout [Object, nil] when not nil, replaces the instance's
#   timeout; the new value stays in effect after this call returns
# @yield [message] the popped message, or nil when the pop returned nothing
# @return [nil]
def process(non_block = false, timeout = nil)
  @timeout = timeout unless timeout.nil?
  loop do
    message = pop(non_block)
    ret = yield message if block_given?
    # Only acknowledge real messages.
    commit_last if ret && !message.nil?
    break if message.nil? || (non_block && empty?)
  end
end
push(obj) click to toggle source
# File lib/redis/deque.rb, line 36
# Enqueues a value on the main queue with LPUSH.
#
# @param obj [Object] value to enqueue
# @return [Object] whatever the client's lpush call returns
def push(obj)
  target = @queue_name
  @redis.lpush(target, obj)
end
Also aliased as: enc, <<
refill() click to toggle source
# File lib/redis/deque.rb, line 75
# Drains the process queue back into the main queue: each message is
# LPOPped from the process queue and handed to #unshift, until the
# process queue yields nothing.
#
# @return [true] always
def refill
  loop do
    pending = @redis.lpop(@process_queue_name)
    break unless pending

    unshift(pending)
  end
  true
end
shift(non_block = false)
Alias for: pop
size()
Alias for: length
unshift(obj) click to toggle source
# File lib/redis/deque.rb, line 40
# Puts a value on the main queue with RPUSH. That is the end #pop
# reads from (RPOPLPUSH / BRPOPLPUSH pop from the right).
#
# @param obj [Object] value to enqueue
# @return [Object] whatever the client's rpush call returns
def unshift(obj)
  target = @queue_name
  @redis.rpush(target, obj)
end