class Backburner::AllQWrapper
Constants
- DEFAULT_TIMEOUT
Public Class Methods
new(url = 'localhost:8090')
click to toggle source
# File lib/backburner/allq_wrapper.rb, line 85
# Builds the AllQ API handles used by the rest of this wrapper.
#
# @param url [String] value assigned to the Allq configuration's +host+
def initialize(url = 'localhost:8090')
  configuration = Allq::Configuration.new { |config| config.host = url }
  api_client = Allq::ApiClient.new(configuration)

  # The actions API and the admin API are built on the same API client.
  @client = Allq::ActionsApi.new(api_client)
  @admin = Allq::AdminApi.new(api_client)
  @recent_times = []
end
Public Instance Methods
build_new_job(body, options)
click to toggle source
# File lib/backburner/allq_wrapper.rb, line 176
# Builds an Allq::NewJob from a raw body and an options hash.
#
# @param body [String] job payload; sent Base64 (strict) encoded
# @param options [Hash] recognised keys:
#   :pri       - application priority, passed through map_priority (default 5)
#   :ttl/:ttr  - time to live; :ttl wins, then :ttr, then DEFAULT_TIMEOUT
#   :tube_name - target tube (default 'default')
#   :delay     - delay value (default 0)
#   :shard_key - forwarded unchanged
#   :parent_id - forwarded unchanged
# @return [Allq::NewJob]
def build_new_job(body, options)
  Allq::NewJob.new(
    tube: options[:tube_name] || 'default',
    body: Base64.strict_encode64(body),
    ttl: options[:ttl] || options[:ttr] || DEFAULT_TIMEOUT,
    delay: options[:delay] || 0,
    priority: map_priority(options[:pri] || 5),
    shard_key: options[:shard_key],
    parent_id: options[:parent_id]
  )
end
build_new_parent_job(body, options)
click to toggle source
# File lib/backburner/allq_wrapper.rb, line 193
# Builds an Allq::NewParentJob from a raw body and an options hash.
#
# @param body [String] job payload; sent Base64 (strict) encoded
# @param options [Hash] recognised keys:
#   :pri            - application priority, passed through map_priority (default 5)
#   :ttl/:ttr       - time to live; :ttl wins, then :ttr, then DEFAULT_TIMEOUT
#   :tube_name      - target tube (default 'default')
#   :delay          - delay value (default 0)
#   :limit          - forwarded unchanged (nil when absent)
#   :timeout        - parent timeout (default 3_600)
#   :run_on_timeout - forwarded, false when absent
#   :shard_key      - forwarded unchanged
# @return [Allq::NewParentJob]
def build_new_parent_job(body, options)
  # options[:parent_id] is deliberately not read here: it was never forwarded
  # to Allq::NewParentJob, so the unused local that held it has been dropped.
  Allq::NewParentJob.new(
    tube: options[:tube_name] || 'default',
    body: Base64.strict_encode64(body),
    ttl: options[:ttl] || options[:ttr] || DEFAULT_TIMEOUT,
    delay: options[:delay] || 0,
    priority: map_priority(options[:pri] || 5),
    timeout: options[:timeout] || 3_600,
    run_on_timeout: options[:run_on_timeout] || false,
    shard_key: options[:shard_key],
    limit: options[:limit]
  )
end
bury(job)
click to toggle source
# File lib/backburner/allq_wrapper.rb, line 118
# Issues bury_put on the actions client for the given job's id.
#
# @param job [#id] object exposing the job id
# @return whatever @client.bury_put returns
def bury(job)
  job_id = job.id
  @client.bury_put(job_id)
end
close()
click to toggle source
# File lib/backburner/allq_wrapper.rb, line 155
# Currently a no-op: performs no work and returns nil.
#
# The method body is empty, so nothing in it can raise; no rescue clause is
# needed around it.
#
# @return [nil]
def close; end
delete(job)
click to toggle source
# File lib/backburner/allq_wrapper.rb, line 110
# Issues job_delete on the actions client for the given job's id.
#
# @param job [#id] object exposing the job id
# @return whatever @client.job_delete returns
def delete(job)
  job_id = job.id
  @client.job_delete(job_id)
end
done(job)
click to toggle source
# File lib/backburner/allq_wrapper.rb, line 106
# Marks a job as done by issuing job_delete on the actions client for its id.
#
# @param job [#id] object exposing the job id
# @return whatever @client.job_delete returns
def done(job)
  finished_id = job.id
  @client.job_delete(finished_id)
end
get(tube_name = 'default')
click to toggle source
# File lib/backburner/allq_wrapper.rb, line 139
# Fetches a job from a tube and wraps it in a Backburner::AllQJob.
#
# Failures are best-effort: any StandardError is printed and nil is returned
# instead of propagating.
#
# @param tube_name [String] tube to read from (default 'default')
# @return [Backburner::AllQJob, nil] wrapped job, or nil when an error occurred
def get(tube_name = 'default')
  job = @client.job_get(tube_name)

  # Decode in place; skipped when there is no job or the job has no body.
  job.body = Base64.decode64(job.body) if job&.body
  Backburner::AllQJob.new(self, job)
rescue StandardError => e
  if e.message == "Couldn't resolve host name"
    puts('COUDNT RESOLVE HOST NAME------ SHOULD REBOOT')
  else
    puts(e)
  end
  # Explicit so the nil result does not depend on what puts happens to return.
  nil
