class Resque::Plugins::Fifo::Queue::Manager

Constants

DLM_TTL

Attributes

queue_prefix[RW]

Public Class Methods

enqueue_to(key, klass, *args) click to toggle source
# File lib/resque/plugins/fifo/queue/manager.rb, line 112
def self.enqueue_to(key, klass, *args)
  enqueue_topic('fifo', key, klass, *args)
end
enqueue_topic(topic, key, klass, *args) click to toggle source
# File lib/resque/plugins/fifo/queue/manager.rb, line 116
def self.enqueue_topic(topic, key, klass, *args)
  # Perform before_enqueue hooks. Don't perform enqueue if any hook returns false
  before_hooks = Plugin.before_enqueue_hooks(klass).collect do |hook|
    klass.send(hook, *args)
  end

  return nil if before_hooks.any? { |result| result == false }

  manager = Resque::Plugins::Fifo::Queue::Manager.new(topic)
  manager.enqueue(key, klass, *args)

  Plugin.after_enqueue_hooks(klass).each do |hook|
    klass.send(hook, *args)
  end

  return true
end
new(queue_prefix = 'fifo') click to toggle source
# File lib/resque/plugins/fifo/queue/manager.rb, line 11
def initialize(queue_prefix = 'fifo')
  @queue_prefix = queue_prefix
end

Public Instance Methods

all_stats() click to toggle source
# File lib/resque/plugins/fifo/queue/manager.rb, line 103
def all_stats
  {
    dht_times_rehashed: dht_times_rehashed,
    avg_delay: get_stats_avg_delay,
    avg_dht_recalc: get_stats_avg_dht_recalc,
    max_delay: get_stats_max_delay
  }
end
clear_stats() click to toggle source
# File lib/resque/plugins/fifo/queue/manager.rb, line 64
def clear_stats
  redis_client.del "fifo-stats-max-delay"
  redis_client.del "fifo-stats-accumulated-delay"
  redis_client.del "fifo-stats-accumulated-count"
  redis_client.del "fifo-stats-dht-rehash"
  redis_client.del "fifo-stats-accumulated-recalc-time"
  redis_client.del "fifo-stats-accumulated-recalc-count"

  slots = redis_client.lrange fifo_hash_table_name, 0, -1
  slots.each_with_index.collect do |slot, index|
    slice, queue = slot.split('#')
    redis_client.del "queue-stats-#{queue}"
  end
end
compute_queue_name(key) click to toggle source
# File lib/resque/plugins/fifo/queue/manager.rb, line 27
def compute_queue_name(key)
  index = compute_index(key)
  slots = redis_client.lrange fifo_hash_table_name, 0, -1

  return pending_queue_name if slots.empty?

  slots.reverse.each do |slot|
    slice, queue = slot.split('#')
    if index > slice.to_i
      return queue
    end
  end

  _slice, queue_name = slots.last.split('#')

  queue_name
end
dht_times_rehashed() click to toggle source
# File lib/resque/plugins/fifo/queue/manager.rb, line 99
def dht_times_rehashed
  redis_client.get("fifo-stats-dht-rehash") || 0
end
dump_dht() click to toggle source
# File lib/resque/plugins/fifo/queue/manager.rb, line 134
def dump_dht
  slots = redis_client.lrange fifo_hash_table_name, 0, -1
  slots.each_with_index.collect do |slot, index|
    slice, queue = slot.split('#')
    [slice.to_i, queue]
  end
end
dump_queue_names() click to toggle source
# File lib/resque/plugins/fifo/queue/manager.rb, line 158
def dump_queue_names
  dump_dht.collect { |item| item[1] }
end
dump_queues() click to toggle source
# File lib/resque/plugins/fifo/queue/manager.rb, line 170
def dump_queues
  query_available_queues.collect do |queue|
    [queue, Resque.peek(queue,0,0)]
  end.to_h
end
dump_queues_sorted() click to toggle source
# File lib/resque/plugins/fifo/queue/manager.rb, line 215
def dump_queues_sorted
  queues = dump_queues
  dht = dump_dht.collect do |item|
    _slice, queue_name = item
    queues[queue_name]
  end
