class Reliable::Queue
Constants
- FatalError
Attributes
base_key[RW]
uuid[RW]
Public Class Methods
new(name)
click to toggle source
# File lib/reliable/queue.rb, line 15 def initialize(name) @name = name @base_key = "reliable:queues:#{name}" @redis = Redis.new @pending_key = @base_key + ":pending" @failed_key = @base_key + ":failed" end
Public Instance Methods
create_worker(&work)
click to toggle source
# File lib/reliable/queue.rb, line 42 def create_worker(&work) Worker.new(self, &work) end
current_time()
click to toggle source
# File lib/reliable/queue.rb, line 23 def current_time @redis.time end
each(&block)
click to toggle source
# File lib/reliable/queue.rb, line 61 def each(&block) if block_given? # This is because we confuse iteration with work here # So we need to front-load the work, then fake the iteration to_enum(&block).each { |item| item } else to_enum end end
failed()
click to toggle source
# File lib/reliable/queue.rb, line 38 def failed @failed ||= List.new(@failed_key, @redis) end
logger()
click to toggle source
# File lib/reliable/queue.rb, line 100 def logger Reliable.logger end
notify(e, other = {})
click to toggle source
# File lib/reliable/queue.rb, line 104 def notify(e, other = {}) # TODO: make configurable logger.info e.inspect logger.info e.backtrace logger.info other.inspect end
peach(opts = {}, &block)
click to toggle source
# File lib/reliable/queue.rb, line 71 def peach(opts = {}, &block) raise "must supply a block" unless block_given? concurrency = opts.fetch(:concurrency) threads = concurrency.times.map do Thread.new { each(&block) } end threads.map { |t| t.abort_on_exception = true } threads.map(&:join) end
pending()
click to toggle source
# File lib/reliable/queue.rb, line 34 def pending @pending ||= List.new(@pending_key, @redis) end
push(value)
click to toggle source
# File lib/reliable/queue.rb, line 27 def push(value) UUID.new(current_time) do |uuid| @redis.set_and_lpush(pending.key, uuid.to_s, value) end end
Also aliased as: <<
take(number, &block)
click to toggle source
# File lib/reliable/queue.rb, line 57 def take(number, &block) to_enum(&block).take(number) end
to_enum(&work)
click to toggle source
# File lib/reliable/queue.rb, line 46 def to_enum(&work) work ||= ->(item) { item } worker = create_worker(&work) Enumerator.new do |y| loop do # forever result = worker.next # do work y.yield result # then release control end end end
total_items()
click to toggle source
# File lib/reliable/queue.rb, line 96 def total_items @redis.scan("reliable:items:*").length end
total_processing()
click to toggle source
# File lib/reliable/queue.rb, line 84 def total_processing keys = @redis.scan "reliable:queues:*:workers:*:processing" lengths = @redis.pipeline do |pipe| keys.each do |key| pipe.llen key end end lengths.map(&:to_i).reduce(0, &:+) end