class Sidekiq::Middleware::Server::MaxJobs

Constants

COUNTER_FOR_QUEUE_KEY_TEMPLATE

Constant(s)

COUNTER_KEY
DEFAULT_MAX_JOBS

Default(s)

DEFAULT_MAX_JOBS_FOR_QUEUE
DEFAULT_MAX_JOBS_JITTER
DEFAULT_MAX_JOBS_JITTER_FOR_QUEUE
DEFAULT_MAX_JOBS_RUNTIME
DEFAULT_MAX_JOBS_RUNTIME_JITTER
LOG_INITIALIZATION_TEMPLATE
LOG_MAX_JOBS_QUOTA_MET_FOR_QUEUE_TEMPLATE
LOG_MAX_JOBS_QUOTA_MET_TEMPLATE
LOG_MAX_JOBS_RUNTIME_QUOTA_MET_TEMPLATE
MAX_JOBS_FOR_QUEUE_KEY_TEMPLATE
MAX_JOBS_JITTER_FOR_QUEUE_KEY_TEMPLATE
MAX_JOBS_JITTER_KEY
MAX_JOBS_KEY
MAX_JOBS_RUNTIME_JITTER_KEY
MAX_JOBS_RUNTIME_KEY
MAX_JOBS_RUNTIME_WITH_JITTER_KEY
MAX_JOBS_WITH_JITTER_FOR_QUEUE_KEY_TEMPLATE
MAX_JOBS_WITH_JITTER_KEY
MUTEX_KEY
PID_KEY
START_TIME_KEY
TERM
TERMINATING_KEY
VERSION

Version

Public Class Methods

cache() click to toggle source

Helper Method(s)

# File lib/sidekiq/middleware/server/max_jobs.rb, line 60
# Returns the Hash used to memoize every value this class computes
# (counters, quotas, mutex, pid, ...). The Hash is created on first access.
#
# @return [Hash] the shared memoization store
def cache
  return @cache if @cache

  @cache = {}
end
counter() click to toggle source
# File lib/sidekiq/middleware/server/max_jobs.rb, line 64
# Returns the process-wide job counter stored under COUNTER_KEY, seeding it
# with 0 in the cache when nothing has been stored yet.
#
# @return [Integer] the current counter value
def counter
  cache[COUNTER_KEY] || (cache[COUNTER_KEY] = 0)
end
counter_for_queue(queue) click to toggle source
# File lib/sidekiq/middleware/server/max_jobs.rb, line 69
# Returns the job counter kept for a single queue, seeding it with 0 in the
# cache when nothing has been stored yet. The cache key is built from
# COUNTER_FOR_QUEUE_KEY_TEMPLATE and the upper-cased queue name.
#
# @param queue [String] queue name
# @return [Integer] the current counter value for that queue
def counter_for_queue(queue)
  queue_key = format(COUNTER_FOR_QUEUE_KEY_TEMPLATE, queue.upcase)
  return cache[queue_key] if cache[queue_key]

  cache[queue_key] = 0
end
increment_counter!() click to toggle source
# File lib/sidekiq/middleware/server/max_jobs.rb, line 74
# Adds one to the process-wide job counter (treating a missing entry as 0)
# and stores the result back in the cache.
#
# @return [Integer] the counter value after the increment
def increment_counter!
  current = cache[COUNTER_KEY] || 0
  cache[COUNTER_KEY] = current + 1
end
increment_counter_for_queue!(queue) click to toggle source
# File lib/sidekiq/middleware/server/max_jobs.rb, line 79
# Adds one to the job counter kept for a single queue (treating a missing
# entry as 0) and stores the result back in the cache.
#
# @param queue [String] queue name; upper-cased to build the cache key
# @return [Integer] the counter value after the increment
def increment_counter_for_queue!(queue)
  queue_key = format(COUNTER_FOR_QUEUE_KEY_TEMPLATE, queue.upcase)
  current = cache[queue_key] || 0
  cache[queue_key] = current + 1
