class Procrastinator::Queue

A Queue defines how a certain type of task will be processed.

@author Robin Miller

@!attribute [r] :name

@return [Symbol] The queue's identifier symbol

@!attribute [r] :task_class

@return [Class] Class that defines the work to be done for jobs in this queue.

@!attribute [r] :timeout

@return [Numeric] Duration (seconds) after which tasks in this queue should fail for taking too long.

@!attribute [r] :max_attempts

@return [Integer] Maximum number of attempts for tasks in this queue.

@!attribute [r] :update_period

@return [Numeric] Delay (seconds) between reloads of tasks from the task store.

Constants

DEFAULT_MAX_ATTEMPTS

Default number of times to retry a task

DEFAULT_TIMEOUT

Default number of seconds to wait for a task to complete

DEFAULT_UPDATE_PERIOD

Default amount of time between checks for new Tasks

Attributes

max_attempts[R]
name[R]
storage[R]
store[R]
task_class[R]
task_store[R]
timeout[R]
update_period[R]

Public Class Methods

new(name:, task_class:, max_attempts: DEFAULT_MAX_ATTEMPTS, timeout: DEFAULT_TIMEOUT, update_period: DEFAULT_UPDATE_PERIOD, store: TaskStore::SimpleCommaStore.new) click to toggle source

Timeout is in seconds

# File lib/procrastinator/queue.rb, line 38
# Builds a queue definition, validates it, and freezes the instance.
#
# Timeout is in seconds.
#
# @param name [#to_s] queue identifier; it is stringified, stripped, each run of
#    non-alphanumeric characters is replaced by "_", and the result is symbolized
# @param task_class [Class] handler class; must respond to +new+
# @param max_attempts [Integer] stored as given
# @param timeout [Numeric, nil] stored as given; a nil timeout skips the negativity check
# @param update_period [Numeric] stored as given
# @param store [Object] task store strategy, kept in @task_store
#
# @raise [ArgumentError] when name or task_class is falsy, when task_class does not
#    respond to +new+, or when timeout is negative
def initialize(name:, task_class:,
               max_attempts: DEFAULT_MAX_ATTEMPTS,
               timeout: DEFAULT_TIMEOUT,
               update_period: DEFAULT_UPDATE_PERIOD,
               store: TaskStore::SimpleCommaStore.new)
   # Guards are checked in this exact order; the first failing one raises.
   if !name
      raise ArgumentError, ':name cannot be nil'
   elsif !task_class
      raise ArgumentError, ':task_class cannot be nil'
   elsif !task_class.respond_to?(:new)
      raise ArgumentError, 'Task class must be initializable'
   elsif timeout&.negative?
      raise ArgumentError, ':timeout cannot be negative'
   end

   sanitized_name = name.to_s.strip.gsub(/[^A-Za-z0-9]+/, '_')

   @name          = sanitized_name.to_sym
   @task_class    = task_class
   @task_store    = store
   @max_attempts  = max_attempts
   @timeout       = timeout
   @update_period = update_period

   # validate! is defined outside this block; it runs once all state is assigned.
   validate!

   # No instance variable may be reassigned after construction.
   freeze
end

Public Instance Methods

create(run_at:, expire_at:, data:) click to toggle source

Creates a task on the queue, saved using the Task Store strategy.

@param run_at [Time] Earliest time to attempt running the task @param expire_at [Time, nil] Time after which the task will not be attempted @param data [Hash, String, Numeric, nil] The data to save

@raise [ArgumentError] when the keyword `:data` is needed by the task handler, but is missing @raise [MalformedTaskError] when the keyword `:data` is provided but not expected by the task handler.

# File lib/procrastinator/queue.rb, line 108
      # Creates a task on the queue and hands it to the task store.
      #
      # @param run_at [Time] earliest time to attempt running the task; also used as initial_run_at
      # @param expire_at [Time, nil] time after which the task will not be attempted
      # @param data [Hash, String, Numeric, nil] payload; serialized with JSON.dump before saving
      #
      # @return whatever the task store's #create returns
      #
      # @raise [ArgumentError] when data is nil but the task handler expects data
      # @raise [MalformedTaskError] when data is given but the task handler does not expect it
      def create(run_at:, expire_at:, data:)
         handler_wants_data = expects_data?

         if data.nil?
            if handler_wants_data
               raise ArgumentError, "task #{ @task_class } expects to receive :data. Provide :data to #delay."
            end
         elsif !handler_wants_data
            raise MalformedTaskError, <<~ERROR
               found unexpected :data argument. Either do not provide :data when scheduling a task,
               or add this in the #{ @task_class } class definition:
                     attr_accessor :data
            ERROR
         end

         # TODO: shorten to using slice once updated to Ruby 2.5+
         metadata = TaskMetaData.new(queue:          self,
                                     run_at:         run_at,
                                     initial_run_at: run_at,
                                     expire_at:      expire_at,
                                     data:           JSON.dump(data))

         # These fields are dropped from the record before it is sent to the store.
         record = metadata.to_h
         %i[id attempts last_fail_at last_error].each { |field| record.delete(field) }

         @task_store.create(**record)
      end
expects_data?() click to toggle source

@return [Boolean] whether the task handler will accept data to be assigned via its :data attribute

# File lib/procrastinator/queue.rb, line 133
# Reports whether the task class defines a +data=+ instance method.
#
# @return [Boolean] whether the task handler will accept data to be assigned via its :data attribute
def expects_data?
   data_writer = :data=
   @task_class.method_defined?(data_writer)
