module ResqueAdmin::Scheduler::DelayingExtensions

Public Instance Methods

count_all_scheduled_jobs() click to toggle source
# File lib/resque/scheduler/delaying_extensions.rb, line 225
def count_all_scheduled_jobs
  total_jobs = 0
  Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |ts|
    total_jobs += redis.llen("delayed:#{ts}").to_i
  end
  total_jobs
end
delayed?(klass, *args) click to toggle source

Discover if a job has been delayed. Examples

ResqueAdmin.delayed?(MyJob)
ResqueAdmin.delayed?(MyJob, id: 1)

Returns true if the job has been delayed

# File lib/resque/scheduler/delaying_extensions.rb, line 238
def delayed?(klass, *args)
  !scheduled_at(klass, *args).empty?
end
delayed_push(timestamp, item) click to toggle source

Used internally to stuff the item into the schedule sorted list. timestamp can be either in seconds or a datetime object Insertion if O(log(n)). Returns true if it's the first job to be scheduled at that time, else false

# File lib/resque/scheduler/delaying_extensions.rb, line 64
def delayed_push(timestamp, item)
  # First add this item to the list for this timestamp
  redis.rpush("delayed:#{timestamp.to_i}", encode(item))

  # Store the timestamps at with this item occurs
  redis.sadd("timestamps:#{encode(item)}", "delayed:#{timestamp.to_i}")

  # Now, add this timestamp to the zsets.  The score and the value are
  # the same since we'll be querying by timestamp, and we don't have
  # anything else to store.
  redis.zadd :delayed_queue_schedule, timestamp.to_i, timestamp.to_i
end
delayed_queue_peek(start, count) click to toggle source

Returns an array of timestamps based on start and count

# File lib/resque/scheduler/delaying_extensions.rb, line 78
def delayed_queue_peek(start, count)
  result = redis.zrange(:delayed_queue_schedule, start,
                        start + count - 1)
  Array(result).map(&:to_i)
end
delayed_queue_schedule_size() click to toggle source

Returns the size of the delayed queue schedule

# File lib/resque/scheduler/delaying_extensions.rb, line 85
def delayed_queue_schedule_size
  redis.zcard :delayed_queue_schedule
end
delayed_timestamp_peek(timestamp, start, count) click to toggle source

Returns an array of delayed items for the given timestamp

# File lib/resque/scheduler/delaying_extensions.rb, line 96
def delayed_timestamp_peek(timestamp, start, count)
  if 1 == count
    r = list_range "delayed:#{timestamp.to_i}", start, count
    r.nil? ? [] : [r]
  else
    list_range "delayed:#{timestamp.to_i}", start, count
  end
end
delayed_timestamp_size(timestamp) click to toggle source

Returns the number of jobs for a given timestamp in the delayed queue schedule

# File lib/resque/scheduler/delaying_extensions.rb, line 91
def delayed_timestamp_size(timestamp)
  redis.llen("delayed:#{timestamp.to_i}").to_i
end
enqueue_at(timestamp, klass, *args) click to toggle source

This method is nearly identical to enqueue only it also takes a timestamp which will be used to schedule the job for queueing. Until timestamp is in the past, the job will sit in the schedule list.

# File lib/resque/scheduler/delaying_extensions.rb, line 13
def enqueue_at(timestamp, klass, *args)
  validate(klass)
  enqueue_at_with_queue(
    queue_from_class(klass), timestamp, klass, *args
  )
end
enqueue_at_with_queue(queue, timestamp, klass, *args) click to toggle source

Identical to enqueue_at, except you can also specify a queue in which the job will be placed after the timestamp has passed. It respects ResqueAdmin.inline option, by creating the job right away instead of adding to the queue.

# File lib/resque/scheduler/delaying_extensions.rb, line 24
def enqueue_at_with_queue(queue, timestamp, klass, *args)
  return false unless plugin.run_before_schedule_hooks(klass, *args)

  if ResqueAdmin.inline? || timestamp.to_i < Time.now.to_i
    # Just create the job and let resque_admin perform it right away with
    # inline.  If the class is a custom job class, call self#scheduled
    # on it. This allows you to do things like
    # ResqueAdmin.enqueue_at(timestamp, CustomJobClass, :opt1 => val1).
    # Otherwise, pass off to ResqueAdmin.
    if klass.respond_to?(:scheduled)
      klass.scheduled(queue, klass.to_s, *args)
    else
      ResqueAdmin::Job.create(queue, klass, *args)
    end
  else
    delayed_push(timestamp, job_to_hash_with_queue(queue, klass, args))
  end

  plugin.run_after_schedule_hooks(klass, *args)
end
enqueue_delayed(klass, *args) click to toggle source

