class Backburner::AllQWrapper

Constants

DEFAULT_TIMEOUT

Public Class Methods

new(url = 'localhost:8090') click to toggle source
# File lib/backburner/allq_wrapper.rb, line 85
# Builds the AllQ API clients used by every other method on the wrapper.
#
# @param url [String] host assigned to the Allq configuration
def initialize(url = 'localhost:8090')
  configuration = Allq::Configuration.new { |cfg| cfg.host = url }
  api_client = Allq::ApiClient.new(configuration)

  # Both API facades share a single underlying client.
  @client = Allq::ActionsApi.new(api_client)
  @admin = Allq::AdminApi.new(api_client)
  @recent_times = []
end

Public Instance Methods

build_new_job(body, options) click to toggle source
# File lib/backburner/allq_wrapper.rb, line 176
# Translates a raw body and an options hash into an Allq::NewJob.
#
# Recognised option keys: :pri, :ttl, :ttr, :tube_name, :delay, :parent_id
# and :shard_key. Missing values fall back to priority 5, DEFAULT_TIMEOUT,
# the 'default' tube and a delay of 0.
#
# @param body [String] payload; sent Base64 (strict) encoded
# @param options [Hash] per-job settings, see above
# @return [Allq::NewJob]
def build_new_job(body, options)
  Allq::NewJob.new(
    tube: options[:tube_name] || 'default',
    body: Base64.strict_encode64(body),
    # :ttl wins over :ttr; DEFAULT_TIMEOUT is the last resort.
    ttl: options[:ttl] || options[:ttr] || DEFAULT_TIMEOUT,
    delay: options[:delay] || 0,
    priority: map_priority(options[:pri] || 5),
    shard_key: options[:shard_key],
    parent_id: options[:parent_id]
  )
end
build_new_parent_job(body, options) click to toggle source
# File lib/backburner/allq_wrapper.rb, line 193
# Translates a raw body and an options hash into an Allq::NewParentJob.
#
# Recognised option keys: :pri, :ttl, :ttr, :tube_name, :delay, :limit,
# :timeout, :run_on_timeout and :shard_key. Missing values fall back to
# priority 5, DEFAULT_TIMEOUT, the 'default' tube, a delay of 0, a timeout
# of 3_600 and run_on_timeout of false.
#
# options[:parent_id] is not forwarded to the parent job; the previous
# implementation read it into a local that was never used.
#
# @param body [String] payload; sent Base64 (strict) encoded
# @param options [Hash] per-job settings, see above
# @return [Allq::NewParentJob]
def build_new_parent_job(body, options)
  adjusted_priority = map_priority(options[:pri] || 5)
  # :ttl wins over :ttr; DEFAULT_TIMEOUT is the last resort.
  ttl = options[:ttl] || options[:ttr] || DEFAULT_TIMEOUT
  tube_name = options[:tube_name] || 'default'
  delay = options[:delay] || 0
  limit = options[:limit]
  timeout = options[:timeout] || 3_600
  run_on_timeout = options[:run_on_timeout] || false

  Allq::NewParentJob.new(tube: tube_name,
                         body: Base64.strict_encode64(body),
                         ttl: ttl,
                         delay: delay,
                         priority: adjusted_priority,
                         timeout: timeout,
                         run_on_timeout: run_on_timeout,
                         shard_key: options[:shard_key],
                         limit: limit)
end
bury(job) click to toggle source
# File lib/backburner/allq_wrapper.rb, line 118
# Buries +job+ by passing its id to the client's bury_put call.
#
# @param job [#id] job whose id is sent to the server
# @return [Object] whatever the client's bury_put returns
def bury(job)
  job_id = job.id
  @client.bury_put(job_id)
end
close() click to toggle source
# File lib/backburner/allq_wrapper.rb, line 155
# Intentionally does nothing and returns nil.
#
# The previous definition had an empty body followed by a rescue clause;
# with nothing to raise, that clause could never run, so it was removed.
def close; end
delete(job) click to toggle source
# File lib/backburner/allq_wrapper.rb, line 110
# Deletes +job+ by passing its id to the client's job_delete call.
#
# @param job [#id] job whose id is sent to the server
# @return [Object] whatever the client's job_delete returns
def delete(job)
  job_id = job.id
  @client.job_delete(job_id)
end
done(job) click to toggle source
# File lib/backburner/allq_wrapper.rb, line 106
# Marks +job+ as finished. This issues the same client call as deleting
# the job: job_delete with the job's id.
#
# @param job [#id] job whose id is sent to the server
# @return [Object] whatever the client's job_delete returns
def done(job)
  finished_id = job.id
  @client.job_delete(finished_id)
end
get(tube_name = 'default') click to toggle source
# File lib/backburner/allq_wrapper.rb, line 139
# Fetches a job from +tube_name+ and wraps it in a Backburner::AllQJob.
#
# Any StandardError raised along the way is printed rather than propagated,
# and the method then returns nil.
#
# @param tube_name [String] tube to fetch from
# @return [Backburner::AllQJob, nil] the wrapped job (the wrapper is built
#   even when the client hands back nil), or nil after a rescued error
def get(tube_name = 'default')
  job = @client.job_get(tube_name)

  # Decode the Base64 body in place; skipped when there is no job or body.
  job.body = Base64.decode64(job.body) if job&.body

  Backburner::AllQJob.new(self, job)
rescue StandardError => e
  if e.message == "Couldn't resolve host name"
    puts('COUDNT RESOLVE HOST NAME------ SHOULD REBOOT')
  else
    puts(e)
  end
  # Callers receive nil on failure.
  nil
