module Sidekiq::LimitFetch::Global::Monitor

Constants

HEARTBEAT_PREFIX
HEARTBEAT_TTL
PROCESS_SET
REFRESH_TIMEOUT

Public Instance Methods

add_dynamic_queues() click to toggle source
# File lib/sidekiq/limit_fetch/global/monitor.rb, line 40
def add_dynamic_queues
  queues = Sidekiq::LimitFetch::Queues
  queues.add Sidekiq::Queue.all.map(&:name) if queues.dynamic?
end
all_processes() click to toggle source
# File lib/sidekiq/limit_fetch/global/monitor.rb, line 24
def all_processes
  Sidekiq.redis {|it| it.smembers PROCESS_SET }
end
old_processes() click to toggle source
# File lib/sidekiq/limit_fetch/global/monitor.rb, line 28
def old_processes
  all_processes.reject do |process|
    Sidekiq.redis {|it| it.get heartbeat_key process }
  end
end
remove_old_processes!() click to toggle source
# File lib/sidekiq/limit_fetch/global/monitor.rb, line 34
def remove_old_processes!
  Sidekiq.redis do |it|
    old_processes.each {|process| it.srem PROCESS_SET, process }
  end
end
start!(ttl=HEARTBEAT_TTL, timeout=REFRESH_TIMEOUT) click to toggle source
# File lib/sidekiq/limit_fetch/global/monitor.rb, line 10
def start!(ttl=HEARTBEAT_TTL, timeout=REFRESH_TIMEOUT)
  Thread.new do
    loop do
      Sidekiq::LimitFetch.redis_retryable do
        add_dynamic_queues
        update_heartbeat ttl
        invalidate_old_processes
      end

      sleep timeout
    end
  end
end

Private Instance Methods

heartbeat_key(process=Selector.uuid) click to toggle source
# File lib/sidekiq/limit_fetch/global/monitor.rb, line 68
def heartbeat_key(process=Selector.uuid)
  HEARTBEAT_PREFIX + process
end
invalidate_old_processes() click to toggle source
# File lib/sidekiq/limit_fetch/global/monitor.rb, line 57
def invalidate_old_processes
  Sidekiq.redis do |it|
    remove_old_processes!
    processes = all_processes

    Sidekiq::Queue.instances.each do |queue|
      queue.remove_locks_except! processes
    end
  end
end
update_heartbeat(ttl) click to toggle source
# File lib/sidekiq/limit_fetch/global/monitor.rb, line 47
def update_heartbeat(ttl)
  Sidekiq.redis do |it|
    it.multi do
      it.set heartbeat_key, true
      it.sadd PROCESS_SET, Selector.uuid
      it.expire heartbeat_key, ttl
    end
  end
end