class QueueClassicPlus::Base
Public Class Methods
_perform(*args)
click to toggle source
# File lib/queue_classic_plus/base.rb, line 102
# Runs +perform+ with the given arguments, timing the whole call under the
# "qu_perform_time" metric (tagged with +librato_key+ as the source).
#
# When +skip_transaction+ is set, +perform+ is invoked directly. Otherwise it
# runs inside +transaction+, after a statement timeout has been applied with
# SET LOCAL.
def self._perform(*args)
  Metrics.timing("qu_perform_time", source: librato_key) do
    next perform(*args) if skip_transaction

    transaction do
      # nil.to_i is 0 when the variable is unset, and 0 means no timeout in
      # postgres.
      timeout_ms = ENV['POSTGRES_STATEMENT_TIMEOUT'].to_i * 1000
      execute "SET LOCAL statement_timeout = #{timeout_ms}"
      perform(*args)
    end
  end
end
can_enqueue?(method, *args)
click to toggle source
# File lib/queue_classic_plus/base.rb, line 55
# Tells whether a job for +method+ with +args+ may be enqueued.
#
# Unlocked job classes can always enqueue. For locked classes, the queue
# table is searched for a row with the same queue name, method and
# JSON-serialised arguments that is either not locked or was locked within
# the last QUEUE_CLASSIC_MAX_LOCK_TIME seconds (600 when the variable is not
# set); enqueueing is allowed only when no such row exists.
#
# Returns true or false.
def self.can_enqueue?(method, *args)
  return true unless locked?

  lock_window = ENV.fetch("QUEUE_CLASSIC_MAX_LOCK_TIME", 10 * 60).to_i
  # lock_window has been through to_i, so interpolating it yields digits only.
  sql = "SELECT COUNT(1) AS count FROM ( SELECT 1 FROM queue_classic_jobs " \
        "WHERE q_name = $1 AND method = $2 AND args::text = $3::text " \
        "AND (locked_at IS NULL OR locked_at > current_timestamp - interval '#{lock_window} seconds') " \
        "LIMIT 1 ) AS x"
  row = QC.default_conn_adapter.execute(sql, @queue, method, args.to_json)
  row['count'].to_i.zero?
end
disable_retries!()
click to toggle source
# File lib/queue_classic_plus/base.rb, line 31
# Sets the +disable_retries+ flag to true for this job class.
#
# Raises a RuntimeError when +retries_on+ already has entries, because the
# two settings are mutually exclusive.
def self.disable_retries!
  retries_configured = !retries_on.empty?
  raise 'disable_retries! should not be enabled in conjunction with retry!' if retries_configured

  self.disable_retries = true
end
do(*args)
click to toggle source
# File lib/queue_classic_plus/base.rb, line 96
# Enqueues the job via +enqueue_perform+, timing the call under the
# "qc_enqueue_time" metric (tagged with +librato_key+ as the source).
def self.do(*args)
  Metrics.timing("qc_enqueue_time", source: librato_key) { enqueue_perform(*args) }
end
enqueue(method, *args)
click to toggle source
# File lib/queue_classic_plus/base.rb, line 77
# Enqueues +method+ with +args+ on this class's queue, but only when
# +can_enqueue?+ permits it. Returns nil when the job is not enqueued.
def self.enqueue(method, *args)
  return unless can_enqueue?(method, *args)

  queue.enqueue(method, *args)
end
enqueue_perform(*args)
click to toggle source
# File lib/queue_classic_plus/base.rb, line 83
# Enqueues a call to this class's +_perform+ with the given arguments.
def self.enqueue_perform(*args)
  job_method = "#{self}._perform"
  enqueue(job_method, *args)
end
enqueue_perform_in(time, *args)
click to toggle source
# File lib/queue_classic_plus/base.rb, line 87
# Schedules a call to this class's +_perform+ via the queue's +enqueue_in+,
# passing +time+ through unchanged.
#
# Raises a RuntimeError for locked job classes.
def self.enqueue_perform_in(time, *args)
  if locked?
    raise "Can't enqueue in the future for locked jobs"
  end

  job_method = "#{self}._perform"
  queue.enqueue_in(time, job_method, *args)
