class RedisScheduler

A simple, production-ready chronological scheduler for Redis.

Use schedule! to add an item to be processed at an arbitrary point in time. The item will be converted to a string and returned to you as such.

Use each to iterate over those items at the scheduled time. This call iterates over all items that are scheduled on or before the current time, in chronological order. In blocking mode, this call will wait forever until such items become available, and will never terminate. In non-blocking mode, this call will only iterate over ready items and will terminate when there are no items ready for processing.
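To make the non-blocking behaviour of each concrete, here is a toy in-memory model. It is not the library's implementation and does not touch Redis; the ToySchedule class and its each_ready method are hypothetical names used only for illustration, and integer timestamps stand in for real times.

```ruby
# Toy in-memory model of the schedule (hypothetical; no Redis involved).
class ToySchedule
  def initialize
    @entries = [] # [time, item] pairs
  end

  def schedule!(item, time)
    @entries << [time, item]
  end

  # Yield every item scheduled at or before `now`, earliest first,
  # removing each one as it is handed out. Stops when nothing is ready.
  def each_ready(now)
    loop do
      ready = @entries.select { |time, _| time <= now }.min_by { |time, _| time }
      break unless ready
      @entries.delete_at(@entries.index(ready))
      yield ready[1], ready[0]
    end
  end

  def size
    @entries.size
  end
end

schedule = ToySchedule.new
schedule.schedule! "send report", 30
schedule.schedule! "warm cache", 10
schedule.schedule! "rotate logs", 99

seen = []
schedule.each_ready(50) { |item, at| seen << [item, at] }
```

In this model the item scheduled for time 99 stays in the schedule, which corresponds to non-blocking mode terminating once nothing else is ready. Blocking mode would instead keep polling until that item becomes due.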

Use items to simply iterate over all items in the queue, for debugging purposes.

Exceptions during processing

Any exceptions during each will result in the item being re-added to the schedule at the original time.
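The re-add behaviour can be sketched without Redis. The process_next helper here is hypothetical and works on a plain Array of [time, item] pairs; it only mirrors the documented pattern of rescuing, re-adding at the original time, and re-raising.

```ruby
# Hypothetical helper mirroring the documented behaviour: if the block
# raises, the item goes back into the schedule at its original time and
# the exception propagates to the caller.
def process_next(schedule)
  at, item = schedule.shift # take the earliest [time, item] pair
  return false unless item
  begin
    yield item, at
  rescue Exception
    schedule << [at, item] # re-add at the original time
    schedule.sort_by!(&:first)
    raise
  end
  true
end

schedule = [[10, "flaky job"], [20, "other job"]]
error = begin
  process_next(schedule) { |item, _at| raise "boom while handling #{item}" }
  nil
rescue RuntimeError => e
  e
end
```

Because the item returns at its original time, it is immediately ready again, so a block that always fails on the same item will see that item on the next pass.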

Multiple producers and consumers

Multiple producers and consumers are fine.

Concurrent reads and writes

Concurrent reads and writes are fine.

Segfaults

The scheduler maintains a “processing set” of items currently being processed. If a process dies (i.e. not as a result of a Ruby exception, but as the result of a segfault), the item will remain in this set but will no longer appear in the schedule. To avoid losing scheduled work due to segfaults, you must periodically iterate through this set and recover any items that have been abandoned, using processing_set_items. Setting a proper ‘descriptor’ argument in each is suggested.
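One possible recovery policy is to treat anything that has been in the processing set for too long as abandoned. The sketch below assumes tuples shaped like the documented return value of processing_set_items; the abandoned helper, the max_age threshold, and the descriptor strings are all hypothetical.

```ruby
# Hypothetical recovery helper: given tuples shaped like the documented
# return value of processing_set_items ([item, Time, descriptor]), pick
# the ones that have been "in process" longer than max_age seconds.
def abandoned(tuples, now, max_age)
  tuples.select { |_item, started_at, _descriptor| now - started_at > max_age }
end

now = Time.at(1_000_000)
tuples = [
  ["job-a", Time.at(1_000_000 - 3600), "worker-1 pid=4242"],
  ["job-b", Time.at(1_000_000 - 5),    "worker-2 pid=4243"],
]
stale = abandoned(tuples, now, 600)
```

What to do with the stale items (for example, rescheduling them with schedule!) depends on the application. A descriptor that names the worker makes it possible to check whether that worker is still alive before deciding.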

Constants

CAS_DELAY
POLL_DELAY

Attributes

blocking[R]
namespace[R]
queue[R]
redis[R]
uniq[R]

Public Class Methods

new(redis, opts={})

Options:

  • namespace: prefix for Redis keys, e.g. “scheduler/”.

  • blocking: whether each should block until items become ready, or return immediately when there are no items ready to be processed.

  • uniq: when false (default), the same item can be scheduled for multiple times. When true, scheduling the same item multiple times only updates its scheduled time, and does not represent the item multiple times in the schedule.

Note that uniq is set on a per-schedule basis and cannot be changed. Once a uniq schedule is created, it is forever uniq (until reset! is called, at least). Attempts to use non-uniq queues in a uniq manner, or vice versa, will result in undefined behavior (probably errors).
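The difference between the two modes follows from how a Redis sorted set treats members: adding an existing member only updates its score. The toy model below uses a Hash in place of the sorted set and plain strings in place of marshalled items; toy_schedule! and the counter Hash are hypothetical names.

```ruby
# Toy model: a Hash stands in for the Redis sorted set (member => score).
# Re-adding an existing member only updates its score, as ZADD does.
def toy_schedule!(zset, counter, item, time, uniq)
  member = if uniq
    item
  else
    counter[:value] += 1
    "#{counter[:value]}:#{item}" # counter prefix makes each member distinct
  end
  zset[member] = time
end

uniq_set, plain_set = {}, {}
counter = { value: 0 }
[100, 200].each do |time|
  toy_schedule!(uniq_set, counter, "email bob", time, true)
  toy_schedule!(plain_set, counter, "email bob", time, false)
end
```

The uniq set ends up with a single entry at the later time, while the non-uniq set holds two prefixed entries. This also shows why the two modes cannot be mixed on one schedule: the stored members have different shapes.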

Note also that nonblocking mode may still actually block momentarily as part of the check-and-set semantics, i.e. block during contention from multiple clients. “Nonblocking” refers to whether the scheduler should wait until events in the schedule are ready, or only return those items that are ready currently.

# File lib/redis-scheduler.rb, line 59
def initialize redis, opts={}
  @redis = redis
  @namespace = opts[:namespace]
  @blocking = opts[:blocking]
  @uniq = opts[:uniq]

  @queue = [@namespace, "q"].join
  @processing_set = [@namespace, "processing"].join
  @counter = [@namespace, "counter"].join
end
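Since the keys are built by plain concatenation, the namespace needs to carry its own trailing separator. A small standalone check (keys_for is a hypothetical helper that repeats the same join as the initializer):

```ruby
# Same key derivation as the initializer: plain concatenation, no separator.
def keys_for(namespace)
  %w[q processing counter].map { |suffix| [namespace, suffix].join }
end

with_slash    = keys_for("scheduler/")
without_slash = keys_for("scheduler")
no_namespace  = keys_for(nil)
```

With "scheduler/" the keys are "scheduler/q", "scheduler/processing" and "scheduler/counter"; with "scheduler" they run together as "schedulerq" and so on; with no namespace they are the bare suffixes.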

Public Instance Methods

each(descriptor=nil) { |item, at| ... }

Yields items along with their scheduled times. Only returns items on or after their scheduled times. Items are returned as strings. If @blocking is false, will stop once there are no more items that can be processed immediately; if it’s true, will wait until items become available (and never terminate).

Descriptor is an optional string that will be associated with this item while in the processing set. This is useful for providing whatever information you need to determine whether the item needs to be recovered when iterating through the processing set.

# File lib/redis-scheduler.rb, line 99
def each descriptor=nil
  while(x = get(descriptor))
    item, processing_descriptor, at = x
    begin
      yield item, at
    rescue Exception # back in the hole!
      schedule! item, at
      raise
    ensure
      cleanup! processing_descriptor
    end
  end
end

items()

Returns an Enumerable of [item, scheduled time] pairs, which can be used to iterate over all the items in the queue, from earliest- to latest-scheduled, regardless of whether their scheduled time has arrived yet.

Note that this view is not synchronized with write operations, and thus may be inconsistent (e.g. return duplicates, miss items, etc) if changes to the schedule happen while iterating.

For these reasons, this is mainly useful for debugging purposes.

# File lib/redis-scheduler.rb, line 122
def items; ItemEnumerator.new(self) end

parse_entry(entry)

The inverse of make_entry below. Public because it must also be called by ItemEnumerator.

# File lib/redis-scheduler.rb, line 136
def parse_entry entry
  item = if @uniq
    entry
  else
    delim = entry.index ":"
    entry[(delim + 1) .. -1]
  end
  begin
    Marshal.load item
  rescue TypeError
    ## fall back to treating the item as a string--it's possible the schedule contains
    ## items inserted from a pre-0.9 release, in which case they'll be stored
    ## as pure strings.
    item
  end
end
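The round trip between make_entry and parse_entry can be checked in isolation. The functions below are standalone copies for illustration (the toy_ names are hypothetical, and the id is passed in rather than read from a Redis counter). Because the id prefix consists only of digits, the first ":" is always the delimiter, even when the marshalled payload itself contains colons.

```ruby
# Standalone versions of the two encodings, for illustration only.
def toy_make_entry(item, uniq, id = nil)
  dumped = Marshal.dump item
  uniq ? dumped : "#{id}:#{dumped}"
end

def toy_parse_entry(entry, uniq)
  payload = uniq ? entry : entry[(entry.index(":") + 1)..-1]
  begin
    Marshal.load payload
  rescue TypeError
    payload # legacy plain-string entry
  end
end

item = { "user" => 42, "note" => "a:b" }
round_trip_uniq  = toy_parse_entry(toy_make_entry(item, true), true)
round_trip_plain = toy_parse_entry(toy_make_entry(item, false, 7), false)
legacy           = toy_parse_entry("7:hello", false)
```

Both round trips return the original Hash, and the unmarshallable "hello" payload falls back to a plain string.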

processing_set_items()

Returns an Array of [item, timestamp, descriptor] tuples representing the set of in-process items. The timestamp corresponds to the time at which the item was removed from the schedule for processing.

# File lib/redis-scheduler.rb, line 127
def processing_set_items
  @redis.smembers(@processing_set).map do |x|
    item, timestamp, descriptor = Marshal.load(x)
    [item, Time.at(timestamp), descriptor]
  end
end

processing_set_size()

Returns the total number of items currently being processed.

# File lib/redis-scheduler.rb, line 87
def processing_set_size; @redis.scard @processing_set end

reset!()

Drop all data and reset the schedule entirely.

# File lib/redis-scheduler.rb, line 79
def reset!
  [@queue, @processing_set, @counter].each { |k| @redis.del k }
end

schedule!(item, time)

Schedule an item at a specific time. Item is any Ruby object that can be marshalled.

# File lib/redis-scheduler.rb, line 74
def schedule! item, time
  @redis.zadd @queue, time.to_f, make_entry(item)
end

size()

Return the total number of items in the schedule.

# File lib/redis-scheduler.rb, line 84
def size; @redis.zcard @queue end

Private Instance Methods

blocking_get(descriptor)

# File lib/redis-scheduler.rb, line 168
def blocking_get descriptor
  sleep POLL_DELAY until(x = nonblocking_get(descriptor))
  x
end

cleanup!(item)

# File lib/redis-scheduler.rb, line 194
def cleanup! item
  @redis.srem @processing_set, item
end

get(descriptor)

# File lib/redis-scheduler.rb, line 166
def get descriptor; @blocking ? blocking_get(descriptor) : nonblocking_get(descriptor) end

make_entry(item)

Generates the value actually stored in Redis.

# File lib/redis-scheduler.rb, line 156
def make_entry item
  item = Marshal.dump item
  if @uniq
    item
  else
    id = @redis.incr @counter
    "#{id}:#{item}"
  end
end

nonblocking_get(descriptor)

# File lib/redis-scheduler.rb, line 178
def nonblocking_get descriptor
  loop do
    @redis.watch @queue
    entries = @redis.zrangebyscore @queue, 0, Time.now.to_f, :withscores => true, :limit => [0, 1]
    break unless entries.size > 0
    entry, at = entries.first
    item = parse_entry entry
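    ## use a separate variable so that a retry does not re-marshal an
    ## already-marshalled descriptor
    processing_descriptor = Marshal.dump [item, Time.now.to_i, descriptor]
    @redis.multi do # try and grab it
      @redis.zrem @queue, entry
      @redis.sadd @processing_set, processing_descriptor
    end and break [item, processing_descriptor, Time.at(at.to_f)]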
    sleep CAS_DELAY # transaction failed. retry!
  end
end
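The loop in nonblocking_get is an optimistic check-and-set: read, attempt a conditional write, and retry if another client changed the data in between. A toy model of that retry pattern, without Redis, might look like this. ToyStore, touch!, remove_if_unchanged and grab are all hypothetical names, and the version counter is only a stand-in for what WATCH tracks.

```ruby
# Toy stand-in for a store with optimistic locking (hypothetical; no Redis).
class ToyStore
  attr_reader :queue, :version

  def initialize(queue)
    @queue = queue
    @version = 0
  end

  # Simulates a write by some other client.
  def touch!
    @version += 1
  end

  # Removes entry only if nothing changed since seen_version was read.
  def remove_if_unchanged(seen_version, entry)
    return false unless seen_version == @version
    @queue.delete entry
    @version += 1
    true
  end
end

def grab(store, interfere_on_first_try)
  attempts = 0
  loop do
    attempts += 1
    seen = store.version          # analogous to WATCH
    entry = store.queue.first
    break nil unless entry        # nothing to grab
    store.touch! if interfere_on_first_try && attempts == 1
    break [entry, attempts] if store.remove_if_unchanged(seen, entry)
    # the "transaction" failed; loop and try again
  end
end

store = ToyStore.new(["a", "b"])
result = grab(store, true)
```

Here the first attempt fails because of the simulated concurrent write and the second succeeds. The real method additionally sleeps for CAS_DELAY between attempts, which the toy omits.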