class PushmiPullyu::PreservationQueue
1) Create a sorted set in Redis (redis.io/topics/data-types). Call it preservation_queue
2) In GenericFile add an after_save that:
- determines a monotonically increasing "score". Obvious scores would be either the time in seconds/milliseconds or using something like redis INCR to create an atomic, increasing counter. It doesn't matter if 2 different noids ever have the same score, it only that scores generally increase over time. - zadd preservation_queue score "noid" adds the noid and gives it the score from above.
3) Pushmi-pullyu pops elements out of the sorted set, lowest score to highest.
A sorted set will only ever contain a noid once, with whatever score it was last given. Because preservation_queue is sorted lowest score to highest, and because scores increase over time, a cascade of jobs/updates will cause a noid to keep “moving back” in the queue until it becomes the least recently updated noid in the queue, at which point it will be popped and preserved. Any further updates will trigger a new AIP build.
Public Class Methods
new(redis_url: 'redis://localhost:6379', pool_opts: { size: 1, timeout: 5 }, poll_interval: 10, age_at_least: 0, queue_name: 'dev:pmpy_queue')
click to toggle source
# File lib/pushmi_pullyu/preservation_queue.rb, line 24 def initialize(redis_url: 'redis://localhost:6379', pool_opts: { size: 1, timeout: 5 }, poll_interval: 10, age_at_least: 0, queue_name: 'dev:pmpy_queue') # we use a connection pool even though we're not (currently) threading # as it transparently provides for repairing connections if they are closed after long periods of inactivity @redis = ConnectionPool.new(pool_opts) do Redis.new(url: redis_url) end raise ConnectionError unless connected? @poll_interval = poll_interval @age_at_least = age_at_least @queue_name = queue_name end
Public Instance Methods
next_item()
click to toggle source
# File lib/pushmi_pullyu/preservation_queue.rb, line 42 def next_item raise ConnectionError unless connected? @redis.with do |conn| conn.watch(@queue_name) do |rd| # transactional mutation of the set is dependent on the set key element, score = rd.zrange(@queue_name, 0, 0, with_scores: true).first if element && ((Time.now.to_f - @age_at_least) >= score) rd.multi do |tx| tx.zrem(@queue_name, element) # remove the top element transactionally end return element else rd.unwatch # cancel the transaction since there was nothing in the queue return nil end end end end
wait_next_item()
click to toggle source
# File lib/pushmi_pullyu/preservation_queue.rb, line 62 def wait_next_item while PushmiPullyu.continue_polling? element = next_item return element if element.present? sleep @poll_interval end end
Protected Instance Methods
connected?()
click to toggle source
# File lib/pushmi_pullyu/preservation_queue.rb, line 73 def connected? @redis.with do |conn| conn.ping == 'PONG' end end