end
log_info(message) click to toggle source
# File lib/sidekiq/middleware/server/max_jobs.rb, line 84
# Writes an informational message through ::Sidekiq.logger when that is
# defined, and falls back to `puts` otherwise.
#
# @param message [String] text to log
# @return [Object] whatever the logger's `info` (or `puts`) returns
def log_info(message)
  return puts(message) unless defined?(::Sidekiq.logger)

  ::Sidekiq.logger.info(message)
end
log_initialization!() click to toggle source
# File lib/sidekiq/middleware/server/max_jobs.rb, line 89
# Logs the initialization message: LOG_INITIALIZATION_TEMPLATE formatted
# with the current pid.
#
# @return [Object] whatever log_info returns
def log_initialization!
  log_info(format(LOG_INITIALIZATION_TEMPLATE, pid))
end
log_max_jobs_quota_met!() click to toggle source
# File lib/sidekiq/middleware/server/max_jobs.rb, line 94
# Logs that the total job quota has been met:
# LOG_MAX_JOBS_QUOTA_MET_TEMPLATE formatted with the current pid.
#
# @return [Object] whatever log_info returns
def log_max_jobs_quota_met!
  log_info(format(LOG_MAX_JOBS_QUOTA_MET_TEMPLATE, pid))
end
log_max_jobs_quota_met_for_queue!(queue) click to toggle source
# File lib/sidekiq/middleware/server/max_jobs.rb, line 99
# Logs that the quota of a single queue has been met:
# LOG_MAX_JOBS_QUOTA_MET_FOR_QUEUE_TEMPLATE formatted with the queue name
# (as given, not upper-cased) followed by the current pid.
#
# @param queue [String] queue name
# @return [Object] whatever log_info returns
def log_max_jobs_quota_met_for_queue!(queue)
  template = LOG_MAX_JOBS_QUOTA_MET_FOR_QUEUE_TEMPLATE
  log_info(format(template, queue, pid))
end
log_max_jobs_runtime_quota_met!() click to toggle source
# File lib/sidekiq/middleware/server/max_jobs.rb, line 108
# Logs that the runtime quota has been met:
# LOG_MAX_JOBS_RUNTIME_QUOTA_MET_TEMPLATE formatted with the current pid.
#
# @return [Object] whatever log_info returns
def log_max_jobs_runtime_quota_met!
  log_info(format(LOG_MAX_JOBS_RUNTIME_QUOTA_MET_TEMPLATE, pid))
end
max_jobs() click to toggle source
# File lib/sidekiq/middleware/server/max_jobs.rb, line 113
# Returns the total job quota. The value comes from the environment
# variable named by MAX_JOBS_KEY, or DEFAULT_MAX_JOBS when that variable is
# unset; it is converted with `to_i` and memoized in the cache.
#
# @return [Integer] the configured quota
def max_jobs
  return cache[MAX_JOBS_KEY] if cache[MAX_JOBS_KEY]

  configured = ENV[MAX_JOBS_KEY] || DEFAULT_MAX_JOBS
  cache[MAX_JOBS_KEY] = configured.to_i
end
max_jobs_for_queue(queue) click to toggle source
# File lib/sidekiq/middleware/server/max_jobs.rb, line 118
# Returns the job quota for a single queue. The environment variable name
# (also used as the cache key) is MAX_JOBS_FOR_QUEUE_KEY_TEMPLATE formatted
# with the upper-cased queue name; DEFAULT_MAX_JOBS_FOR_QUEUE is used when
# the variable is unset. The value is converted with `to_i` and memoized.
#
# @param queue [String] queue name
# @return [Integer] the configured quota for that queue
def max_jobs_for_queue(queue)
  queue_key = format(MAX_JOBS_FOR_QUEUE_KEY_TEMPLATE, queue.upcase)
  return cache[queue_key] if cache[queue_key]

  configured = ENV[queue_key] || DEFAULT_MAX_JOBS_FOR_QUEUE
  cache[queue_key] = configured.to_i
