class Procrastinator::Scheduler

A Scheduler object provides the API for client applications to manage delayed tasks.

Use delay to schedule new tasks, reschedule to alter existing tasks, and cancel to remove unwanted tasks.

@author Robin Miller

Public Class Methods

new(config, queue_manager)
# File lib/procrastinator/scheduler.rb, line 14
def initialize(config, queue_manager)
   @config        = config
   @queue_manager = queue_manager
end

Public Instance Methods

cancel(queue, identifier)

Removes an existing task, as located by the given identifying information.

The identifier can include any data field stored in the task loader. Often this is the information in :data.

@param queue [Symbol] the symbol identifier for the queue that holds the task
@param identifier [Hash] identifying information used to find the appropriate task

@see TaskMetaData
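
The matching rules can be illustrated with a minimal sketch. MemoryLoader and cancel_sketch are hypothetical names introduced here, not part of Procrastinator, and the sketch assumes the loader compares fields by simple equality; a real loader may match differently (for example, :data is stored as YAML).

```ruby
# Hypothetical in-memory stand-in for a task loader (not a Procrastinator class).
class MemoryLoader
   def initialize(tasks)
      @tasks = tasks
   end

   # Returns every stored task whose fields equal all of the filter's fields.
   # Assumption: plain equality matching; real loaders may differ.
   def read(filter)
      @tasks.select { |task| filter.all? { |key, value| task[key] == value } }
   end

   def delete(id)
      @tasks.reject! { |task| task[:id] == id }
   end

   def size
      @tasks.size
   end
end

# Follows the same steps as Scheduler#cancel, with the loader passed in explicitly.
def cancel_sketch(loader, queue, identifier)
   tasks = loader.read(identifier.merge(queue: queue.to_s))

   raise "no task matches search: #{ identifier }" if tasks.empty?
   raise "multiple tasks match search: #{ identifier }" if tasks.size > 1

   loader.delete(tasks.first[:id])
end

loader = MemoryLoader.new([
   { id: 1, queue: 'alerts', data: { user_id: 5 } },
   { id: 2, queue: 'alerts', data: { user_id: 6 } }
])

# Exactly one task matches, so it is removed.
cancel_sketch(loader, :alerts, data: { user_id: 5 })
```

An identifier that matches no task, or more than one, raises instead of deleting anything, so a search should be narrow enough to select a single task.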

# File lib/procrastinator/scheduler.rb, line 65
def cancel(queue, identifier)
   tasks = loader.read(identifier.merge(queue: queue.to_s))

   raise "no task matches search: #{ identifier }" if tasks.empty?
   raise "multiple tasks match search: #{ identifier }" if tasks.size > 1

   loader.delete(tasks.first[:id])
end

delay(queue = nil, data: nil, run_at: Time.now.to_i, expire_at: nil)

Records a new task to be executed at the given time.

@param queue [Symbol] the symbol identifier for the queue to add a new task on
@param run_at [Time, Integer] Optional time when this task should be executed. Defaults to the current time.
@param data [Hash, Array] Optional simple data object to be provided to the task upon execution.
@param expire_at [Time, Integer] Optional time when the task should be abandoned.
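
As a rough illustration of how these arguments are normalized before being stored, the sketch below builds the same record that delay passes to the loader. task_record is a hypothetical helper written for this example; it skips the queue validation and does not touch a loader.

```ruby
require 'yaml'

# Hypothetical helper: builds the record that #delay hands to loader.create.
# Times may be given as Time or Integer; both are stored as epoch seconds.
def task_record(queue, data: nil, run_at: Time.now.to_i, expire_at: nil)
   {
      queue:          queue.to_s,
      run_at:         run_at.to_i,
      initial_run_at: run_at.to_i,
      expire_at:      expire_at.nil? ? nil : expire_at.to_i,
      data:           YAML.dump(data)
   }
end

run_at = Time.at(1_700_000_000)

record = task_record(:reminders,
                     data:      { 'user_id' => 5 },
                     run_at:    run_at,
                     expire_at: run_at + 3600)

record[:queue]     # "reminders"
record[:run_at]    # 1700000000
record[:expire_at] # 1700003600
```

Because :data is serialized with YAML.dump, it is best kept to simple values such as hashes, arrays, strings, and numbers.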

# File lib/procrastinator/scheduler.rb, line 25
def delay(queue = nil, data: nil, run_at: Time.now.to_i, expire_at: nil)
   verify_queue_arg!(queue)

   queue = @config.queue.name if @config.single_queue?

   verify_queue_data!(queue, data)

   loader.create(queue:          queue.to_s,
                 run_at:         run_at.to_i,
                 initial_run_at: run_at.to_i,
                 expire_at:      expire_at.nil? ? nil : expire_at.to_i,
                 data:           YAML.dump(data))
end

reschedule(queue, identifier)

Alters an existing task to run at a new time, expire at a new time, or both.

Call #to on the returned object and pass in the new :run_at, :expire_at, or both.

Example:

scheduler.reschedule(:alerts, data: {user_id: 5}).to(run_at: Time.now, expire_at: Time.now + 10)

The identifier can include any data field stored in the task loader. Often this is the information in :data.

@param queue [Symbol] the symbol identifier for the queue that holds the task
@param identifier [Hash] identifying information used to find the appropriate task

@see TaskMetaData

# File lib/procrastinator/scheduler.rb, line 53
def reschedule(queue, identifier)
   UpdateProxy.new(@config, identifier: identifier.merge(queue: queue.to_s))
end

Private Instance Methods

loader()

The Scheduler must always get the loader indirectly. If it saved the loader to an instance variable, it could keep holding a reference to a bad (i.e. gone) connection from the previous process.
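
The difference can be seen in a small sketch. SketchConfig, CachingClient, and IndirectClient are hypothetical stand-ins invented for this example, and the symbols stand in for real connections.

```ruby
# Hypothetical config object whose loader can be swapped, e.g. after a reconnect.
SketchConfig = Struct.new(:loader)

# Captures the loader once; later changes to the config are never seen.
class CachingClient
   attr_reader :loader

   def initialize(config)
      @loader = config.loader
   end
end

# Asks the config every time, as Scheduler#loader does.
class IndirectClient
   def initialize(config)
      @config = config
   end

   def loader
      @config.loader
   end
end

config   = SketchConfig.new(:parent_connection)
caching  = CachingClient.new(config)
indirect = IndirectClient.new(config)

# Simulate the loader being replaced in a new process.
config.loader = :child_connection

caching.loader  # :parent_connection (stale)
indirect.loader # :child_connection
```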

# File lib/procrastinator/scheduler.rb, line 137
def loader
   @config.loader
end

verify_queue_arg!(queue_name)
# File lib/procrastinator/scheduler.rb, line 141
def verify_queue_arg!(queue_name)
   raise ArgumentError, <<~ERR if !queue_name.nil? && !queue_name.is_a?(Symbol)
      must provide a queue name as the first argument. Received: #{ queue_name }
   ERR

   raise ArgumentError, <<~ERR if queue_name.nil? && !@config.single_queue?
      queue must be specified when more than one is registered. Defined queues are: #{ @config.queues_string }
   ERR
end

verify_queue_data!(queue_name, data)
# File lib/procrastinator/scheduler.rb, line 151
def verify_queue_data!(queue_name, data)
   queue = @config.queue(name: queue_name)

   unless queue
      queue_list = @config.queues_string
      raise ArgumentError, "there is no :#{ queue_name } queue registered. Defined queues are: #{ queue_list }"
   end

   if data.nil?
      if queue.task_class.method_defined?(:data=)
         raise ArgumentError, "task #{ queue.task_class } expects to receive :data. Provide :data to #delay."
      end
   elsif !queue.task_class.method_defined?(:data=)
      raise ArgumentError, <<~ERROR
         task #{ queue.task_class } does not import :data. Add this in your class definition:
               task_attr :data
      ERROR
   end
end