end
log_result(job_result) click to toggle source
# File lib/backburner/allq_wrapper.rb, line 170
# Prints the job id carried by +job_result+. Any StandardError raised while
# doing so is printed instead of being propagated.
#
# @param job_result [#job_id] result object returned by a put
# @return [nil]
def log_result(job_result)
  job_id = job_result.job_id
  puts("ALLQ-HTTP-JOB-ID=#{job_id}")
rescue StandardError => error
  puts(error)
end
map_priority(app_priority) click to toggle source
# File lib/backburner/allq_wrapper.rb, line 160
# Converts an application priority into the value sent to AllQ.
#
# The input is coerced with to_i. Anything above 10 collapses to 5; every
# other value (including zero and negatives) is returned unchanged.
#
# @param app_priority [#to_i] priority supplied by the application
# @return [Integer]
def map_priority(app_priority)
  level = app_priority.to_i

  if level > 10
    5
  else
    level
  end
end
peek_buried(tube_name = 'default') click to toggle source
# File lib/backburner/allq_wrapper.rb, line 131
# Peeks at a buried job in +tube_name+ without raising when none exists.
#
# The previous version read job.body before checking job itself, so a nil
# response from the client raised NoMethodError; both cases now return nil.
#
# @param tube_name [String] tube to inspect
# @return [Backburner::AllQJob, nil] the wrapped job with its body
#   Base64-decoded, or nil when the client returns no job or no body
def peek_buried(tube_name = 'default')
  job = @client.peek_get(tube_name, buried: true)
  return nil if job.nil? || job.body.nil?

  job.body = Base64.decode64(job.body)
  Backburner::AllQJob.new(self, job)
end
put(body, options) click to toggle source
# File lib/backburner/allq_wrapper.rb, line 214
# Posts a job (or a parent job when options[:is_parent] is truthy) to AllQ.
#
# Each attempt runs under a 10 second Timeout. A timeout, any other
# StandardError, or a nil/blank response counts as a failed attempt. Up to
# four attempts are made, pausing 5 seconds between them; the pause is
# skipped after the last attempt because no retry follows it.
#
# @param body [String] job payload
# @param options [Hash] passed to build_new_job / build_new_parent_job;
#   :is_parent selects the parent-job path
# @return [Object] the client's response to the post
# @raise [RuntimeError] when all four attempts fail
def put(body, options)
  retry_count = 0
  is_parent = options[:is_parent] || false
  result = nil

  begin
    Timeout.timeout(10) do
      if is_parent
        new_job = build_new_parent_job(body, options)
        result = @client.parent_job_post(new_job)
      else
        new_job = build_new_job(body, options)
        result = @client.job_post(new_job)
      end
      # An empty response is treated like any other failure and retried.
      raise 'PUT returned nil' if result.nil? || result.to_s == ''
    end
  rescue Timeout::Error
    # Must precede the StandardError clause: Timeout::Error is a StandardError.
    puts('ALLQ PUT timeout, retrying...')
    retry_count += 1
    if retry_count < 4
      sleep(5)
      retry
    end
    raise "Failed to put on allq, we are investigating the problem, please try again -> #{body}"
  rescue StandardError => e
    puts('Failed to ALLQ PUT, retrying...')
    puts(e)
    retry_count += 1
    if retry_count < 4
      sleep(5)
      retry
    end
    raise "Failed to put on allq, we are investigating the problem, please try again: #{body}"
  end
  result
end
release(job, _delay = 0) click to toggle source
# File lib/backburner/allq_wrapper.rb, line 114
# Releases +job+ by passing its id to the client's release_put call.
#
# @param job [#id] job whose id is sent to the server
# @param _delay [Integer] accepted but not used by this method
# @return [Object] whatever the client's release_put returns
def release(job, _delay = 0)
  job_id = job.id
  @client.release_put(job_id)
end
speed() click to toggle source
# File lib/backburner/allq_wrapper.rb, line 96
# Arithmetic mean of the values held in @recent_times.
#
# @return [Float, Integer] the mean as a Float, or the Integer 0 when no
#   values have been recorded
def speed
  return 0 if @recent_times.empty?

  @recent_times.sum(0.0) / @recent_times.size
end
stats(tube) click to toggle source
# File lib/backburner/allq_wrapper.rb, line 248
# Looks up the stats entry for a single tube.
#
# NOTE(review): the bare `stats` call below passes no arguments, while this
# definition requires +tube+. As written it would call itself with the wrong
# arity and raise ArgumentError. Presumably a zero-argument `stats` is
# defined elsewhere in the class (not visible here); if so, Ruby keeps only
# the later definition and this one is never used — confirm and reconcile.
#
# @param tube [Object] key to look up in the overall stats collection
# @return [Object] whatever the stats collection holds under +tube+
def stats(tube)
  # Intended to fetch stats for every tube, then pick out the requested one.
  final_stats = stats
  final_stats[tube]
end
touch(job) click to toggle source
# File lib/backburner/allq_wrapper.rb, line 102
# Touches +job+ by passing its id to the client's touch_put call.
#
# @param job [#id] job whose id is sent to the server
# @return [Object] whatever the client's touch_put returns
def touch(job)
  job_id = job.id
  @client.touch_put(job_id)
end
tube_names() click to toggle source
# File lib/backburner/allq_wrapper.rb, line 122
# Lists the keys of the collection returned by a zero-argument `stats` call.
#
# @return [Array] the keys of that stats collection
def tube_names
  stats.keys
end
tubes() click to toggle source
# File lib/backburner/allq_wrapper.rb, line 127
# Alternate name for #tube_names; returns exactly what that method returns.
#
# @return [Object] the result of tube_names
def tubes
  tube_names
end