class Procrastinator::Scheduler
A Scheduler
object provides the API for client applications to manage delayed tasks.
Use delay
to schedule new tasks, reschedule
to alter existing tasks, and cancel
to remove unwanted tasks.
@author Robin Miller
Public Class Methods
# File lib/procrastinator/scheduler.rb, line 14 def initialize(config, queue_manager) @config = config @queue_manager = queue_manager end
Public Instance Methods
Removes an existing task, as located by the givne identifying information.
The identifier can include any data field stored in the task loader. Often this is the information in :data.
@param queue [Symbol] the symbol identifier for the queue to add a new task on @param identifier [Hash] Some identifying information to find the appropriate task.
@see TaskMetaData
# File lib/procrastinator/scheduler.rb, line 65 def cancel(queue, identifier) tasks = loader.read(identifier.merge(queue: queue.to_s)) raise "no task matches search: #{ identifier }" if tasks.empty? raise "multiple tasks match search: #{ identifier }" if tasks.size > 1 loader.delete(tasks.first[:id]) end
Records a new task to be executed at the given time.
@param queue [Symbol] the symbol identifier for the queue to add a new task on @param run_at [Time, Integer] Optional time when this task should be executed. Defaults to the current time. @param data [Hash, Array] Optional simple data object to be provided to the task upon execution. @param expire_at [Time, Integer] Optional time when the task should be abandoned
# File lib/procrastinator/scheduler.rb, line 25 def delay(queue = nil, data: nil, run_at: Time.now.to_i, expire_at: nil) verify_queue_arg!(queue) queue = @config.queue.name if @config.single_queue? verify_queue_data!(queue, data) loader.create(queue: queue.to_s, run_at: run_at.to_i, initial_run_at: run_at.to_i, expire_at: expire_at.nil? ? nil : expire_at.to_i, data: YAML.dump(data)) end
Alters an existing task to run at a new time, expire at a new time, or both.
Call to on the result and pass in the new :run_at and/or :expire_at.
Example:
scheduler.reschedule(:alerts, data: {user_id: 5}).to(run_at: Time.now, expire_at: Time.now + 10)
The identifier can include any data field stored in the task loader. Often this is the information in :data.
@param queue [Symbol] the symbol identifier for the queue to add a new task on @param identifier [Hash] Some identifying information to find the appropriate task.
@see TaskMetaData
# File lib/procrastinator/scheduler.rb, line 53 def reschedule(queue, identifier) UpdateProxy.new(@config, identifier: identifier.merge(queue: queue.to_s)) end
Private Instance Methods
Scheduler
must always get the loader indirectly. If it saves the loader to an instance variable, then that could hold a reference to a bad (ie. gone) connection on the previous process
# File lib/procrastinator/scheduler.rb, line 137 def loader @config.loader end
# File lib/procrastinator/scheduler.rb, line 141 def verify_queue_arg!(queue_name) raise ArgumentError, <<~ERR if !queue_name.nil? && !queue_name.is_a?(Symbol) must provide a queue name as the first argument. Received: #{ queue_name } ERR raise ArgumentError, <<~ERR if queue_name.nil? && !@config.single_queue? queue must be specified when more than one is registered. Defined queues are: #{ @config.queues_string } ERR end
# File lib/procrastinator/scheduler.rb, line 151 def verify_queue_data!(queue_name, data) queue = @config.queue(name: queue_name) unless queue queue_list = @config.queues_string raise ArgumentError, "there is no :#{ queue_name } queue registered. Defined queues are: #{ queue_list }" end if data.nil? if queue.task_class.method_defined?(:data=) raise ArgumentError, "task #{ queue.task_class } expects to receive :data. Provide :data to #delay." end elsif !queue.task_class.method_defined?(:data=) raise ArgumentError, <<~ERROR task #{ queue.task_class } does not import :data. Add this in your class definition: task_attr :data ERROR end end