class Recurrent::Task

Attributes

action[RW]
logger[RW]
name[RW]
save[RW]
schedule[RW]
scheduler[RW]
thread[RW]

Public Class Methods

new(options={}) click to toggle source
# File lib/recurrent/task.rb, line 7
def initialize(options={})
  @name = options[:name]
  @schedule = options[:schedule]
  @action = options[:action]
  @save = options[:save]
  @logger = options[:logger]
  @scheduler = options[:scheduler]
  @disable_task_locking = options[:disable_task_locking]
  Configuration.save_task_schedule.call(name, schedule) if Configuration.save_task_schedule
end

Public Instance Methods

call_action(execution_time=nil) click to toggle source
# File lib/recurrent/task.rb, line 38
def call_action(execution_time=nil)
  if Configuration.task_locking && !@disable_task_locking
    logger.info "#{name} - #{execution_time.to_s(:seconds)}: attempting to establish lock"
    lock_established = Configuration.task_locking.call(name) do
      if Configuration.load_task_return_value && action.arity == 1
        previous_value = Configuration.load_task_return_value.call(name)

        return_value = action.call(previous_value)
      else
        return_value = action.call
      end
      save_results(return_value) if save?

      # If a task finishes quickly hold the lock for a few seconds to avoid releasing it before other processes try to pick up the task
      sleep(1) until Time.now - execution_time > 5 if execution_time
    end
    logger.info "#{name} - #{execution_time.to_s(:seconds)}: locked by another process" unless lock_established
  else
    if Configuration.load_task_return_value && action.arity == 1
      previous_value = Configuration.load_task_return_value.call(name)

      return_value = action.call(previous_value)
    else
      return_value = action.call
    end
    save_results(return_value) if save?
  end
end
execute(execution_time) click to toggle source
# File lib/recurrent/task.rb, line 18
def execute(execution_time)
  return handle_still_running(execution_time) if running?
  @thread = Thread.new do
    Thread.current["execution_time"] = execution_time
    scheduler && scheduler.increment_executing_tasks
    begin
      if Configuration.maximum_concurrent_tasks.present?
        call_action(execution_time) unless (scheduler.executing_tasks > Configuration.maximum_concurrent_tasks)
      else
        call_action(execution_time)
      end
    rescue => e
      logger.warn("#{name} - #{e.message}")
      logger.warn(e.backtrace)
    ensure
      scheduler && scheduler.decrement_executing_tasks
    end
  end
end
handle_still_running(current_time) click to toggle source
# File lib/recurrent/task.rb, line 67
def handle_still_running(current_time)
  logger.info "#{name}: Execution from #{thread['execution_time'].to_s(:seconds)} still running, aborting this execution."
  if Configuration.handle_slow_task
    Configuration.handle_slow_task.call(name, current_time, thread['execution_time'])
  end
end
next_occurrence() click to toggle source
# File lib/recurrent/task.rb, line 74
def next_occurrence
  occurrence = schedule.next_occurrence
  schedule.start_date = occurrence
end
running?() click to toggle source
# File lib/recurrent/task.rb, line 96
def running?
  thread.try(:alive?)
end
save?() click to toggle source
# File lib/recurrent/task.rb, line 79
def save?
  !!save
end
save_results(return_value) click to toggle source
# File lib/recurrent/task.rb, line 83
def save_results(return_value)
  logger.info "#{name}: Wants to save its return value."
  if Configuration.save_task_return_value
    Configuration.save_task_return_value.call(:name => name,
                                              :return_value => return_value,
                                              :executed_at => thread['execution_time'],
                                              :executed_by => logger.identifier)
    logger.info "#{name}: Return value saved."
  else
    logger.info "#{name}: No method to save return values is configured."
  end
end