module ResqueAdmin::Scheduler::DelayingExtensions
Public Instance Methods
# File lib/resque_admin/scheduler/delaying_extensions.rb, line 225 def count_all_scheduled_jobs total_jobs = 0 Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |ts| total_jobs += redis.llen("delayed:#{ts}").to_i end total_jobs end
Discover if a job has been delayed. Examples
ResqueAdmin.delayed?(MyJob) ResqueAdmin.delayed?(MyJob, id: 1)
Returns true if the job has been delayed
# File lib/resque_admin/scheduler/delaying_extensions.rb, line 238 def delayed?(klass, *args) !scheduled_at(klass, *args).empty? end
Used internally to stuff the item into the schedule sorted list. timestamp
can be either in seconds or a datetime object Insertion if O(log(n)). Returns true if it's the first job to be scheduled at that time, else false
# File lib/resque_admin/scheduler/delaying_extensions.rb, line 64 def delayed_push(timestamp, item) # First add this item to the list for this timestamp redis.rpush("delayed:#{timestamp.to_i}", encode(item)) # Store the timestamps at with this item occurs redis.sadd("timestamps:#{encode(item)}", "delayed:#{timestamp.to_i}") # Now, add this timestamp to the zsets. The score and the value are # the same since we'll be querying by timestamp, and we don't have # anything else to store. redis.zadd :delayed_queue_schedule, timestamp.to_i, timestamp.to_i end
Returns an array of timestamps based on start and count
# File lib/resque_admin/scheduler/delaying_extensions.rb, line 78 def delayed_queue_peek(start, count) result = redis.zrange(:delayed_queue_schedule, start, start + count - 1) Array(result).map(&:to_i) end
Returns the size of the delayed queue schedule
# File lib/resque_admin/scheduler/delaying_extensions.rb, line 85 def delayed_queue_schedule_size redis.zcard :delayed_queue_schedule end
Returns an array of delayed items for the given timestamp
# File lib/resque_admin/scheduler/delaying_extensions.rb, line 96 def delayed_timestamp_peek(timestamp, start, count) if 1 == count r = list_range "delayed:#{timestamp.to_i}", start, count r.nil? ? [] : [r] else list_range "delayed:#{timestamp.to_i}", start, count end end
Returns the number of jobs for a given timestamp in the delayed queue schedule
# File lib/resque_admin/scheduler/delaying_extensions.rb, line 91 def delayed_timestamp_size(timestamp) redis.llen("delayed:#{timestamp.to_i}").to_i end
This method is nearly identical to enqueue
only it also takes a timestamp which will be used to schedule the job for queueing. Until timestamp is in the past, the job will sit in the schedule list.
# File lib/resque_admin/scheduler/delaying_extensions.rb, line 13 def enqueue_at(timestamp, klass, *args) validate(klass) enqueue_at_with_queue( queue_from_class(klass), timestamp, klass, *args ) end
Identical to enqueue_at
, except you can also specify a queue in which the job will be placed after the timestamp has passed. It respects ResqueAdmin.inline option, by creating the job right away instead of adding to the queue.
# File lib/resque_admin/scheduler/delaying_extensions.rb, line 24 def enqueue_at_with_queue(queue, timestamp, klass, *args) return false unless plugin.run_before_schedule_hooks(klass, *args) if ResqueAdmin.inline? || timestamp.to_i < Time.now.to_i # Just create the job and let resque_admin perform it right away with # inline. If the class is a custom job class, call self#scheduled # on it. This allows you to do things like # ResqueAdmin.enqueue_at(timestamp, CustomJobClass, :opt1 => val1). # Otherwise, pass off to ResqueAdmin. if klass.respond_to?(:scheduled) klass.scheduled(queue, klass.to_s, *args) else ResqueAdmin::Job.create(queue, klass, *args) end else delayed_push(timestamp, job_to_hash_with_queue(queue, klass, args)) end plugin.run_after_schedule_hooks(klass, *args) end
Given an encoded item, enqueue it now
# File lib/resque_admin/scheduler/delaying_extensions.rb, line 147 def enqueue_delayed(klass, *args) hash = job_to_hash(klass, args) remove_delayed(klass, *args).times do ResqueAdmin::Scheduler.enqueue_from_config(hash) end end
Given a block, enqueue jobs now that return true from a block
This allows for enqueuing of delayed jobs that have arguments matching certain criteria
# File lib/resque_admin/scheduler/delaying_extensions.rb, line 171 def enqueue_delayed_selection(klass = nil) raise ArgumentError, 'Please supply a block' unless block_given? found_jobs = find_delayed_selection(klass) { |args| yield(args) } found_jobs.reduce(0) do |sum, encoded_job| decoded_job = decode(encoded_job) klass = Util.constantize(decoded_job['class']) sum + enqueue_delayed(klass, *decoded_job['args']) end end
Identical to enqueue_at
but takes number_of_seconds_from_now instead of a timestamp.
# File lib/resque_admin/scheduler/delaying_extensions.rb, line 47 def enqueue_in(number_of_seconds_from_now, klass, *args) enqueue_at(Time.now + number_of_seconds_from_now, klass, *args) end
Identical to enqueue_in
, except you can also specify a queue in which the job will be placed after the number of seconds has passed.
# File lib/resque_admin/scheduler/delaying_extensions.rb, line 54 def enqueue_in_with_queue(queue, number_of_seconds_from_now, klass, *args) enqueue_at_with_queue(queue, Time.now + number_of_seconds_from_now, klass, *args) end
Given a block, find jobs that return true from a block
This allows for finding of delayed jobs that have arguments matching certain criteria
# File lib/resque_admin/scheduler/delaying_extensions.rb, line 186 def find_delayed_selection(klass = nil, &block) raise ArgumentError, 'Please supply a block' unless block_given? timestamps = redis.zrange(:delayed_queue_schedule, 0, -1) # Beyond 100 there's almost no improvement in speed found = timestamps.each_slice(100).map do |ts_group| jobs = redis.pipelined do |r| ts_group.each do |ts| r.lrange("delayed:#{ts}", 0, -1) end end jobs.flatten.select do |payload| payload_matches_selection?(decode(payload), klass, &block) end end found.flatten end
# File lib/resque_admin/scheduler/delaying_extensions.rb, line 254 def get_last_enqueued_at(job_name) redis.hget('delayed:last_enqueued_at', job_name) end
# File lib/resque_admin/scheduler/delaying_extensions.rb, line 250 def last_enqueued_at(job_name, date) redis.hset('delayed:last_enqueued_at', job_name, date) end
Returns the next delayed queue timestamp (don't call directly)
# File lib/resque_admin/scheduler/delaying_extensions.rb, line 107 def next_delayed_timestamp(at_time = nil) search_first_delayed_timestamp_in_range(nil, at_time || Time.now) end
Returns the next item to be processed for a given timestamp, nil if done. (don't call directly) timestamp
can either be in seconds or a datetime
# File lib/resque_admin/scheduler/delaying_extensions.rb, line 114 def next_item_for_timestamp(timestamp) key = "delayed:#{timestamp.to_i}" encoded_item = redis.lpop(key) redis.srem("timestamps:#{encoded_item}", key) item = decode(encoded_item) # If the list is empty, remove it. clean_up_timestamp(key, timestamp) item end
Given an encoded item, remove it from the delayed_queue
# File lib/resque_admin/scheduler/delaying_extensions.rb, line 141 def remove_delayed(klass, *args) search = encode(job_to_hash(klass, args)) remove_delayed_job(search) end
Given a timestamp and job (klass + args) it removes all instances and returns the count of jobs removed.
O(N) where N is the number of jobs scheduled to fire at the given timestamp
# File lib/resque_admin/scheduler/delaying_extensions.rb, line 212 def remove_delayed_job_from_timestamp(timestamp, klass, *args) return 0 if ResqueAdmin.inline? key = "delayed:#{timestamp.to_i}" encoded_job = encode(job_to_hash(klass, args)) redis.srem("timestamps:#{encoded_job}", key) count = redis.lrem(key, 0, encoded_job) clean_up_timestamp(key, timestamp) count end
Given a block, remove jobs that return true from a block
This allows for removal of delayed jobs that have arguments matching certain criteria
# File lib/resque_admin/scheduler/delaying_extensions.rb, line 158 def remove_delayed_selection(klass = nil) raise ArgumentError, 'Please supply a block' unless block_given? found_jobs = find_delayed_selection(klass) { |args| yield(args) } found_jobs.reduce(0) do |sum, encoded_job| sum + remove_delayed_job(encoded_job) end end
Clears all jobs created with enqueue_at
or enqueue_in
# File lib/resque_admin/scheduler/delaying_extensions.rb, line 127 def reset_delayed_queue Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |item| key = "delayed:#{item}" items = redis.lrange(key, 0, -1) redis.pipelined do items.each { |ts_item| redis.del("timestamps:#{ts_item}") } end redis.del key end redis.del :delayed_queue_schedule end
Returns delayed jobs schedule timestamp for klass
, args
.
# File lib/resque_admin/scheduler/delaying_extensions.rb, line 243 def scheduled_at(klass, *args) search = encode(job_to_hash(klass, args)) redis.smembers("timestamps:#{search}").map do |key| key.tr('delayed:', '').to_i end end
Private Instance Methods
# File lib/resque_admin/scheduler/delaying_extensions.rb, line 284 def clean_up_timestamp(key, timestamp) # Use a watch here to ensure nobody adds jobs to this delayed # queue while we're removing it. redis.watch(key) do if redis.llen(key).to_i == 0 # If the list is empty, remove it. redis.multi do redis.del(key) redis.zrem(:delayed_queue_schedule, timestamp.to_i) end else redis.redis.unwatch end end end
# File lib/resque_admin/scheduler/delaying_extensions.rb, line 260 def job_to_hash(klass, args) { class: klass.to_s, args: args, queue: queue_from_class(klass) } end
# File lib/resque_admin/scheduler/delaying_extensions.rb, line 264 def job_to_hash_with_queue(queue, klass, args) { class: klass.to_s, args: args, queue: queue } end
# File lib/resque_admin/scheduler/delaying_extensions.rb, line 312 def payload_matches_selection?(decoded_payload, klass) return false if decoded_payload.nil? job_class = decoded_payload['class'] relevant_class = (klass.nil? || klass.to_s == job_class) relevant_class && yield(decoded_payload['args']) end
# File lib/resque_admin/scheduler/delaying_extensions.rb, line 319 def plugin ResqueAdmin::Scheduler::Plugin end
# File lib/resque_admin/scheduler/delaying_extensions.rb, line 268 def remove_delayed_job(encoded_job) return 0 if ResqueAdmin.inline? timestamps = redis.smembers("timestamps:#{encoded_job}") replies = redis.pipelined do timestamps.each do |key| redis.lrem(key, 0, encoded_job) redis.srem("timestamps:#{encoded_job}", key) end end return 0 if replies.nil? || replies.empty? replies.each_slice(2).map(&:first).inject(:+) end
# File lib/resque_admin/scheduler/delaying_extensions.rb, line 300 def search_first_delayed_timestamp_in_range(start_at, stop_at) start_at = start_at.nil? ? '-inf' : start_at.to_i stop_at = stop_at.nil? ? '+inf' : stop_at.to_i items = redis.zrangebyscore( :delayed_queue_schedule, start_at, stop_at, limit: [0, 1] ) timestamp = items.nil? ? nil : Array(items).first timestamp.to_i unless timestamp.nil? end