end
dump_queues_with_slices() click to toggle source
# File lib/resque/plugins/fifo/queue/manager.rb, line 187
def dump_queues_with_slices
  slots = redis_client.lrange fifo_hash_table_name, 0, -1
  slots.collect do |slot, index|
    slice, queue = slot.split('#')
    worker = worker_for_queue(queue)

    hostname = '?'
    status = '?'
    pid = '?'
    started = '?'
    heartbeat = '?'

    if worker
      hostname = worker.hostname
      status = worker.paused? ? 'paused' : worker.state.to_s
      pid = worker.pid
      started = worker.started
      heartbeat = worker.heartbeat
    end

    [slice, queue, hostname, pid, status, started, heartbeat, get_processed_count(queue), Resque.peek(queue,0,0).size ]
  end
end
enqueue(key, klass, *args) click to toggle source
# File lib/resque/plugins/fifo/queue/manager.rb, line 45
def enqueue(key, klass, *args)
  queue = compute_queue_name(key)

  redis_client.incr "queue-stats-#{queue}"
  Resque.validate(klass, queue)
  if Resque.inline? && inline?
    # Instantiating a Resque::Job and calling perform on it so callbacks run
    # decode(encode(args)) to ensure that args are normalized in the same manner as a non-inline job
    Resque::Job.new(:inline, {'class' => klass, 'args' => Resque.decode(Resque.encode(args)), 'fifo_key' => key, 'enqueue_ts' => 0}).perform
  else
    Resque.push(queue, :class => klass.to_s, :args => args, fifo_key: key, :enqueue_ts => Time.now.to_i)
  end
end
fifo_hash_table_name() click to toggle source
# File lib/resque/plugins/fifo/queue/manager.rb, line 15
def fifo_hash_table_name
  "fifo-queue-lookup-#{@queue_prefix}"
end
get_processed_count(queue) click to toggle source
# File lib/resque/plugins/fifo/queue/manager.rb, line 211
def get_processed_count(queue)
  redis_client.get("queue-stats-#{queue}") || 0
end
get_stats_avg_delay() click to toggle source
# File lib/resque/plugins/fifo/queue/manager.rb, line 91
def get_stats_avg_delay
  accumulated_delay = redis_client.get("fifo-stats-accumulated-delay") || 0
  total_items = redis_client.get("fifo-stats-accumulated-count") || 0
  return 0 if total_items == 0

  return accumulated_delay.to_f / total_items.to_f
end
get_stats_avg_dht_recalc() click to toggle source
# File lib/resque/plugins/fifo/queue/manager.rb, line 83
def get_stats_avg_dht_recalc
  accumulated_delay = redis_client.get("fifo-stats-accumulated-recalc-time") || 0
  total_items = redis_client.get("fifo-stats-accumulated-recalc-count") || 0
  return 0 if total_items == 0

  return accumulated_delay.to_f / total_items.to_f
end
get_stats_max_delay() click to toggle source
# File lib/resque/plugins/fifo/queue/manager.rb, line 79
def get_stats_max_delay
  redis_client.get("fifo-stats-max-delay") || 0
end
inline?() click to toggle source

method for stubbing in tests

# File lib/resque/plugins/fifo/queue/manager.rb, line 60
def inline?
  Resque.inline?
end
orphaned_queues() click to toggle source
# File lib/resque/plugins/fifo/queue/manager.rb, line 264
def orphaned_queues
  current_queues = dump_queue_names
  Resque.all_queues.reject do |queue|
    !queue.start_with?(queue_prefix) || current_queues.include?(queue)
  end
end
peek_pending() click to toggle source
# File lib/resque/plugins/fifo/queue/manager.rb, line 150
def peek_pending
  Resque.peek(pending_queue_name, 0, 0)
end
pending_queue_name() click to toggle source
# File lib/resque/plugins/fifo/queue/manager.rb, line 23
def pending_queue_name
  "#{queue_prefix}-pending"
end
pending_total() click to toggle source
# File lib/resque/plugins/fifo/queue/manager.rb, line 154
def pending_total
  redis_client.llen "queue:#{pending_queue_name}"
end
pretty_dump() click to toggle source
# File lib/resque/plugins/fifo/queue/manager.rb, line 142
def pretty_dump
  slots = redis_client.lrange fifo_hash_table_name, 0, -1
  slots.each_with_index.collect do |slot, index|
    slice, queue = slot.split('#')
    puts "Slice  ##{slice} -> #{queue}"
  end
