class Roundhouse::Scheduled::Enq

Public Instance Methods

enqueue_jobs(now=Time.now.to_f.to_s, sorted_sets=SETS) click to toggle source
# File lib/roundhouse/scheduled.rb, line 11
# Moves every scheduled job that has come due onto its work queue.
#
# Each sorted set holds serialized jobs scored by the time at which they
# should run, so "due" means "score <= now".
#
# now         - upper score bound handed to ZRANGEBYSCORE (defaults to the
#               current time, as a float rendered to a String).
# sorted_sets - names of the sorted sets to drain (defaults to SETS).
#
# Returns whatever the Roundhouse.redis block call returns.
def enqueue_jobs(now=Time.now.to_f.to_s, sorted_sets=SETS)
  Roundhouse.redis do |conn|
    sorted_sets.each do |sorted_set|
      # Fetches at most one due entry per call. Jobs are deliberately moved
      # one at a time, to shrink the window in which a failure between
      # removing a job from the schedule and pushing it onto a work queue
      # could lose it.
      next_due = lambda do
        conn.zrangebyscore(sorted_set, '-inf', now, :limit => [0, 1]).first
      end

      while (job = next_due.call)
        # A failed removal means another process already claimed this job,
        # so leave it alone and look for the next due entry.
        next unless conn.zrem(sorted_set, job)

        Roundhouse::Client.push(Roundhouse.load_json(job))
        Roundhouse::Logging.logger.debug { "enqueued #{sorted_set}: #{job}" }
      end
    end
  end
end