end
max_jobs_jitter() click to toggle source
# File lib/sidekiq/middleware/server/max_jobs.rb, line 123
# Returns the random jitter added to the total job quota. The upper bound
# comes from the environment variable named by MAX_JOBS_JITTER_KEY, or
# DEFAULT_MAX_JOBS_JITTER when that variable is unset. The jitter is drawn
# once and memoized in the cache.
#
# @return [Integer] the memoized jitter; always 0 when the bound is 0
def max_jobs_jitter
  key = MAX_JOBS_JITTER_KEY
  cache[key] ||= begin
    limit = (ENV[key] || DEFAULT_MAX_JOBS_JITTER).to_i
    # Kernel#rand(0) returns a Float in [0, 1) rather than 0. A fractional
    # jitter would produce a fractional quota that an Integer counter can
    # never equal, so a bound of 0 must mean "no jitter".
    limit.zero? ? 0 : rand(limit)
  end
end
max_jobs_jitter_for_queue(queue) click to toggle source
# File lib/sidekiq/middleware/server/max_jobs.rb, line 128
# Returns the random jitter added to the quota of a single queue. The
# environment variable name (also used as the cache key) is
# MAX_JOBS_JITTER_FOR_QUEUE_KEY_TEMPLATE formatted with the upper-cased
# queue name; DEFAULT_MAX_JOBS_JITTER_FOR_QUEUE is used when the variable is
# unset. The jitter is drawn once per queue and memoized in the cache.
#
# @param queue [String] queue name
# @return [Integer] the memoized jitter; always 0 when the bound is 0
def max_jobs_jitter_for_queue(queue)
  key = format(MAX_JOBS_JITTER_FOR_QUEUE_KEY_TEMPLATE, queue.upcase)
  cache[key] ||= begin
    limit = (ENV[key] || DEFAULT_MAX_JOBS_JITTER_FOR_QUEUE).to_i
    # Kernel#rand(0) returns a Float in [0, 1) rather than 0. A fractional
    # jitter would produce a fractional quota that an Integer counter can
    # never equal, so a bound of 0 must mean "no jitter".
    limit.zero? ? 0 : rand(limit)
  end
end
max_jobs_quota_met?() click to toggle source
# File lib/sidekiq/middleware/server/max_jobs.rb, line 134
# Tells whether the process-wide counter has reached the total quota
# (including jitter). A quota that is not positive is never considered met.
#
# @return [Boolean]
def max_jobs_quota_met?
  limit = max_jobs_with_jitter
  return false unless limit.positive?

  counter == limit
end
max_jobs_quota_met_for_queue?(queue) click to toggle source
# File lib/sidekiq/middleware/server/max_jobs.rb, line 139
# Tells whether the counter of a single queue has reached that queue's quota
# (including jitter). A quota that is not positive is never considered met.
#
# @param queue [String] queue name
# @return [Boolean]
def max_jobs_quota_met_for_queue?(queue)
  limit = max_jobs_with_jitter_for_queue(queue)
  return false unless limit.positive?

  counter_for_queue(queue) == limit
end
max_jobs_runtime() click to toggle source
# File lib/sidekiq/middleware/server/max_jobs.rb, line 144
# Returns the runtime quota. The value comes from the environment variable
# named by MAX_JOBS_RUNTIME_KEY, or DEFAULT_MAX_JOBS_RUNTIME when that
# variable is unset; it is converted with `to_i` and memoized in the cache.
#
# @return [Integer] the configured runtime quota
def max_jobs_runtime
  return cache[MAX_JOBS_RUNTIME_KEY] if cache[MAX_JOBS_RUNTIME_KEY]

  configured = ENV[MAX_JOBS_RUNTIME_KEY] || DEFAULT_MAX_JOBS_RUNTIME
  cache[MAX_JOBS_RUNTIME_KEY] = configured.to_i