end
pretty_dump_queues() click to toggle source
# File lib/resque/plugins/fifo/queue/manager.rb, line 176
def pretty_dump_queues
  slots = redis_client.lrange fifo_hash_table_name, 0, -1
  slots.each_with_index.collect do |slot, index|
    slice, queue = slot.split('#')
    puts "#Slice #{slice}"

    puts "#{Resque.peek(queue,0,0).to_s.gsub('},',"},\n")},"
    puts "\n"
  end
end
request_refresh() click to toggle source
# File lib/resque/plugins/fifo/queue/manager.rb, line 252
def request_refresh
  if Resque.inline?
    # Instantiating a Resque::Job and calling perform on it so callbacks run
    # decode(encode(args)) to ensure that args are normalized in the same manner as a non-inline job
    Resque::Job.new(:inline, {'class' => Resque::Plugins::Fifo::Queue::DrainWorker, 'args' => []}).perform
  else
    redis_client.set "fifo_update_timestamp-#{queue_prefix}", Time.now.to_s
    Resque.push(:fifo_refresh, :class => Resque::Plugins::Fifo::Queue::DrainWorker.to_s, :args => [])
  end

end
update_workers() click to toggle source
# File lib/resque/plugins/fifo/queue/manager.rb, line 223
def update_workers
  # query removed workers
  start_time = Time.now.to_i
  redlock.lock("fifo_queue_lock-#{queue_prefix}", DLM_TTL) do |locked|
    if locked
      start_timestamp = redis_client.get "fifo_update_timestamp-#{queue_prefix}"

      process_dht
      cleanup_queues
      log("reinserting items from pending")
      reinsert_pending_items(pending_queue_name)

      # check if something tried to request an update, if so we requie again
      current_timestamp = redis_client.get "fifo_update_timestamp-#{queue_prefix}"

      if start_timestamp != current_timestamp
        request_refresh
      end
    else
      log("unable to lock DHT.")
    end
  end

  end_time = Time.now.to_i

  redis_client.set("fifo-stats-accumulated-recalc-time", end_time - start_time)
  redis_client.incr "fifo-stats-accumulated-recalc-count"
end
worker_for_queue(queue_name) click to toggle source
# File lib/resque/plugins/fifo/queue/manager.rb, line 162
def worker_for_queue(queue_name)
  Resque.workers.collect do |worker|
    w_queue_name = worker.queues.select { |name| name.start_with?("#{queue_prefix}-") }.first
    return worker if w_queue_name == queue_name
  end.compact
  nil
end

Private Instance Methods

cleanup_queues() click to toggle source
# File lib/resque/plugins/fifo/queue/manager.rb, line 273
def cleanup_queues
  current_queues = dump_queue_names
  Resque.all_queues.each do |queue|
    if queue.start_with?(queue_prefix)
      next if current_queues.include?(queue)

      if redis_client.llen("queue:#{queue}") > 0
        log("transfer non empty orphaned queue items to pending")
        transfer_queues(queue, pending_queue_name)
      end

      log("remove orphaned queue #{queue}.")
      Resque.remove_queue(queue)
    end
  end
end
compute_index(key) click to toggle source
# File lib/resque/plugins/fifo/queue/manager.rb, line 414
def compute_index(key)
  XXhash.xxh32(key)
end
generate_new_slice() click to toggle source
# File lib/resque/plugins/fifo/queue/manager.rb, line 331
def generate_new_slice
  XXhash.xxh32(rand(0..2**32).to_s)
end
insert_queue_to_slice(slice, queue) click to toggle source
# File lib/resque/plugins/fifo/queue/manager.rb, line 335
def insert_queue_to_slice(slice, queue)
  queue_str = "#{slice}##{queue}"
  log "insert #{queue} -> #{slice}"
  slots = redis_client.lrange(fifo_hash_table_name, 0, -1)

  if slots.empty?
    redis_client.rpush(fifo_hash_table_name, queue_str)
    return
  end

  _b_slice, prev_queue = slots.last.split('#')
  slots.each do |slot|
    slot_slice, s_queue = slot.split('#')
    if slice < slot_slice.to_i
      redlock.lock!("queue_lock-#{prev_queue}", DLM_TTL) do |_lock_info|
        pause_queues([prev_queue]) do
          redis_client.linsert(fifo_hash_table_name, 'BEFORE', slot, queue_str)
          transfer_queues(prev_queue, pending_queue_name)
        end
      end
      return
    end

    prev_queue = s_queue
  end

  _slot_slice, s_queue = slots.last.split('#')
  pause_queues([s_queue]) do
    transfer_queues(s_queue, pending_queue_name)
    redis_client.rpush(fifo_hash_table_name, queue_str)
  end
