class Qless::Middleware::Timeout

Applies a hard timeout. To use this middleware, instantiate it and pass a block; the block will be passed the job object (which has a `ttl` method for getting the job's remaining TTL), and the block should return the desired timeout in seconds, or `nil` to run the job without a timeout. This allows you to set a constant hard timeout for a particular job class (using something like `extend Qless::Middleware::Timeout.new { 60 * 60 }`), or a variable timeout based on the individual TTL of each job (using something like `extend Qless::Middleware::Timeout.new { |job| job.ttl * 1.1 }`).

Public Class Methods

new(opts = {}) { |job| ... } click to toggle source
Calls superclass method
# File lib/qless/middleware/timeout.rb, line 19
# Defines an +around_perform+ hook on this module that runs each job under a
# hard timeout.
#
# The block given here is called with every job and must return the timeout in
# seconds, or +nil+ to run that job without any timeout.
#
# @param opts [Hash] optional collaborators
# @option opts :timeout_class (::Timeout) receives +timeout(seconds)+ with a block
# @option opts :kernel_class (Kernel) receives +exit!(73)+ once a timeout was handled
# @yieldparam job the job about to be performed
# @yieldreturn [Numeric, nil] timeout in seconds, or +nil+ for "no timeout"
#
# The generated hook raises InvalidTimeoutError when the block returns
# anything other than +nil+ or a positive Numeric.
def initialize(opts = {})
  timer = opts.fetch(:timeout_class, ::Timeout)
  kernel = opts.fetch(:kernel_class, Kernel)

  module_eval do
    define_method :around_perform do |job|
      limit = yield job

      # nil means "no timeout requested": hand over to the next hook untouched.
      return super(job) if limit.nil?

      rejected = limit.is_a?(Numeric) ? limit <= 0 : true
      if rejected
        raise InvalidTimeoutError, "Timeout must be a positive number or nil, " \
                                   "but was #{limit}"
      end

      begin
        timer.timeout(limit) { super(job) }
      rescue ::Timeout::Error => timeout_error
        failure = JobTimedoutError.new("Qless: job timeout (#{limit}) exceeded.")
        failure.set_backtrace(timeout_error.backtrace)

        # A stalled Redis connection may be what caused the timeout, and the
        # timeout may have fired in the middle of a Redis call, so the state of
        # the connection cannot be trusted. Reconnect before touching Redis.
        job.reconnect_to_redis
        failure_details = Qless.failure_formatter.format(job, failure, [])
        job.fail(*failure_details)

        # The process is about to leave via exit!, so the regular requeue logic
        # never gets a chance to run; trigger it by hand when it applies.
        requeue = is_a?(::Qless::Middleware::RequeueExceptions) &&
                  requeueable?(JobTimedoutError)
        handle_exception(job, failure) if requeue

        # ::Timeout.timeout can leave things in an inconsistent state (with
        # Redis, unread bytes have been seen left on the socket buffer, which
        # affects later calls). Exiting and letting the parent process restart
        # the worker in a known, clean state is much safer.
        #
        # 73 is used as a unique exit status for this case; it looks a bit
        # like "TE" (Timeout::Error).
        kernel.exit!(73)
      end
    end
  end
end