class QueueClassicPlus::Base

Public Class Methods

_perform(*args) click to toggle source
# File lib/queue_classic_plus/base.rb, line 102
def self._perform(*args)
  Metrics.timing("qu_perform_time", source: librato_key) do
    if skip_transaction
      perform(*args)
    else
      transaction do
        # .to_i defaults to 0, which means no timeout in postgres
        timeout = ENV['POSTGRES_STATEMENT_TIMEOUT'].to_i * 1000
        execute "SET LOCAL statement_timeout = #{timeout}"
        perform(*args)
      end
    end
  end
end
can_enqueue?(method, *args) click to toggle source
# File lib/queue_classic_plus/base.rb, line 55
def self.can_enqueue?(method, *args)
  if locked?
    max_lock_time = ENV.fetch("QUEUE_CLASSIC_MAX_LOCK_TIME", 10 * 60).to_i

    q = "SELECT COUNT(1) AS count
         FROM
           (
             SELECT 1
             FROM queue_classic_jobs
             WHERE q_name = $1 AND method = $2 AND args::text = $3::text
               AND (locked_at IS NULL OR locked_at > current_timestamp - interval '#{max_lock_time} seconds')
             LIMIT 1
           )
         AS x"

    result = QC.default_conn_adapter.execute(q, @queue, method, args.to_json)
    result['count'].to_i == 0
  else
    true
  end
end
disable_retries!() click to toggle source
# File lib/queue_classic_plus/base.rb, line 31
def self.disable_retries!
  unless self.retries_on.empty?
    raise 'disable_retries! should not be enabled in conjunction with retry!'
  end

  self.disable_retries = true
end
do(*args) click to toggle source
# File lib/queue_classic_plus/base.rb, line 96
def self.do(*args)
  Metrics.timing("qc_enqueue_time", source: librato_key) do
    enqueue_perform(*args)
  end
end
enqueue(method, *args) click to toggle source
# File lib/queue_classic_plus/base.rb, line 77
def self.enqueue(method, *args)
  if can_enqueue?(method, *args)
    queue.enqueue(method, *args)
  end
end
enqueue_perform(*args) click to toggle source
# File lib/queue_classic_plus/base.rb, line 83
def self.enqueue_perform(*args)
  enqueue("#{self.to_s}._perform", *args)
end
enqueue_perform_in(time, *args) click to toggle source
# File lib/queue_classic_plus/base.rb, line 87
def self.enqueue_perform_in(time, *args)
  raise "Can't enqueue in the future for locked jobs" if locked?
  queue.enqueue_in(time, "#{self.to_s}._perform", *args)
end
librato_key() click to toggle source
# File lib/queue_classic_plus/base.rb, line 117
def self.librato_key
  Inflector.underscore(self.name || "").gsub(/\//, ".")
end
list() click to toggle source

Debugging

# File lib/queue_classic_plus/base.rb, line 140
def self.list
  q = "SELECT * FROM queue_classic_jobs WHERE q_name = '#{@queue}'"
  execute q
end
lock!() click to toggle source
# File lib/queue_classic_plus/base.rb, line 39
def self.lock!
  self.locked = true
end
locked?() click to toggle source
# File lib/queue_classic_plus/base.rb, line 47
def self.locked?
  !!self.locked
end
logger() click to toggle source
# File lib/queue_classic_plus/base.rb, line 51
def self.logger
  QueueClassicPlus.logger
end
queue() click to toggle source
# File lib/queue_classic_plus/base.rb, line 5
def self.queue
  QC::Queue.new(@queue)
end
restart_in(time, remaining_retries, *args) click to toggle source
# File lib/queue_classic_plus/base.rb, line 92
def self.restart_in(time, remaining_retries, *args)
  queue.enqueue_retry_in(time, "#{self.to_s}._perform", remaining_retries, *args)
end
retries_on?(exception) click to toggle source
# File lib/queue_classic_plus/base.rb, line 27
def self.retries_on? exception
  self.retries_on[exception.class] || self.retries_on.keys.any? {|klass| exception.is_a? klass}
end
retry!(on: RuntimeError, max: 5) click to toggle source
# File lib/queue_classic_plus/base.rb, line 19
def self.retry!(on: RuntimeError, max: 5)
  if self.disable_retries
    raise 'retry! should not be used in conjuction with disable_retries!'
  end
  Array(on).each {|e| self.retries_on[e] = true}
  self.max_retries = max
end
skip_transaction!() click to toggle source
# File lib/queue_classic_plus/base.rb, line 43
def self.skip_transaction!
  self.skip_transaction = true
end
transaction(options = {}, &block) click to toggle source
# File lib/queue_classic_plus/base.rb, line 121
def self.transaction(options = {}, &block)
  if defined?(ActiveRecord)
    # If ActiveRecord is loaded, we use it's own transaction mechanisn since
    # it has slightly different semanctics for rollback.
    ActiveRecord::Base.transaction(options, &block)
  else
    begin
      execute "BEGIN"
      block.call
    rescue
      execute "ROLLBACK"
      raise
    end

    execute "COMMIT"
  end
end

Private Class Methods

execute(sql, *args) click to toggle source
# File lib/queue_classic_plus/base.rb, line 146
def self.execute(sql, *args)
  QC.default_conn_adapter.execute(sql, *args)
end