Given an encoded item, enqueue it now

# File lib/resque/scheduler/delaying_extensions.rb, line 147
def enqueue_delayed(klass, *args)
  hash = job_to_hash(klass, args)
  remove_delayed(klass, *args).times do
    ResqueAdmin::Scheduler.enqueue_from_config(hash)
  end
end
enqueue_delayed_selection(klass = nil) { |args| ... } click to toggle source

Given a block, enqueue jobs now that return true from a block

This allows for enqueuing of delayed jobs that have arguments matching certain criteria

# File lib/resque/scheduler/delaying_extensions.rb, line 171
def enqueue_delayed_selection(klass = nil)
  raise ArgumentError, 'Please supply a block' unless block_given?

  found_jobs = find_delayed_selection(klass) { |args| yield(args) }
  found_jobs.reduce(0) do |sum, encoded_job|
    decoded_job = decode(encoded_job)
    klass = Util.constantize(decoded_job['class'])
    sum + enqueue_delayed(klass, *decoded_job['args'])
  end
end
enqueue_in(number_of_seconds_from_now, klass, *args) click to toggle source

Identical to enqueue_at but takes number_of_seconds_from_now instead of a timestamp.

# File lib/resque/scheduler/delaying_extensions.rb, line 47
def enqueue_in(number_of_seconds_from_now, klass, *args)
  enqueue_at(Time.now + number_of_seconds_from_now, klass, *args)
end
enqueue_in_with_queue(queue, number_of_seconds_from_now, klass, *args) click to toggle source

Identical to enqueue_in, except you can also specify a queue in which the job will be placed after the number of seconds has passed.

# File lib/resque/scheduler/delaying_extensions.rb, line 54
def enqueue_in_with_queue(queue, number_of_seconds_from_now,
                          klass, *args)
  enqueue_at_with_queue(queue, Time.now + number_of_seconds_from_now,
                        klass, *args)
end
find_delayed_selection(klass = nil, &block) click to toggle source

Given a block, find jobs that return true from a block

This allows for finding of delayed jobs that have arguments matching certain criteria

# File lib/resque/scheduler/delaying_extensions.rb, line 186
def find_delayed_selection(klass = nil, &block)
  raise ArgumentError, 'Please supply a block' unless block_given?

  timestamps = redis.zrange(:delayed_queue_schedule, 0, -1)

  # Beyond 100 there's almost no improvement in speed
  found = timestamps.each_slice(100).map do |ts_group|
    jobs = redis.pipelined do |r|
      ts_group.each do |ts|
        r.lrange("delayed:#{ts}", 0, -1)
      end
    end

    jobs.flatten.select do |payload|
      payload_matches_selection?(decode(payload), klass, &block)
    end
  end

  found.flatten
end
get_last_enqueued_at(job_name) click to toggle source
# File lib/resque/scheduler/delaying_extensions.rb, line 254
def get_last_enqueued_at(job_name)
  redis.hget('delayed:last_enqueued_at', job_name)
end
last_enqueued_at(job_name, date) click to toggle source
# File lib/resque/scheduler/delaying_extensions.rb, line 250
def last_enqueued_at(job_name, date)
  redis.hset('delayed:last_enqueued_at', job_name, date)
end
next_delayed_timestamp(at_time = nil) click to toggle source