end
fetch_task(identifier) click to toggle source

Fetch a task matching the given identifier

@param identifier [Hash] attributes to match

@raise [NoSuchTaskError] when no task matches the identifier. @raise [AmbiguousTaskFilterError] when many tasks match the identifier, meaning you need to be more specific.

# File lib/procrastinator/queue.rb, line 87
# Fetch a task matching the given identifier.
#
# The caller's identifier hash is never modified: when it carries a truthy :data
# value, a copy with the JSON-encoded data is used for the lookup instead. This
# keeps repeated lookups with the same hash from encoding :data twice and allows
# frozen hashes to be passed.
#
# @param identifier [Hash] attributes to match
#
# @return [TaskMetaData] metadata built from the single matching record, with this queue attached
#
# @raise [NoSuchTaskError] when no task matches the identifier.
# @raise [AmbiguousTaskFilterError] when many tasks match the identifier, meaning you need to be more specific.
def fetch_task(identifier)
   raw_data = identifier[:data]
   filter   = raw_data ? identifier.merge(data: JSON.dump(raw_data)) : identifier

   # read is defined outside this block
   tasks = read(**filter)

   raise NoSuchTaskError, "no task found matching #{ filter }" if tasks.nil? || tasks.empty?
   if tasks.size > 1
      raise AmbiguousTaskFilterError, "too many (#{ tasks.size }) tasks match #{ filter }. Found: #{ tasks }"
   end

   TaskMetaData.new(**tasks.first.merge(queue: self))
end
next_task(logger: Logger.new(StringIO.new), container: nil, scheduler: nil) click to toggle source

Constructs the next available task on the queue.

@param logger [Logger] logger to provide to the constructed task handler @param container [Object, nil] container to provide to the constructed task handler @param scheduler [Procrastinator::Scheduler, nil] the scheduler to provide to the constructed task handler @return [LoggedTask, nil] A Task or nil if no task is found

# File lib/procrastinator/queue.rb, line 68
# Constructs the next available task on the queue.
#
# Picks the first runnable entry from next_metas, builds a handler for it and
# wraps the resulting Task in a LoggedTask.
#
# @param logger [Logger] logger to provide to the constructed task handler
# @param container [Object, nil] container to provide to the constructed task handler
# @param scheduler [Procrastinator::Scheduler, nil] the scheduler to provide to the constructed task handler
# @return [LoggedTask, nil] A Task or nil if no task is found
def next_task(logger: Logger.new(StringIO.new), container: nil, scheduler: nil)
   runnable = next_metas.find { |meta| meta.runnable? }
   return unless runnable

   handler = task_handler(data:      runnable.data,
                          container: container,
                          logger:    logger,
                          scheduler: scheduler)

   LoggedTask.new(Task.new(runnable, handler), logger: logger)
end

Private Instance Methods

next_metas() click to toggle source
# File lib/procrastinator/queue.rb, line 148
# Loads this queue's tasks and turns them into sorted TaskMetaData objects.
#
# Records without a :run_at value are skipped. Only keys listed in
# TaskMetaData::EXPECTED_DATA are passed on, plus queue: self. The records
# returned by read are left untouched: the filtered attributes are built as a new
# hash rather than deleting keys in place (Hash#to_h returns the receiver itself,
# so deleting in place would strip keys from the store's own rows).
#
# @return the result of sort_tasks applied to the constructed TaskMetaData objects
def next_metas
   # read and sort_tasks are defined outside this block
   scheduled = read(queue: @name).reject { |row| row[:run_at].nil? }

   metas = scheduled.map do |row|
      attrs = row.to_h.select { |key, _value| TaskMetaData::EXPECTED_DATA.include?(key) }

      TaskMetaData.new(**attrs.merge(queue: self))
   end

   sort_tasks(metas)
end
sort_tasks(tasks) click to toggle source
# File lib/procrastinator/queue.rb, line 156
# Orders tasks by ascending run_at, returning a new array.
#
# @param tasks [Array<#run_at>] the tasks to order; the given array itself is not modified
# @return [Array] the same elements, sorted by run_at
def sort_tasks(tasks)
   # TODO: improve this
   # The input is shuffled before sorting. The stated intent is to avoid a worst case
   # O(n^2) quicksort when the persister returns data that is already sorted, which is
   # not an unreasonable thing for it to do. A better algorithm would be ideal, but this
   # will do for now.
   randomized = tasks.shuffle
   randomized.sort_by { |task| task.run_at }
end
task_handler(data: nil, container: nil, logger: nil, scheduler: nil) click to toggle source
# File lib/procrastinator/queue.rb, line 139
# Instantiates the queue's task class and populates the new handler.
#
# :data is only assigned when the handler responds to +data=+; container, logger
# and scheduler are always assigned, so the handler must provide those writers.
#
# @param data [Object, nil] value for the handler's data attribute, if it has one
# @param container [Object, nil] value assigned to handler.container
# @param logger [Object, nil] value assigned to handler.logger
# @param scheduler [Object, nil] value assigned to handler.scheduler
# @return [Object] the configured handler instance
def task_handler(data: nil, container: nil, logger: nil, scheduler: nil)
   @task_class.new.tap do |handler|
      handler.data      = data if handler.respond_to?(:data=)
      handler.container = container
      handler.logger    = logger
      handler.scheduler = scheduler
   end
end