class Redis::Queue

Constants

VERSION

Public Class Methods

new(queue_name, process_queue_name, options = {}) click to toggle source
# File lib/redis/queue.rb, line 11
# Builds a queue handle working on two Redis lists.
#
# @param queue_name [String] key of the list holding pending messages
# @param process_queue_name [String] key of the list that receives a message
#   when it is popped; must differ from +queue_name+
# @param options [Hash] optional settings; this hash is only read, never modified
# @option options [Object] :redis client to use; falls back to Redis.current
# @option options [Object] :timeout timeout stored for blocking pops; 0 when
#   absent or nil (presumably seconds - confirm against the Redis client in use)
# @raise [ArgumentError] if either name is not a non-empty String, or if both
#   names are equal
def initialize(queue_name, process_queue_name, options = {})
  raise ArgumentError, 'First argument must be a non empty string' if !queue_name.is_a?(String) || queue_name.empty?
  raise ArgumentError, 'Second argument must be a non empty string' if !process_queue_name.is_a?(String) || process_queue_name.empty?
  raise ArgumentError, 'Queue and Process queue have the same name' if process_queue_name == queue_name

  @redis = options[:redis] || Redis.current
  @queue_name = queue_name
  @process_queue_name = process_queue_name
  @last_message = nil
  # Plain read with a fallback: `options[:timeout] ||= 0` would write the
  # default back into the caller's hash and raise FrozenError on a frozen one.
  @timeout = options[:timeout] || 0
end
version() click to toggle source
# File lib/redis/queue.rb, line 7
# Identification string for the library.
#
# @return [String] the text "redis-queue version " followed by VERSION
def self.version
  format('redis-queue version %s', VERSION)
end

Public Instance Methods

<<(obj)
Alias for: push
clear(clear_process_queue = false) click to toggle source
# File lib/redis/queue.rb, line 27
# Deletes the main queue key and, on request, the process queue key too.
#
# @param clear_process_queue [Boolean] when truthy, the process queue key is
#   deleted as well
# @return [Object, nil] nil when only the main queue is deleted; otherwise
#   whatever the client's +del+ returns for the process queue key
def clear(clear_process_queue = false)
  @redis.del(@queue_name)
  return unless clear_process_queue

  @redis.del(@process_queue_name)
end
commit() click to toggle source
# File lib/redis/queue.rb, line 49
# Acknowledges the message most recently returned by #pop by removing it from
# the process queue.
#
# LREM is issued with a count of 0, which per the Redis LREM documentation
# removes every entry equal to the message, not just one.
#
# @return [Object] whatever the client's +lrem+ returns
def commit
  acknowledged = @last_message
  @redis.lrem(@process_queue_name, 0, acknowledged)
end
dec(non_block = false)
Alias for: pop
empty?() click to toggle source
# File lib/redis/queue.rb, line 32
# Tells whether the main queue currently holds no messages.
#
# @return [Boolean] true when #length is zero or less
def empty?
  !length.positive?
end
enc(obj)
Alias for: push
length() click to toggle source
# File lib/redis/queue.rb, line 23
# Number of messages waiting in the main queue, as reported by LLEN.
#
# @return [Object] whatever the client's +llen+ returns for the queue key
def length
  pending_key = @queue_name
  @redis.llen(pending_key)
end
Also aliased as: size
pop(non_block = false) click to toggle source
# File lib/redis/queue.rb, line 40
# Moves one message from the main queue to the process queue and returns it.
# The message is also remembered in @last_message, which #commit reads.
#
# @param non_block [Boolean] when truthy use RPOPLPUSH (returns immediately);
#   otherwise use BRPOPLPUSH with the stored @timeout
# @return [Object, nil] the value returned by the client call; nil is what
#   #process treats as "nothing was popped"
def pop(non_block = false)
  @last_message =
    if non_block
      @redis.rpoplpush(@queue_name, @process_queue_name)
    else
      # The timeout goes in as a keyword: the positional form is deprecated in
      # redis-rb and rejected by clients that only accept `timeout:`.
      @redis.brpoplpush(@queue_name, @process_queue_name, timeout: @timeout)
    end
end
Also aliased as: dec, shift
process(non_block = false, timeout = nil) { |message| ... } click to toggle source
# File lib/redis/queue.rb, line 53
# Pops messages in a loop, yielding each one to the block and committing it
# when the block returns a truthy value.
#
# The loop ends after an iteration in which #pop returned nil, or, in
# non-blocking mode, after an iteration that leaves the queue empty.
# The block is still yielded the nil from that final pop.
#
# @param non_block [Boolean] forwarded to #pop
# @param timeout [Object, nil] when not nil, replaces the stored @timeout
#   (the new value stays in effect after this call)
# @yieldparam message [Object, nil] the value returned by #pop
# @yieldreturn [Object] truthy to commit the message, falsy to leave it in
#   the process queue
# @return [nil]
def process(non_block = false, timeout = nil)
  @timeout = timeout unless timeout.nil?
  loop do
    message = pop(non_block)
    ret = yield message if block_given?
    # Nothing was popped when message is nil, so there is nothing to remove;
    # committing would issue an LREM for a nil value.
    commit if ret && !message.nil?
    break if message.nil? || (non_block && empty?)
  end
end
push(obj) click to toggle source
# File lib/redis/queue.rb, line 36
# Adds a message to the main queue with LPUSH.
#
# @param obj [Object] the message handed to the client as-is
# @return [Object] whatever the client's +lpush+ returns
def push(obj)
  target_key = @queue_name
  @redis.lpush(target_key, obj)
end
Also aliased as: enc, <<
refill() click to toggle source
# File lib/redis/queue.rb, line 63
# Moves every message left in the process queue back to the main queue.
#
# Messages are taken one at a time with LPOP and appended with RPUSH until
# LPOP returns a falsy value. The two calls are separate commands, so the
# move of each message is not a single atomic step.
#
# @return [true]
def refill
  loop do
    stranded = @redis.lpop(@process_queue_name)
    break unless stranded

    @redis.rpush(@queue_name, stranded)
  end
  true
end
shift(non_block = false)
Alias for: pop
size()
Alias for: length