module Sidekiq::LimitFetch::Global::Monitor

Constants

HEARTBEAT_PREFIX
HEARTBEAT_TTL
PROCESS_SET
REFRESH_TIMEOUT

Public Instance Methods

all_processes() click to toggle source
# File lib/sidekiq/limit_fetch/global/monitor.rb, line 28
def all_processes
  Sidekiq.redis { |it| it.smembers PROCESS_SET }
end
handle_dynamic_queues() click to toggle source
# File lib/sidekiq/limit_fetch/global/monitor.rb, line 44
def handle_dynamic_queues
  queues = Sidekiq::LimitFetch::Queues
  return unless queues.dynamic?

  available_queues = Sidekiq::Queue.all.map(&:name).reject do |it|
    queues.dynamic_exclude.include? it
  end
  queues.handle available_queues
end
old_processes() click to toggle source
# File lib/sidekiq/limit_fetch/global/monitor.rb, line 32
def old_processes
  all_processes.reject do |process|
    Sidekiq.redis { |it| it.get heartbeat_key process } == '1'
  end
end
remove_old_processes!() click to toggle source
# File lib/sidekiq/limit_fetch/global/monitor.rb, line 38
def remove_old_processes!
  Sidekiq.redis do |it|
    old_processes.each { |process| it.srem PROCESS_SET, [process] }
  end
end
start!(ttl = HEARTBEAT_TTL, timeout = REFRESH_TIMEOUT) click to toggle source
# File lib/sidekiq/limit_fetch/global/monitor.rb, line 14
def start!(ttl = HEARTBEAT_TTL, timeout = REFRESH_TIMEOUT)
  Thread.new do
    loop do
      Sidekiq::LimitFetch.redis_retryable do
        handle_dynamic_queues
        update_heartbeat ttl
        invalidate_old_processes
      end

      sleep timeout
    end
  end
end

Private Instance Methods

heartbeat_key(process = Selector.uuid) click to toggle source
# File lib/sidekiq/limit_fetch/global/monitor.rb, line 77
def heartbeat_key(process = Selector.uuid)
  HEARTBEAT_PREFIX + process
end
invalidate_old_processes() click to toggle source
# File lib/sidekiq/limit_fetch/global/monitor.rb, line 66
def invalidate_old_processes
  Sidekiq.redis do |_it|
    remove_old_processes!
    processes = all_processes

    Sidekiq::Queue.instances.each do |queue|
      queue.remove_locks_except! processes
    end
  end
end
update_heartbeat(ttl) click to toggle source
# File lib/sidekiq/limit_fetch/global/monitor.rb, line 56
def update_heartbeat(ttl)
  Sidekiq.redis do |it|
    it.multi do |pipeline|
      pipeline.set heartbeat_key, '1'
      pipeline.sadd PROCESS_SET, [Selector.uuid]
      pipeline.expire heartbeat_key, ttl
    end
  end
end