end
max_jobs_runtime_jitter() click to toggle source
# File lib/sidekiq/middleware/server/max_jobs.rb, line 149
# Returns the random jitter added to the runtime quota. The upper bound
# comes from the environment variable named by MAX_JOBS_RUNTIME_JITTER_KEY,
# or DEFAULT_MAX_JOBS_RUNTIME_JITTER when that variable is unset. The jitter
# is drawn once and memoized in the cache.
#
# @return [Integer] the memoized jitter; always 0 when the bound is 0
def max_jobs_runtime_jitter
  key = MAX_JOBS_RUNTIME_JITTER_KEY
  cache[key] ||= begin
    limit = (ENV[key] || DEFAULT_MAX_JOBS_RUNTIME_JITTER).to_i
    # Kernel#rand(0) returns a Float in [0, 1) rather than 0; a bound of 0
    # must mean "no jitter" so the resulting quota stays an Integer.
    limit.zero? ? 0 : rand(limit)
  end
end
max_jobs_runtime_quota_met?() click to toggle source
# File lib/sidekiq/middleware/server/max_jobs.rb, line 155
# Tells whether the time elapsed since start_time (whole seconds, taken
# from ::Time.now) has reached the runtime quota (including jitter). A quota
# that is not positive is never considered met.
#
# @return [Boolean]
def max_jobs_runtime_quota_met?
  limit = max_jobs_runtime_with_jitter
  return false unless limit.positive?

  elapsed = ::Time.now.to_i - start_time
  elapsed >= limit
end
max_jobs_runtime_with_jitter() click to toggle source
# File lib/sidekiq/middleware/server/max_jobs.rb, line 160
# Returns the effective runtime quota: max_jobs_runtime plus
# max_jobs_runtime_jitter, computed once and memoized in the cache.
#
# @return [Numeric] the memoized sum
def max_jobs_runtime_with_jitter
  memo_key = MAX_JOBS_RUNTIME_WITH_JITTER_KEY
  return cache[memo_key] if cache[memo_key]

  base = max_jobs_runtime
  cache[memo_key] = base + max_jobs_runtime_jitter
end
max_jobs_with_jitter() click to toggle source
# File lib/sidekiq/middleware/server/max_jobs.rb, line 165
# Returns the effective total quota: max_jobs plus max_jobs_jitter,
# computed once and memoized in the cache.
#
# @return [Numeric] the memoized sum
def max_jobs_with_jitter
  memo_key = MAX_JOBS_WITH_JITTER_KEY
  return cache[memo_key] if cache[memo_key]

  base = max_jobs
  cache[memo_key] = base + max_jobs_jitter
end
max_jobs_with_jitter_for_queue(queue) click to toggle source
# File lib/sidekiq/middleware/server/max_jobs.rb, line 170
# Returns the effective quota of a single queue: max_jobs_for_queue plus
# max_jobs_jitter_for_queue, computed once per queue and memoized in the
# cache under MAX_JOBS_WITH_JITTER_FOR_QUEUE_KEY_TEMPLATE formatted with the
# upper-cased queue name.
#
# @param queue [String] queue name
# @return [Numeric] the memoized sum
def max_jobs_with_jitter_for_queue(queue)
  memo_key = format(MAX_JOBS_WITH_JITTER_FOR_QUEUE_KEY_TEMPLATE, queue.upcase)
  return cache[memo_key] if cache[memo_key]

  base = max_jobs_for_queue(queue)
  cache[memo_key] = base + max_jobs_jitter_for_queue(queue)
end
mutex() click to toggle source
# File lib/sidekiq/middleware/server/max_jobs.rb, line 177
# Returns the ::Mutex stored in the cache under MUTEX_KEY, creating it on
# first access.
#
# NOTE(review): the lazy creation itself is not synchronized — confirm the
# first call cannot happen from two threads at once.
#
# @return [Mutex] the memoized mutex
def mutex
  cache[MUTEX_KEY] || (cache[MUTEX_KEY] = ::Mutex.new)