end
log_result(job_result)
click to toggle source
# File lib/backburner/allq_wrapper.rb, line 170
# Prints the job id carried by a put result.
#
# Any StandardError raised while reading or printing the id is itself
# printed rather than propagated.
#
# @param job_result [#job_id] result object exposing the job id
# @return [nil]
def log_result(job_result)
  job_id = job_result.job_id
  puts("ALLQ-HTTP-JOB-ID=#{job_id}")
rescue StandardError => e
  puts(e)
end
map_priority(app_priority)
click to toggle source
# File lib/backburner/allq_wrapper.rb, line 160
# Maps an application priority onto the value sent to AllQ.
#
# The input is coerced with to_i. Anything above 10 collapses to 5; every
# other value (1..10, but also 0 and negatives) is returned as-is.
#
# @param app_priority [#to_i] priority supplied by the application
# @return [Integer]
def map_priority(app_priority)
  level = app_priority.to_i
  return 5 if level > 10

  level
end
peek_buried(tube_name = 'default')
click to toggle source
# File lib/backburner/allq_wrapper.rb, line 131
# Peeks at a buried job in a tube without the caller having to decode it.
#
# @param tube_name [String] tube to peek into (default 'default')
# @return [Backburner::AllQJob, nil] wrapped job with a decoded body, or nil
#   when the peek returned nothing or a job without a body
def peek_buried(tube_name = 'default')
  job = @client.peek_get(tube_name, buried: true)

  # Check the job itself before touching its body: a nil peek result must
  # yield nil, not a NoMethodError.
  return nil unless job&.body

  job.body = Base64.decode64(job.body)
  Backburner::AllQJob.new(self, job)
end
put(body, options)
click to toggle source
# File lib/backburner/allq_wrapper.rb, line 214
# Posts a job (or a parent job when options[:is_parent] is truthy) to AllQ,
# retrying on failure.
#
# Each attempt is wrapped in a 10 second timeout. A timeout, any other
# StandardError, or a nil/blank response counts as a failed attempt. Up to
# four attempts are made, with a pause between attempts; after the last
# failure the method raises immediately instead of pausing first.
#
# @param body [String] job payload
# @param options [Hash] forwarded to build_new_job / build_new_parent_job;
#   :is_parent selects the parent-job path
# @return the response returned by the AllQ client
# @raise [RuntimeError] when every attempt failed
def put(body, options)
  max_attempts = 4
  retry_delay = 5 # seconds between attempts
  failed_attempts = 0
  is_parent = options[:is_parent] || false
  result = nil

  begin
    Timeout.timeout(10) do
      result = if is_parent
                 @client.parent_job_post(build_new_parent_job(body, options))
               else
                 @client.job_post(build_new_job(body, options))
               end
      raise 'PUT returned nil' if result.nil? || result.to_s == ''
    end
  rescue Timeout::Error
    puts('ALLQ PUT timeout, retrying...')
    failed_attempts += 1
    if failed_attempts < max_attempts
      # Only pause when another attempt actually follows.
      sleep(retry_delay)
      retry
    end
    raise "Failed to put on allq, we are investigating the problem, please try again -> #{body}"
  rescue StandardError => e
    puts('Failed to ALLQ PUT, retrying...')
    puts(e)
    failed_attempts += 1
    if failed_attempts < max_attempts
      # Only pause when another attempt actually follows.
      sleep(retry_delay)
      retry
    end
    raise "Failed to put on allq, we are investigating the problem, please try again: #{body}"
  end

  result
end
release(job, _delay = 0)
click to toggle source
# File lib/backburner/allq_wrapper.rb, line 114
# Issues release_put on the actions client for the given job's id.
#
# @param job [#id] object exposing the job id
# @param _delay [Integer] accepted but not used by this method
# @return whatever @client.release_put returns
def release(job, _delay = 0)
  job_id = job.id
  @client.release_put(job_id)
end
speed()
click to toggle source
# File lib/backburner/allq_wrapper.rb, line 96
# Arithmetic mean of the values held in @recent_times.
#
# @return [Float, Integer] the mean as a Float, or the Integer 0 when no
#   values have been recorded
def speed
  return 0 if @recent_times.empty?

  @recent_times.sum(0.0) / @recent_times.size
end
stats(tube)
click to toggle source
# File lib/backburner/allq_wrapper.rb, line 248 def stats(tube) final_stats = stats final_stats[tube] end
touch(job)
click to toggle source
# File lib/backburner/allq_wrapper.rb, line 102
# Issues touch_put on the actions client for the given job's id.
#
# @param job [#id] object exposing the job id
# @return whatever @client.touch_put returns
def touch(job)
  job_id = job.id
  @client.touch_put(job_id)
end
tube_names()
click to toggle source
# File lib/backburner/allq_wrapper.rb, line 122
# Returns the keys of the stats collection.
#
# NOTE(review): relies on a zero-argument `stats` returning an object that
# responds to #keys — confirm such a definition exists in the class.
#
# @return the result of calling #keys on the stats collection
def tube_names
  stats.keys
end
tubes()
click to toggle source
# File lib/backburner/allq_wrapper.rb, line 127 def tubes tube_names end