end
insert_slot(queue) click to toggle source
# File lib/resque/plugins/fifo/queue/manager.rb, line 326
def insert_slot(queue)
  new_slice =  generate_new_slice # generate random 32-bit integer
  insert_queue_to_slice new_slice, queue
end
log(message) click to toggle source
# File lib/resque/plugins/fifo/queue/manager.rb, line 322
def log(message)
  puts message
end
pause_queues(queue_names = [], &block) click to toggle source
# File lib/resque/plugins/fifo/queue/manager.rb, line 378
def pause_queues(queue_names = [], &block)
  begin
    queue_names.each do |queue_name|
      worker = worker_for_queue(queue_name)
      worker.pause_processing if worker
    end

    block.()
  ensure
    queue_names.each do |queue_name|
      worker = worker_for_queue(queue_name)
      worker.unpause_processing if worker
    end
  end
end
process_dht() click to toggle source
# File lib/resque/plugins/fifo/queue/manager.rb, line 290
def process_dht
  slots = redis_client.lrange fifo_hash_table_name, 0, -1

  current_queues = slots.map { |slot| slot.split('#')[1] }.uniq

  available_queues = query_available_queues
  # no change don't update
  return if available_queues.sort == current_queues.sort

  redis_client.incr "fifo-stats-dht-rehash"

  remove_list = slots.select do |slot|
    _slice, queue = slot.split('#')
    !available_queues.include?(queue)
  end

  remove_list.each do |slot|
    _slice, queue = slot.split('#')
    log "queue #{queue} removed."
    redis_client.lrem fifo_hash_table_name, -1, slot
    transfer_queues(queue, pending_queue_name)
    redis_client.del "queue-stats-#{queue}"
  end

  added_queues = available_queues.each do |queue|
    if !current_queues.include?(queue)
      insert_slot(queue)
      log "queue #{queue} was added."
    end
  end
end
query_available_queues() click to toggle source
# File lib/resque/plugins/fifo/queue/manager.rb, line 418
def query_available_queues
  expired_workers = Resque::Worker.all_workers_with_expired_heartbeats

  Resque.workers.reject { |w| expired_workers.include?(w) }.collect do |worker|
    worker.queues.select { |name| name.start_with?("#{queue_prefix}-") }.first
  end.compact
end
redis_client() click to toggle source
# File lib/resque/plugins/fifo/queue/manager.rb, line 401
def redis_client
  Resque.redis
end
redlock() click to toggle source
# File lib/resque/plugins/fifo/queue/manager.rb, line 405
def redlock
    Redlock::Client.new [redis_client.redis], {
    retry_count:   30,
    retry_delay:   1000, # milliseconds
    retry_jitter:  100,  # milliseconds
    redis_timeout: 1  # seconds
  }
end
reinsert_pending_items(from_queue) click to toggle source
# File lib/resque/plugins/fifo/queue/manager.rb, line 368
def reinsert_pending_items(from_queue)
  redis_client.llen("queue:#{from_queue}").times do
    slot = redis_client.lpop "queue:#{from_queue}"
    queue_json = JSON.parse(slot)
    target_queue = compute_queue_name(queue_json['fifo_key'])
    log "#{queue_json['fifo_key']}: #{from_queue} -> #{target_queue}"
    redis_client.rpush("queue:#{target_queue}", slot)
  end
end
transfer_queues(from_queue, to_queue) click to toggle source
# File lib/resque/plugins/fifo/queue/manager.rb, line 394
def transfer_queues(from_queue, to_queue)
  log "transfer: #{from_queue} -> #{to_queue}"
  redis_client.llen("queue:#{from_queue}").times do
    redis_client.rpoplpush("queue:#{from_queue}", "queue:#{to_queue}")
  end
end