end
librato_key()
click to toggle source
# File lib/queue_classic_plus/base.rb, line 117
# Builds the metrics source key for this class: the class name (or an empty
# string when the class has no name) is passed through Inflector.underscore
# and every "/" is replaced with ".".
def self.librato_key
  class_name = name || ""
  Inflector.underscore(class_name).gsub(/\//, ".")
end
list()
click to toggle source
Debugging
# File lib/queue_classic_plus/base.rb, line 140
# Debugging helper: returns whatever +execute+ yields for a query selecting
# every row of queue_classic_jobs whose q_name matches this class's queue.
#
# The queue name is sent as a bound parameter ($1) rather than interpolated
# into the SQL text, so names containing quotes cannot break or alter the
# query.
def self.list
  execute "SELECT * FROM queue_classic_jobs WHERE q_name = $1", @queue
end
lock!()
click to toggle source
# File lib/queue_classic_plus/base.rb, line 39
# Marks this job class as locked by setting its +locked+ attribute to true.
def self.lock!
  self.locked = true
end
locked?()
click to toggle source
# File lib/queue_classic_plus/base.rb, line 47
# Returns true when the +locked+ attribute is truthy, false otherwise.
def self.locked?
  locked ? true : false
end
logger()
click to toggle source
# File lib/queue_classic_plus/base.rb, line 51
# Returns the logger exposed by QueueClassicPlus.logger.
def self.logger
  shared_logger = QueueClassicPlus.logger
  shared_logger
end
queue()
click to toggle source
# File lib/queue_classic_plus/base.rb, line 5
# Builds a new QC::Queue for the name stored in this class's @queue.
# A fresh object is created on every call.
def self.queue
  queue_name = @queue
  QC::Queue.new(queue_name)
end
restart_in(time, remaining_retries, *args)
click to toggle source
# File lib/queue_classic_plus/base.rb, line 92
# Re-enqueues a call to this class's +_perform+ through the queue's
# +enqueue_retry_in+, forwarding +time+, +remaining_retries+ and the job
# arguments unchanged.
def self.restart_in(time, remaining_retries, *args)
  job_method = "#{self}._perform"
  queue.enqueue_retry_in(time, job_method, remaining_retries, *args)
end
retries_on?(exception)
click to toggle source
# File lib/queue_classic_plus/base.rb, line 27
# Tells whether +exception+ should trigger a retry.
#
# The exception's exact class is looked up in +retries_on+ first; if that
# entry is missing or falsy, every registered key is checked with +is_a?+
# so that subclasses of a registered class also match.
def self.retries_on?(exception)
  exact_match = retries_on[exception.class]
  return exact_match if exact_match

  retries_on.keys.any? { |klass| exception.is_a?(klass) }
end
retry!(on: RuntimeError, max: 5)
click to toggle source
# File lib/queue_classic_plus/base.rb, line 19
# Registers the exception classes that should cause this job to be retried
# and sets the maximum number of retries.
#
# on::  a single exception class or an array of them (default RuntimeError);
#       each one is stored as a key in +retries_on+.
# max:: value assigned to +max_retries+ (default 5).
#
# Raises a RuntimeError when +disable_retries+ is set, because the two
# settings are mutually exclusive.
def self.retry!(on: RuntimeError, max: 5)
  if disable_retries
    raise 'retry! should not be used in conjunction with disable_retries!'
  end

  # Array() lets callers pass either one class or a list of classes.
  Array(on).each { |error_class| retries_on[error_class] = true }
  self.max_retries = max
end
skip_transaction!()
click to toggle source
# File lib/queue_classic_plus/base.rb, line 43
# Sets this class's +skip_transaction+ attribute to true.
def self.skip_transaction!
  self.skip_transaction = true
end
transaction(options = {}, &block)
click to toggle source
# File lib/queue_classic_plus/base.rb, line 121
# Runs +block+ inside a database transaction.
#
# options:: hash forwarded to ActiveRecord::Base.transaction as keyword
#           arguments; ignored when ActiveRecord is not loaded.
#
# With ActiveRecord loaded, the call is delegated to
# ActiveRecord::Base.transaction. Otherwise BEGIN is issued through
# +execute+, the block is called, and COMMIT follows; if the block raises a
# StandardError, ROLLBACK is issued and the error is re-raised.
def self.transaction(options = {}, &block)
  if defined?(ActiveRecord)
    # If ActiveRecord is loaded, we use its own transaction mechanism since
    # it has slightly different semantics for rollback.
    # The hash is splatted so it arrives as keyword arguments; passing it
    # positionally raises ArgumentError on Ruby 3 when the target method
    # only declares keywords.
    ActiveRecord::Base.transaction(**options, &block)
  else
    begin
      execute "BEGIN"
      block.call
    rescue
      execute "ROLLBACK"
      raise
    end
    execute "COMMIT"
  end
end
Private Class Methods
execute(sql, *args)
click to toggle source
# File lib/queue_classic_plus/base.rb, line 146
# Runs +sql+ with the optional bind arguments on QC's default connection
# adapter and returns that adapter's result.
def self.execute(sql, *args)
  connection = QC.default_conn_adapter
  connection.execute(sql, *args)
end