module BoyBand::WorkerMethods

Public Instance Methods

action_types(queue='default') click to toggle source
# File lib/boy_band.rb, line 224
def action_types(queue='default')
  idx = Resque.size(queue)
  list = []
  idx.times do |i|
    item = Resque.peek(queue, i)
    chain = nil
    if item && item['args'] && item['args'][-1].match(/^chain::/)
      chain = item['args'].pop
    end
    if chain
      match = chain.scan(/##/)
      if match && match.length == 1
        item['root'] = true
        list << item
      elsif match && match.length > 1
        list << item
      end
    end
  end
  count = {'root' => {}, 'non_root' => {}}
  list.each do |item|
    key = "#{item['args'][0]}::#{item['args'][2].is_a?(Hash) ? item['args'][2]['method'] : item['args'][1]}"
    count[item['root'] ? 'root' : 'non_root'][key] ||= 0
    count[item['root'] ? 'root' : 'non_root'][key] += 1
  end.length
  count
end
clear_job(hash) click to toggle source
# File lib/boy_band.rb, line 106
  def clear_job(hash)
#     if Resque.redis
#       timestamps = JSON.parse(Resque.redis.hget('hashed_jobs', hash) || "[]")
#       timestamps.shift
# #      Resque.redis.hset('hashed_jobs', hash, timestamps.to_json)
#     end
  end
current_speed() click to toggle source
# File lib/boy_band.rb, line 130
def current_speed
  @speed
end
domain_id() click to toggle source
# File lib/boy_band.rb, line 25
def domain_id
  @@domain_id ||= "default"
  @@domain_id
end
find_actions(method) click to toggle source
# File lib/boy_band.rb, line 252
def find_actions(method)
  queue = 'default'
  idx = Resque.size(queue)
  list = []
  idx.times do |i|
    item = Resque.peek(queue, i)
    if item && item['args'] && item['args'][2].is_a?(Hash) && item['args'][2]['method'] == method
      list << item
      puts item.to_json
    end
  end
  list
end
flush_queues() click to toggle source
# File lib/boy_band.rb, line 362
def flush_queues
  if Resque.redis
    Resque.queues.each do |key|
      Resque.redis.ltrim("queue:#{key}", 1, 0)
    end
  end
  Resque.redis.del('hashed_jobs')
end
in_worker_process?() click to toggle source
# File lib/boy_band.rb, line 126
def in_worker_process?
  BoyBand.job_instigator.match(/^job/)
end
job_chain() click to toggle source
# File lib/boy_band.rb, line 34
def job_chain
  @@job_chain ||= "none"
  @@job_chain
end
kill_all_workers() click to toggle source
# File lib/boy_band.rb, line 330
def kill_all_workers
  Resque.workers.each{|w| w.unregister_worker }
end
note_job(hash) click to toggle source
# File lib/boy_band.rb, line 96
  def note_job(hash)
#     if Resque.redis
#       timestamps = JSON.parse(Resque.redis.hget('hashed_jobs', hash) || "[]")
#       cutoff = 6.hours.ago.to_i
#       timestamps = timestamps.select{|ts| ts > cutoff }
#       timestamps.push(Time.now.to_i)
# #      Resque.redis.hset('hashed_jobs', hash, timestamps.to_json)
#     end
  end
on_failure_retry(e, *args) click to toggle source
# File lib/boy_band.rb, line 173
def on_failure_retry(e, *args)
  # TODO...
end
perform(*args) click to toggle source
# File lib/boy_band.rb, line 118
def perform(*args)
  perform_at(:normal, *args)
end
perform_at(speed, *args) click to toggle source
# File lib/boy_band.rb, line 134
def perform_at(speed, *args)
  @speed = speed
  args_copy = [] + args
  if args_copy[-1].is_a?(String) && args_copy[-1].match(/^chain::/)
    set_job_chain(args_copy.pop.split(/::/, 2)[1])
  end
  if args_copy[-1].is_a?(String) && args_copy[-1].match(/^domain::/)
    set_domain_id(args_copy.pop.split(/::/, 2)[1])
  end
  klass_string = args_copy.shift
  klass = Object.const_get(klass_string)
  method_name = args_copy.shift
  job_hash = Digest::MD5.hexdigest(args_copy.to_json)
  Resque.redis.del("scheduled/#{klass_string}/#{method_name}/#{job_hash}")
  hash = args_copy[0] if args_copy[0].is_a?(Hash)
  hash ||= {'method' => method_name}
  action = "#{klass_string} . #{hash['method']} (#{hash['id']})"
  pre_whodunnit = BoyBand.job_instigator
  BoyBand.set_job_instigator("job:#{action}")
  Rails.logger.info("performing #{action}")
  start = self.ts
  klass.last_scheduled_stamp = hash['scheduled'] if klass.respond_to?('last_scheduled_stamp=')
  klass.send(method_name, *args_copy)
  diff = self.ts - start
  Rails.logger.info("done performing #{action}, finished in #{diff}s")
  # TODO: way to track what queue a job is coming from
  if diff > 60 && speed == :normal
    Rails.logger.error("long-running job, #{action}, #{diff}s")
  elsif diff > 60*10 && speed == :slow
    Rails.logger.error("long-running job, #{action} (expected slow), #{diff}s")
  end
  set_job_chain("none")
  BoyBand.set_job_instigator(pre_whodunnit)
  clear_job(job_hash)
  @speed = nil
rescue Resque::TermException
  Resque.enqueue(self, *args)
end
process_queues() click to toggle source
# File lib/boy_band.rb, line 334
def process_queues
  schedules = []
  Resque.queues.each do |key|
    while Resque.size(key) > 0
      schedules << {queue: key, action: Resque.pop(key)}
    end
  end
  schedules.each do |schedule|
    queue = schedule[:queue]
    schedule  = schedule[:action]
    if queue == 'slow'
      raise "unknown job: #{schedule.to_json}" if schedule['class'] != 'SlowWorker'
      SlowWorker.perform(*(schedule['args']))
    else
      raise "unknown job: #{schedule.to_json}" if schedule['class'] != 'Worker'
      Worker.perform(*(schedule['args']))
    end
  end
end
prune_dead_workers() click to toggle source
# File lib/boy_band.rb, line 326
def prune_dead_workers
  Resque.workers.each{|w| w.prune_dead_workers }
end
queue_size(queue) click to toggle source
# File lib/boy_band.rb, line 43
def queue_size(queue)
  size = Resque.redis.get("sizeof/#{queue}").to_i
  if !size || size == 0
    size = Resque.size(queue)
    Resque.redis.setex("sizeof/#{queue}", 30.seconds.to_i, size)
  end
  size
end
queues_empty?() click to toggle source
# File lib/boy_band.rb, line 354
def queues_empty?
  found = false
  Resque.queues.each do |key|
    return false if Resque.size(key) > 0
  end
  true
end
root_actions(queue='default') click to toggle source
# File lib/boy_band.rb, line 204
def root_actions(queue='default')
  idx = Resque.size(queue)
  job_ids = {}
  idx.times do |i|
    item = Resque.peek(queue, i)
    chain = nil
    if item && item['args'] && item['args'][-1].match(/^chain::/)
      chain = item['args'].pop
    end
    if chain
      parts = chain.split(/##/)
      job_ids[parts[0]] ||= [parts[1], 0, []]
      job_ids[parts[0]][1] += 1
      job_ids[parts[0]][2] << parts.length - 2
    end
  end
  job_ids.each{|k, v| job_ids.delete(k) if v[1] <= 5}.length
  job_ids
end
schedule(klass, method_name, *args) click to toggle source
# File lib/boy_band.rb, line 114
def schedule(klass, method_name, *args)
  schedule_for(:default, klass, method_name, *args)
end
schedule_for(queue, klass, method_name, *args) click to toggle source
# File lib/boy_band.rb, line 52
def schedule_for(queue, klass, method_name, *args)
  queue = queue.to_sym
  @queue = queue.to_s
  job_hash = Digest::MD5.hexdigest(args.to_json)
  note_job(job_hash)
  size = queue_size(queue)
  args.push("domain::#{self.domain_id}")
  chain = self.job_chain.split(/##/)
  job_id = "j#{Time.now.iso8601}_#{rand(9999)}"
  chain = [job_id] if chain == ["none"]
  if chain.length > 1
    Resque.redis.incr("jobs_from_#{chain[0]}") 
    Resque.redis.expire("jobs_from_#{chain[0]}", 24.hours.to_i)
  end
  Resque.redis.setex("scheduled/#{klass.to_s}/#{job_hash}", 6.hours, "t")
  chain_args = args[0..-2]
  if chain_args.length == 1 && chain_args[0].is_a?(Hash)
    chain_args = [chain_args[0]['method'],chain_args[0]['id'],chain_args[0]['arguments'].to_s[0, 20]]
  end
  chain.push("#{klass.to_s},#{method_name.to_s},#{chain_args.join('+')}")
  # Rails.logger.warn("jobchain set, #{chain[0]} #{chain.join('##')}") if chain.length > 2
  if chain.length > 5
    Rails.logger.error("jobchain too deep: #{chain[1]}, #{chain.length} entries")
  end
  job_count = Resque.redis.get("jobs_from_#{chain[0]}")
  if job_count && job_count.to_i > 50
    Rails.logger.error("jobchain too many sub-jobs: #{chain[1]}, #{job_count} so far")
  end
  args.push("chain::#{chain[0, 50].join('##')}")
  if queue == :slow
    Resque.enqueue(SlowWorker, klass.to_s, method_name, *args)
    if size > 1000 && !Resque.redis.get("queue_warning_#{queue}")
      Resque.redis.setex("queue_warning_#{queue}", 5.minutes.to_i, "true")
      Rails.logger.error("job queue full: #{queue}, #{size} entries")
    end
  else
    Resque.enqueue(Worker, klass.to_s, method_name, *args)
    if size > 5000 && !Resque.redis.get("queue_warning_#{queue}")
      Resque.redis.setex("queue_warning_#{queue}", 5.minutes.to_i, "true")
      Rails.logger.error("job queue full: #{queue}, #{size} entries")
    end
  end
end
scheduled?(klass, method_name, *args) click to toggle source
# File lib/boy_band.rb, line 317
def scheduled?(klass, method_name, *args)
  scheduled_for?('default', klass, method_name, *args)
end
scheduled_actions(queue='default') click to toggle source
# File lib/boy_band.rb, line 177
def scheduled_actions(queue='default')
  queues = [queue]
  if queue == '*'
    queues = []
    Resque.queues.each do |key|
      queues << key
    end
  end

  res = []
  queues.each do |queue|
    idx = Resque.size(queue)
    idx.times do |i|
      item = Resque.peek(queue, i)
      if item && item['args'] && item['args'][-1].match(/^chain::/)
        chain = item['args'].pop
      end
      if item && item['args'] && item['args'][-1].match(/^domain::/)
        domain = item['args'].pop
        item['domain_id'] = domain.split(/::/, 2)[1]
      end
      res << item if item
    end
  end
  res
end
scheduled_for?(queue, klass, method_name, *args) click to toggle source
# File lib/boy_band.rb, line 266
def scheduled_for?(queue, klass, method_name, *args)
  args_copy = [] + args
  if args[-1].is_a?(String) && args[-1].match(/^chain::/)
    args_copy.pop.split(/::/, 2)[1]
  end
  if args[-1].is_a?(String) && args[-1].match(/^domain::/)
    set_domain_id(args_copy.pop.split(/::/, 2)[1])
  end

  idx = queue_size(queue)
  job_hash = args_copy.to_json
  return true if Resque.redis.get("scheduled/#{klass.to_s}/#{method_name}/#{job_hash}") == "t"
  return false if idx > 500 # big queues mustn't be searched this way
  idx = Resque.size(queue)
  queue_class = (queue.to_s == 'slow' ? 'SlowWorker' : 'Worker')
  if false
    job_hash = args_copy.to_json
    timestamps = JSON.parse(Resque.redis.hget('hashed_jobs', job_hash) || "[]")
    cutoff = 6.hours.ago.to_i
    return timestamps.select{|ts| ts > cutoff }.length > 0
  else
    start = 0
    while start < idx
      items = Resque.peek(queue, start, 1000)
      start += items.length > 0 ? items.length : 1
      items.each do |schedule|
        if schedule && schedule['class'] == queue_class && schedule['args'][0] == klass.to_s && schedule['args'][1] == method_name.to_s
          a1 = args_copy
          if a1.length == 1 && a1[0].is_a?(Hash)
            a1 = [a1[0].dup]
            a1[0].delete('scheduled')
            a1[0].delete('domain_id')
          end
          a2 = schedule['args'][2..-1]
          a2.pop if a2.length > 1 && a2[-1].is_a?(String) && a2[-1].match(/^chain::/)
          a2.pop if a2.length > 1 && a2[-1].is_a?(String) && a2[-1].match(/^domain::/)
          if a2.length == 1 && a2[0].is_a?(Hash)
            a2 = [a2[0].dup]
            a2[0].delete('scheduled')
            a2[0].delete('domain_id')
          end
          if a1.to_json == a2.to_json
            return true
          end
        end
      end
    end
  end
  return false
end
set_domain_id(val) click to toggle source
# File lib/boy_band.rb, line 30
def set_domain_id(val)
  @@domain_id = val
end
set_job_chain(val) click to toggle source
# File lib/boy_band.rb, line 39
def set_job_chain(val)
  @@job_chain = val
end
stop_stuck_workers() click to toggle source
# File lib/boy_band.rb, line 321
def stop_stuck_workers
  timeout = 8.hours.to_i
  Resque.workers.each {|w| w.unregister_worker if w.processing['run_at'] && Time.now - w.processing['run_at'].to_time > timeout}    
end
thread_id() click to toggle source
# File lib/boy_band.rb, line 21
def thread_id
  "#{Process.pid}_#{Thread.current.object_id}"
end
transfer_backlog(queue) click to toggle source
# File lib/boy_band.rb, line 371
def transfer_backlog(queue)
  saves = []
  dos = []
  while Resque.size(queue) > 0 && (saves.length + dos.length) < 10000
    job = Resque.pop(queue)
    if job
      if job['args'] && job['args'][2] && job['args'][2]['method'] == 'track_downstream_boards!'
        saves.push(job)
      else
        dos.push(job)
      end
    end
  end
  dos.each{|job| Resque.enqueue(Worker, *job['args']) }; dos.length
  hash = saves.group_by{|j| j['args'][2]['id'] }; hash.length
  hash.each do |id, jobs|
    list = []
    max_stamp = 48.hours.ago.to_i
    jobs.each do |j| 
      list += j['args'][2]['arguments'][0] 
      max_stamp = [max_stamp, j['args'][2]['arguments'][2] || 0].max
    end
    args = jobs[0]['args'][2]
    args['arguments'] = [list.uniq, nil, max_stamp]
    Resque.enqueue(SlowWorker, job['args'][0], job['args'][1], args)
  end; hash.keys.length
  Resque.size(queue)
end
ts() click to toggle source
# File lib/boy_band.rb, line 122
def ts
  Time.now.to_i
end