Returns the next delayed queue timestamp (don't call directly)

# File lib/resque/scheduler/delaying_extensions.rb, line 107
def next_delayed_timestamp(at_time = nil)
  search_first_delayed_timestamp_in_range(nil, at_time || Time.now)
end
next_item_for_timestamp(timestamp) click to toggle source

Returns the next item to be processed for a given timestamp, nil if done. (don't call directly) timestamp can either be in seconds or a datetime

# File lib/resque/scheduler/delaying_extensions.rb, line 114
def next_item_for_timestamp(timestamp)
  key = "delayed:#{timestamp.to_i}"

  encoded_item = redis.lpop(key)
  redis.srem("timestamps:#{encoded_item}", key)
  item = decode(encoded_item)

  # If the list is empty, remove it.
  clean_up_timestamp(key, timestamp)
  item
end
remove_delayed(klass, *args) click to toggle source

Given an encoded item, remove it from the delayed_queue

# File lib/resque/scheduler/delaying_extensions.rb, line 141
def remove_delayed(klass, *args)
  search = encode(job_to_hash(klass, args))
  remove_delayed_job(search)
end
remove_delayed_job_from_timestamp(timestamp, klass, *args) click to toggle source

Given a timestamp and job (klass + args) it removes all instances and returns the count of jobs removed.

O(N) where N is the number of jobs scheduled to fire at the given timestamp

# File lib/resque/scheduler/delaying_extensions.rb, line 212
def remove_delayed_job_from_timestamp(timestamp, klass, *args)
  return 0 if ResqueAdmin.inline?

  key = "delayed:#{timestamp.to_i}"
  encoded_job = encode(job_to_hash(klass, args))

  redis.srem("timestamps:#{encoded_job}", key)
  count = redis.lrem(key, 0, encoded_job)
  clean_up_timestamp(key, timestamp)

  count
end
remove_delayed_selection(klass = nil) { |args| ... } click to toggle source

Given a block, remove jobs that return true from a block

This allows for removal of delayed jobs that have arguments matching certain criteria

# File lib/resque/scheduler/delaying_extensions.rb, line 158
def remove_delayed_selection(klass = nil)
  raise ArgumentError, 'Please supply a block' unless block_given?

  found_jobs = find_delayed_selection(klass) { |args| yield(args) }
  found_jobs.reduce(0) do |sum, encoded_job|
    sum + remove_delayed_job(encoded_job)
  end
end
reset_delayed_queue() click to toggle source

Clears all jobs created with enqueue_at or enqueue_in

# File lib/resque/scheduler/delaying_extensions.rb, line 127
def reset_delayed_queue
  Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |item|
    key = "delayed:#{item}"
    items = redis.lrange(key, 0, -1)
    redis.pipelined do
      items.each { |ts_item| redis.del("timestamps:#{ts_item}") }
    end
    redis.del key
  end

  redis.del :delayed_queue_schedule
end
scheduled_at(klass, *args) click to toggle source

Returns delayed jobs schedule timestamp for klass, args.

# File lib/resque/scheduler/delaying_extensions.rb, line 243
def scheduled_at(klass, *args)
  search = encode(job_to_hash(klass, args))
  redis.smembers("timestamps:#{search}").map do |key|
    key.tr('delayed:', '').to_i
  end
end

Private Instance Methods

clean_up_timestamp(key, timestamp) click to toggle source
# File lib/resque/scheduler/delaying_extensions.rb, line 284
def clean_up_timestamp(key, timestamp)
  # Use a watch here to ensure nobody adds jobs to this delayed
  # queue while we're removing it.
  redis.watch(key) do
    if redis.llen(key).to_i == 0
      # If the list is empty, remove it.
      redis.multi do
        redis.del(key)
        redis.zrem(:delayed_queue_schedule, timestamp.to_i)
      end
    else
      redis.redis.unwatch
    end
  end
end
job_to_hash(klass, args) click to toggle source
# File lib/resque/scheduler/delaying_extensions.rb, line 260
def job_to_hash(klass, args)
  { class: klass.to_s, args: args, queue: queue_from_class(klass) }
end
job_to_hash_with_queue(queue, klass, args) click to toggle source
# File lib/resque/scheduler/delaying_extensions.rb, line 264
def job_to_hash_with_queue(queue, klass, args)
  { class: klass.to_s, args: args, queue: queue }
end
payload_matches_selection?(decoded_payload, klass) { |decoded_payload| ... } click to toggle source
# File lib/resque/scheduler/delaying_extensions.rb, line 312
def payload_matches_selection?(decoded_payload, klass)
  return false if decoded_payload.nil?
  job_class = decoded_payload['class']
  relevant_class = (klass.nil? || klass.to_s == job_class)
  relevant_class && yield(decoded_payload['args'])
end
plugin() click to toggle source
# File lib/resque/scheduler/delaying_extensions.rb, line 319
def plugin
  ResqueAdmin::Scheduler::Plugin
end
remove_delayed_job(encoded_job) click to toggle source
# File lib/resque/scheduler/delaying_extensions.rb, line 268
def remove_delayed_job(encoded_job)
  return 0 if ResqueAdmin.inline?

  timestamps = redis.smembers("timestamps:#{encoded_job}")

  replies = redis.pipelined do
    timestamps.each do |key|
      redis.lrem(key, 0, encoded_job)
      redis.srem("timestamps:#{encoded_job}", key)
    end
  end

  return 0 if replies.nil? || replies.empty?
  replies.each_slice(2).map(&:first).inject(:+)
end
search_first_delayed_timestamp_in_range(start_at, stop_at) click to toggle source
# File lib/resque/scheduler/delaying_extensions.rb, line 300
def search_first_delayed_timestamp_in_range(start_at, stop_at)
  start_at = start_at.nil? ? '-inf' : start_at.to_i
  stop_at = stop_at.nil? ? '+inf' : stop_at.to_i

  items = redis.zrangebyscore(
    :delayed_queue_schedule, start_at, stop_at,
    limit: [0, 1]
  )
  timestamp = items.nil? ? nil : Array(items).first
  timestamp.to_i unless timestamp.nil?
end