end
pid() click to toggle source
# File lib/sidekiq/middleware/server/max_jobs.rb, line 182
# Returns the id of the current process (::Process.pid), memoized in the
# cache under PID_KEY.
#
# @return [Integer] the memoized process id
def pid
  cache[PID_KEY] || (cache[PID_KEY] = ::Process.pid)
end
start_time() click to toggle source
# File lib/sidekiq/middleware/server/max_jobs.rb, line 187
# Returns the reference timestamp used by the runtime quota: ::Time.now as
# whole seconds, captured on the first call and memoized in the cache under
# START_TIME_KEY.
#
# @return [Integer] the memoized timestamp
def start_time
  cache[START_TIME_KEY] || (cache[START_TIME_KEY] = ::Time.now.to_i)
end
terminate!() click to toggle source
# File lib/sidekiq/middleware/server/max_jobs.rb, line 192
# Marks the process as terminating and sends it the TERM signal.
#
# The flag is stored as a literal `true`. The previous form,
# `cache[key] = true && ::Process.kill(...)`, stored the Integer returned by
# Process.kill instead, so a `== true` check on the flag never succeeded.
# The flag is also written before the signal is sent, so it is already
# visible when the signal is delivered.
#
# @return [Integer] the value returned by ::Process.kill
def terminate!
  cache[TERMINATING_KEY] = true
  ::Process.kill(TERM, pid)
end
terminating?() click to toggle source
# File lib/sidekiq/middleware/server/max_jobs.rb, line 197
# Tells whether termination has already been requested.
#
# Any truthy marker stored under TERMINATING_KEY counts, so a non-boolean
# marker (for example an Integer such as the return value of Process.kill)
# is recognized too; a strict `== true` comparison would miss it.
#
# @return [Boolean] strictly true or false
def terminating?
  cache[TERMINATING_KEY] ? true : false
end

Public Instance Methods

call( _, _, queue ) { || ... } click to toggle source
# File lib/sidekiq/middleware/server/max_jobs.rb, line 203
# Middleware entry point: runs the job (the given block) and, only when the
# block finished without raising and the process is not already
# terminating, updates the counters and TERMinates the process once a quota
# has been met.
#
# The first two positional arguments (worker instance and job item) are
# accepted but unused; `queue` is the name of the queue the job came from.
#
# @yield runs the job
# @return [Object] the value returned by the block
# @raise [Exception] whatever the block raises, re-raised untouched
def call(
  _,     # worker-instance
  _,     # item
  queue
)
  exception_raised = false
  begin
    yield
  rescue Exception
    # Set the `exception_raised` boolean to `true` so that the counter
    # *is not* incremented in the `ensure` block
    exception_raised = true
    # Re-raise the `Exception` so that _Sidekiq_ can deal w/ it
    raise
  ensure
    if !exception_raised && !self.class.terminating?
      self.class.mutex.synchronize do
        # Re-check under the lock: another thread may have requested
        # termination while this one was waiting for the mutex, in which
        # case the job must not be counted nor a second TERM sent
        next if self.class.terminating?

        # Controls whether or not the process will be TERMinated at the
        # end of the block
        terminate = false

        # First check if the runtime quota has been met
        if self.class.max_jobs_runtime_quota_met?
          self.class.log_max_jobs_runtime_quota_met!
          terminate = true
        end

        # Increment the total counter
        self.class.increment_counter!

        # Next, check if the total quota has been met
        if !terminate && self.class.max_jobs_quota_met?
          self.class.log_max_jobs_quota_met!
          terminate = true
        end

        # Increment the queue specific counter
        self.class.increment_counter_for_queue!(queue)

        # Lastly, check if the queue quota has been met
        if !terminate && self.class.max_jobs_quota_met_for_queue?(queue)
          self.class.log_max_jobs_quota_met_for_queue!(queue)
          terminate = true
        end

        # If applicable, terminate
        self.class.terminate! if terminate
      end
    end
  end
end