class Reliable::Queue

Constants

FatalError

Attributes

base_key[RW]
uuid[RW]

Public Class Methods

new(name) click to toggle source
# File lib/reliable/queue.rb, line 15
# Sets up a queue handle for +name+.
#
# Stores the name, derives the queue's base key ("reliable:queues:<name>")
# plus the ":pending" and ":failed" keys beneath it, and creates a client
# with Redis.new.
#
# name - value interpolated into the base key.
def initialize(name)
  @name        = name
  @base_key    = "reliable:queues:#{name}"
  @redis       = Redis.new
  @pending_key = "#{@base_key}:pending"
  @failed_key  = "#{@base_key}:failed"
end

Public Instance Methods

<<(value)
Alias for: push
create_worker(&work) click to toggle source
# File lib/reliable/queue.rb, line 42
# Builds a new Worker for this queue.
#
# work - block forwarded unchanged to Worker.new.
#
# Returns whatever Worker.new(self, &work) returns.
def create_worker(&work)
  Worker.new(self, &work)
end
current_time() click to toggle source
# File lib/reliable/queue.rb, line 23
# Returns the result of calling +time+ on the queue's Redis client.
# NOTE(review): presumably this is the Redis server's clock rather than the
# local one -- confirm against the client implementation.
def current_time
  @redis.time
end
each(&block) click to toggle source
# File lib/reliable/queue.rb, line 61
# Processes queue items with the given block, or returns an enumerator.
#
# With a block: the block is handed to to_enum as the work to perform, and
# the resulting enumerator is then driven with a pass-through block.
#
# Without a block: returns to_enum with no work attached.
def each(&block)
  return to_enum unless block_given?

  # Iteration and work are conflated here, so the work is front-loaded into
  # the enumerator and the iteration block only echoes each item back.
  to_enum(&block).each { |item| item }
end
failed() click to toggle source
# File lib/reliable/queue.rb, line 38
# Returns the List built from the queue's failed key and Redis client.
# The List is created on first access and the same instance is reused on
# later calls.
def failed
  return @failed if @failed

  @failed = List.new(@failed_key, @redis)
end
logger() click to toggle source
# File lib/reliable/queue.rb, line 100
# Returns the logger exposed by the Reliable module (Reliable.logger).
def logger
  Reliable.logger
end
notify(e, other = {}) click to toggle source
# File lib/reliable/queue.rb, line 104
# Reports an exception by writing three info-level entries to the logger:
# the exception's inspect string, its backtrace, and the inspect string of
# any extra context.
#
# e     - the exception being reported (must respond to inspect/backtrace).
# other - optional extra context to log alongside it (default: {}).
#
# Returns the result of the final logger.info call.
def notify(e, other = {})
  # TODO: make configurable
  logger.info(e.inspect)
  logger.info(e.backtrace)
  logger.info(other.inspect)
end
peach(opts = {}, &block) click to toggle source
# File lib/reliable/queue.rb, line 71
# Runs +each+ with the given block on several threads at once.
#
# opts  - options hash; :concurrency (required) is the number of threads.
# block - the work, forwarded to +each+ inside every thread.
#
# Raises RuntimeError ("must supply a block") when no block is given, and
# KeyError when :concurrency is absent (via Hash#fetch).
#
# Returns the array of joined threads.
def peach(opts = {}, &block)
  raise "must supply a block" if block.nil?

  workers = opts.fetch(:concurrency).times.map { Thread.new { each(&block) } }

  # A failure in any worker thread should surface in the caller.
  workers.each { |thread| thread.abort_on_exception = true }
  workers.map(&:join)
end
pending() click to toggle source
# File lib/reliable/queue.rb, line 34
# Returns the List built from the queue's pending key and Redis client.
# The List is created on first access and the same instance is reused on
# later calls.
def pending
  return @pending if @pending

  @pending = List.new(@pending_key, @redis)
end
push(value) click to toggle source
# File lib/reliable/queue.rb, line 27
# Adds +value+ to the queue.
#
# Constructs a UUID from current_time; inside the block given to UUID.new,
# calls set_and_lpush on the Redis client with the pending list's key, the
# UUID's string form, and the value.
#
# value - the payload to enqueue.
#
# Returns whatever UUID.new returns.
def push(value)
  now = current_time

  UUID.new(now) do |uuid|
    list_key = pending.key
    @redis.set_and_lpush(list_key, uuid.to_s, value)
  end
end
Also aliased as: <<
take(number, &block) click to toggle source
# File lib/reliable/queue.rb, line 57
# Returns the first +number+ results from the queue's enumerator.
#
# number - how many results to collect.
# block  - optional work block, forwarded to to_enum.
def take(number, &block)
  results = to_enum(&block)
  results.take(number)
end
to_enum(&work) click to toggle source
# File lib/reliable/queue.rb, line 46
# Builds an Enumerator that produces results from a worker on this queue.
#
# work - optional block passed to create_worker; when omitted, an identity
#        lambda is used so each item is returned unchanged.
#
# The worker is created immediately; worker.next is only called as values
# are pulled from the returned Enumerator.
def to_enum(&work)
  job = work || ->(item) { item }
  worker = create_worker(&job)

  Enumerator.new do |yielder|
    # Each pass does one unit of work, then hands the result to the consumer
    # before looping again.
    loop { yielder.yield(worker.next) }
  end
end
total_items() click to toggle source
# File lib/reliable/queue.rb, line 96
# Counts the keys returned by scanning Redis for "reliable:items:*".
# The pattern is not scoped to this queue's base key.
def total_items
  item_keys = @redis.scan("reliable:items:*")
  item_keys.length
end
total_processing() click to toggle source
# File lib/reliable/queue.rb, line 84
# Sums the lengths of every list whose key matches
# "reliable:queues:*:workers:*:processing".
#
# The lengths are requested with LLEN inside a single pipeline call, and
# each reply is converted with to_i before being added up.
#
# Returns the total as an Integer (0 when no keys match).
def total_processing
  processing_keys = @redis.scan("reliable:queues:*:workers:*:processing")

  counts = @redis.pipeline do |pipe|
    processing_keys.each { |key| pipe.llen(key) }
  end

  counts.sum(&:to_i)
end