class RedisScheduler
A simple, production-ready chronological scheduler for Redis.
Use schedule!
to add an item to be processed at an arbitrary point in time. The item will be converted to a string and returned to you as such.
Use each
to iterate over those items at the scheduled time. This call iterates over all items that are scheduled on or before the current time, in chronological order. In blocking mode, this call will wait forever until such items become available, and will never terminate. In non-blocking mode, this call will only iterate over ready items and will terminate when there are no items ready for processing.
Use items
to simply iterate over all items in the queue, for debugging purposes.
Exceptions during processing¶ ↑
Any exceptions during each
will result in the item being re-added to the schedule at the original time.
Multiple producers and consumers¶ ↑
Multiple producers and consumers are fine.
Concurrent reads and writes¶ ↑
Concurrent reads and writes are fine.
Segfaults¶ ↑
The scheduler maintains a “processing set” of items currently being processed. If a process dies (i.e. not as a result of a Ruby exception, but as the result of a segfault), the item will remain in this set but will not longer appear in the schedule. To avoid losing scheduled work due to segfaults, you must periodically iterate through this set and recover any items that have been abandoned, using processing_set_items
. Setting a proper ‘descriptor’ argument in each
is suggested.
Constants
- CAS_DELAY
- POLL_DELAY
Attributes
Public Class Methods
Options:
-
namespace
: prefix for Redis keys, e.g. “scheduler/”. -
blocking
: whethereach
should block or return immediately if there are items to be processed immediately. -
uniq
: when false (default), the same item can be scheduled for multiple times. When true, scheduling the same item multiple times only updates its scheduled time, and does not represent the item multiple times in the schedule.
Note that uniq is set on a per-schedule basis and cannot be changed. Once a uniq schedule is created, it is forever uniq (until reset!
is called, at least). Attempts to use non-uniq queues in a uniq manner, or vice versa, will result in undefined behavior (probably errors).
Note also that nonblocking mode may still actually block momentarily as part of the check-and-set semantics, i.e. block during contention from multiple clients. “Nonblocking” refers to whether the scheduler should wait until events in the schedule are ready, or only return those items that are ready currently.
# File lib/redis-scheduler.rb, line 59 def initialize redis, opts={} @redis = redis @namespace = opts[:namespace] @blocking = opts[:blocking] @uniq = opts[:uniq] @queue = [@namespace, "q"].join @processing_set = [@namespace, "processing"].join @counter = [@namespace, "counter"].join end
Public Instance Methods
Yields items along with their scheduled times. Only returns items on or after their scheduled times. Items are returned as strings. If @blocking is false, will stop once there are no more items that can be processed immediately; if it’s true, will wait until items become available (and never terminate).
Descriptor
is an optional string that will be associated with this item while in the processing set. This is useful for providing whatever information you need to determine whether the item needs to be recovered when iterating through the processing set.
# File lib/redis-scheduler.rb, line 99 def each descriptor=nil while(x = get(descriptor)) item, processing_descriptor, at = x begin yield item, at rescue Exception # back in the hole! schedule! item, at raise ensure cleanup! processing_descriptor end end end
Returns an Enumerable of [item, scheduled time] pairs, which can be used to iterate over all the items in the queue, in order of earliest- to latest-scheduled, regardless of the schedule time.
Note that this view is not synchronized with write operations, and thus may be inconsistent (e.g. return duplicates, miss items, etc) if changes to the schedule happen while iterating.
For these reasons, this is mainly useful for debugging purposes.
# File lib/redis-scheduler.rb, line 122 def items; ItemEnumerator.new(self) end
the inverse of make_entry
below. public because it must also be called by ItemEnumerator
.
# File lib/redis-scheduler.rb, line 136 def parse_entry entry item = if @uniq entry else delim = entry.index ":" entry[(delim + 1) .. -1] end begin Marshal.load item rescue TypeError ## fall back to treating the item as a string--it's possible the schedule contains ## items inserted from a pre-0.9 release, in which case they'll be stored ## as pure strings. item end end
Returns an Array of [item, timestamp, descriptor] tuples representing the set of in-process items. The timestamp corresponds to the time at which the item was removed from the schedule for processing.
# File lib/redis-scheduler.rb, line 127 def processing_set_items @redis.smembers(@processing_set).map do |x| item, timestamp, descriptor = Marshal.load(x) [item, Time.at(timestamp), descriptor] end end
Returns the total number of items currently being processed.
# File lib/redis-scheduler.rb, line 87 def processing_set_size; @redis.scard @processing_set end
Drop all data and reset the schedule entirely.
# File lib/redis-scheduler.rb, line 79 def reset! [@queue, @processing_set, @counter].each { |k| @redis.del k } end
Schedule an item at a specific time. Item is any Ruby object that can be marshalled.
# File lib/redis-scheduler.rb, line 74 def schedule! item, time @redis.zadd @queue, time.to_f, make_entry(item) end
Return the total number of items in the schedule.
# File lib/redis-scheduler.rb, line 84 def size; @redis.zcard @queue end
Private Instance Methods
# File lib/redis-scheduler.rb, line 168 def blocking_get descriptor sleep POLL_DELAY until(x = nonblocking_get(descriptor)) x end
# File lib/redis-scheduler.rb, line 194 def cleanup! item @redis.srem @processing_set, item end
# File lib/redis-scheduler.rb, line 166 def get descriptor; @blocking ? blocking_get(descriptor) : nonblocking_get(descriptor) end
generate the value actually stored in redis
# File lib/redis-scheduler.rb, line 156 def make_entry item item = Marshal.dump item if @uniq item else id = @redis.incr @counter "#{id}:#{item}" end end
# File lib/redis-scheduler.rb, line 178 def nonblocking_get descriptor loop do @redis.watch @queue entries = @redis.zrangebyscore @queue, 0, Time.now.to_f, :withscores => true, :limit => [0, 1] break unless entries.size > 0 entry, at = entries.first item = parse_entry entry descriptor = Marshal.dump [item, Time.now.to_i, descriptor] @redis.multi do # try and grab it @redis.zrem @queue, entry @redis.sadd @processing_set, descriptor end and break [item, descriptor, Time.at(at.to_f)] sleep CAS_DELAY # transaction failed. retry! end end