module Sidekiq::LimitFetch::Global::Monitor
Constants
- HEARTBEAT_PREFIX
- HEARTBEAT_TTL
- PROCESS_SET
- REFRESH_TIMEOUT
Public Instance Methods
all_processes()
click to toggle source
# File lib/sidekiq/limit_fetch/global/monitor.rb, line 28
# Returns the members of the PROCESS_SET Redis set, read through a
# connection yielded by Sidekiq.redis.
def all_processes
  Sidekiq.redis do |conn|
    conn.smembers(PROCESS_SET)
  end
end
handle_dynamic_queues()
click to toggle source
# File lib/sidekiq/limit_fetch/global/monitor.rb, line 44
# When Sidekiq::LimitFetch::Queues reports dynamic mode, hands it the names
# of all current Sidekiq queues except those listed in its dynamic_exclude.
# Does nothing (returns nil) when dynamic mode is off.
def handle_dynamic_queues
  queues = Sidekiq::LimitFetch::Queues
  return unless queues.dynamic?

  queue_names = Sidekiq::Queue.all.map(&:name)
  # dynamic_exclude is consulted per name, exactly as the original did.
  available_queues = queue_names.reject { |name| queues.dynamic_exclude.include?(name) }
  queues.handle(available_queues)
end
old_processes()
click to toggle source
# File lib/sidekiq/limit_fetch/global/monitor.rb, line 32
# Returns the registered processes whose heartbeat key does not currently
# hold the value '1' (a missing key reads as nil and therefore counts as old).
def old_processes
  all_processes.reject do |process|
    heartbeat = Sidekiq.redis { |conn| conn.get(heartbeat_key(process)) }
    heartbeat == '1'
  end
end
remove_old_processes!()
click to toggle source
# File lib/sidekiq/limit_fetch/global/monitor.rb, line 38
# Removes every process reported by old_processes from PROCESS_SET,
# issuing one SREM per process on a single yielded connection.
def remove_old_processes!
  Sidekiq.redis do |conn|
    # old_processes is evaluated inside the connection block, as before.
    old_processes.each do |stale|
      conn.srem(PROCESS_SET, [stale])
    end
  end
end
start!(ttl = HEARTBEAT_TTL, timeout = REFRESH_TIMEOUT)
click to toggle source
# File lib/sidekiq/limit_fetch/global/monitor.rb, line 14
# Spawns and returns a background thread that loops indefinitely. Each pass
# runs, inside Sidekiq::LimitFetch.redis_retryable, the dynamic-queue
# refresh, the heartbeat update and the old-process invalidation (in that
# order), then sleeps for +timeout+.
#
# ttl     - passed to update_heartbeat as the heartbeat key's expiry.
# timeout - argument given to sleep between passes.
def start!(ttl = HEARTBEAT_TTL, timeout = REFRESH_TIMEOUT)
  Thread.new do
    loop do
      Sidekiq::LimitFetch.redis_retryable do
        handle_dynamic_queues
        update_heartbeat(ttl)
        invalidate_old_processes
      end

      # The pause sits outside the retryable block, so it runs once per pass.
      sleep(timeout)
    end
  end
end
Private Instance Methods
heartbeat_key(process = Selector.uuid)
click to toggle source
# File lib/sidekiq/limit_fetch/global/monitor.rb, line 77
# Builds the heartbeat key for +process+ by appending it to HEARTBEAT_PREFIX.
# Defaults to this process's Selector.uuid.
def heartbeat_key(process = Selector.uuid)
  prefix = HEARTBEAT_PREFIX
  # Plain concatenation on purpose: a non-String process still raises.
  prefix + process
end
invalidate_old_processes()
click to toggle source
# File lib/sidekiq/limit_fetch/global/monitor.rb, line 66
# Drops old processes from the process set, then asks every
# Sidekiq::Queue instance to remove locks except those belonging to the
# processes that remain registered.
def invalidate_old_processes
  # The yielded connection itself is unused; the helpers obtain their own.
  Sidekiq.redis do |_conn|
    remove_old_processes!
    remaining = all_processes
    Sidekiq::Queue.instances.each { |queue| queue.remove_locks_except!(remaining) }
  end
end
update_heartbeat(ttl)
click to toggle source
# File lib/sidekiq/limit_fetch/global/monitor.rb, line 56
# Within a single MULTI block: sets this process's heartbeat key to '1',
# adds Selector.uuid to PROCESS_SET, and sets the key's expiry to +ttl+.
#
# ttl - expiry handed to EXPIRE for the heartbeat key.
def update_heartbeat(ttl)
  Sidekiq.redis do |conn|
    conn.multi do |transaction|
      transaction.set(heartbeat_key, '1')
      transaction.sadd(PROCESS_SET, [Selector.uuid])
      transaction.expire(heartbeat_key, ttl)
    end
  end
end