module Resque::StuckQueue
Constants
- HEARTBEAT_INTERVAL
defaults
- HEARTBEAT_KEY
- LOGGER
- RECOVERED_HANDLER
- TRIGGERED_HANDLER
must be called by convention: type_handler
- TRIGGERED_KEY
- TRIGGER_TIMEOUT
- VERSION
- WATCHER_INTERVAL
Attributes
config[RW]
Public Class Methods
abort_on_exception()
click to toggle source
# File lib/resque_stuck_queue.rb, line 56
# Returns the configured :abort_on_exception value when one is set (an
# explicit false is honoured), and true when the key is nil.
def abort_on_exception
  setting = config[:abort_on_exception]
  return true if setting.nil? # default

  setting # allow overriding w false
end
force_stop!()
click to toggle source
# File lib/resque_stuck_queue.rb, line 108
# Kills every thread recorded in @threads without waiting for its loop to
# finish, then clears module state via reset!.
#
# Safe to call when no threads were ever recorded: a nil @threads is treated
# as an empty list instead of raising NoMethodError.
def force_stop!
  logger.info("Force stopping")
  Array(@threads).each(&:kill)
  reset!
end
heartbeat_key_for(queue)
click to toggle source
# File lib/resque_stuck_queue.rb, line 32
# Builds the heartbeat key for +queue+ as "<queue>:<suffix>", where the
# suffix is config[:heartbeat_key] when set and HEARTBEAT_KEY otherwise.
def heartbeat_key_for(queue)
  suffix = config[:heartbeat_key] || HEARTBEAT_KEY
  "#{queue}:#{suffix}"
end
heartbeat_keys()
click to toggle source
# File lib/resque_stuck_queue.rb, line 48
# Returns the heartbeat key of every entry in +queues+, in the same order.
def heartbeat_keys
  queues.map do |queue_name|
    heartbeat_key_for(queue_name)
  end
end
log_starting_info()
click to toggle source
# File lib/resque_stuck_queue.rb, line 145
# Logs, at info level, the inspected config that is being started with.
def log_starting_info
  config_dump = config.inspect
  logger.info("Starting StuckQueue with config: #{config_dump}")
end
log_watcher_info(queue_name)
click to toggle source
# File lib/resque_stuck_queue.rb, line 149
# Logs two info lines for +queue_name+: its current lag time, then either
# how many seconds ago it was last triggered or that no trigger was found.
def log_watcher_info(queue_name)
  logger.info("Lag time for #{queue_name} is #{lag_time(queue_name).inspect} seconds.")

  triggered_ago = last_triggered(queue_name)
  message =
    if triggered_ago
      "Last triggered for #{queue_name} is #{triggered_ago.inspect} seconds."
    else
      "No last trigger found for #{queue_name}."
    end
  logger.info(message)
end
logger()
click to toggle source
# File lib/resque_stuck_queue.rb, line 24
# Memoized logger: config[:logger] when present, StuckQueue::LOGGER otherwise.
def logger
  return @logger if @logger

  @logger = config[:logger] || StuckQueue::LOGGER
end
queues()
click to toggle source
# File lib/resque_stuck_queue.rb, line 52
# Memoized list of watched queues: config[:queues] when present, [:app]
# otherwise.
def queues
  return @queues if @queues

  @queues = config[:queues] || [:app]
end
redis()
click to toggle source
# File lib/resque_stuck_queue.rb, line 28
# Memoized redis connection taken from config[:redis].
def redis
  return @redis if @redis

  @redis = config[:redis]
end
reset!()
click to toggle source
# File lib/resque_stuck_queue.rb, line 114
# Clean state so we can stop and start in the same process.
# Drops the memoized queues and logger, clears the running flag, and
# replaces the config with a fresh Config instance.
def reset!
  @running = false
  @queues = nil
  @logger = nil
  @config = Config.new # clear, unfreeze
end
reset_keys()
click to toggle source
# File lib/resque_stuck_queue.rb, line 122
# Deletes the heartbeat key and the triggered key of every watched queue
# from redis.
def reset_keys
  queues.each do |queue_name|
    stale_keys = [heartbeat_key_for(queue_name), triggered_key_for(queue_name)]
    stale_keys.each { |key| redis.del(key) }
  end
end
start()
click to toggle source
Call this after setting config. Once started, you shouldn't be allowed to modify it.
# File lib/resque_stuck_queue.rb, line 72
# Runs the monitor in the calling thread and blocks until every worker
# thread has finished.
#
# Steps, in order: mark as running, validate and freeze the config, log it,
# clear the redis keys, hand the redis connection to RedisClassy if it has
# none, set the process name, spawn the heartbeat/watcher/warn threads and
# join them. @stopped becomes true only after all joins return.
def start
  # lifecycle flags and thread registry
  @running = true
  @stopped = false
  @threads = []

  # config is locked from here on
  config.validate_required_keys!
  config.freeze

  log_starting_info
  reset_keys

  RedisClassy.redis = redis if RedisClassy.redis.nil?

  pretty_process_name

  setup_heartbeat_thread
  setup_watcher_thread
  setup_warn_thread

  # fo-eva.
  @threads.each(&:join)

  logger.info("threads stopped")
  @stopped = true
end
start_in_background()
click to toggle source
# File lib/resque_stuck_queue.rb, line 64
# Spawns a thread that applies the abort_on_exception setting to itself
# and then runs start. Returns the new Thread.
def start_in_background
  Thread.new do
    runner = Thread.current
    runner.abort_on_exception = abort_on_exception
    start
  end
end
stop()
click to toggle source
# File lib/resque_stuck_queue.rb, line 99
# Asks the worker threads to finish, waits for start to report a clean
# shutdown, then resets module state and logs "Stopped".
#
# Only the running flag is cleared up front. reset! replaces the config and
# drops the memoized queues/logger, so it must not run while the worker
# loops may still do one more pass; otherwise that pass would see an empty
# config instead of the one the monitor was started with.
def stop
  @running = false

  # wait for clean thread shutdown
  # (compares against false on purpose: a nil @stopped must not block)
  sleep 1 while @stopped == false

  reset!
  logger.info("Stopped")
end
stopped?()
click to toggle source
# File lib/resque_stuck_queue.rb, line 129
# Returns the current value of the @stopped flag, which start sets to false
# on entry and to true once all of its threads have been joined.
def stopped?
  @stopped
end
trigger_handler(queue_name, type)
click to toggle source
# File lib/resque_stuck_queue.rb, line 133
# Invokes the :recovered or :triggered handler for +queue_name+ and then
# records the event through manual_refresh.
#
# The handler is config[:<type>_handler] when set, otherwise the constant
# of the same upper-cased name. It is called with the queue name and the
# queue's current lag time.
#
# Any StandardError raised here, including the invalid-type error, is
# logged together with its backtrace and then re-raised.
def trigger_handler(queue_name, type)
  unless [:recovered, :triggered].include?(type)
    raise 'Must trigger either the recovered or triggered handler!'
  end

  handler_key = :"#{type}_handler"
  logger.info("Triggering #{type} handler for #{queue_name} at #{Time.now}.")

  handler = config[handler_key] || const_get(handler_key.upcase)
  handler.call(queue_name, lag_time(queue_name))

  manual_refresh(queue_name, type)
rescue => error
  logger.info("handler #{type} for #{queue_name} crashed: #{error.inspect}")
  logger.info("\n#{error.backtrace.join("\n")}")
  raise
end
triggered_key_for(queue)
click to toggle source
# File lib/resque_stuck_queue.rb, line 40
# Builds the triggered key for +queue+ as "<queue>:<suffix>", where the
# suffix is config[:triggered_key] when set and TRIGGERED_KEY otherwise.
def triggered_key_for(queue)
  suffix = config[:triggered_key] || TRIGGERED_KEY
  "#{queue}:#{suffix}"
end
Private Class Methods
enqueue_jobs()
click to toggle source
# File lib/resque_stuck_queue.rb, line 224
# Sends one round of heartbeat jobs.
#
# When config[:heartbeat_job] is set it is called once and nothing else is
# enqueued. Otherwise a HeartbeatJob is enqueued to every watched queue,
# carrying the queue's heartbeat key, the redis host/port/namespace and the
# current epoch time.
def enqueue_jobs
  custom_job = config[:heartbeat_job]
  # FIXME config[:heartbeat_job] with mutliple queues is bad semantics
  return custom_job.call if custom_job

  queues.each do |queue_name|
    # Redis::Namespace.new support as well as Redis.new
    namespace = redis.namespace if redis.respond_to?(:namespace)

    Resque.enqueue_to(
      queue_name,
      HeartbeatJob,
      heartbeat_key_for(queue_name),
      redis.client.host,
      redis.client.port,
      namespace,
      Time.now.to_i
    )
  end
end
lag_time(queue_name)
click to toggle source
# File lib/resque_stuck_queue.rb, line 262
# Seconds between now and the last successful heartbeat of +queue_name+.
def lag_time(queue_name)
  # Take the clock reading first, before the heartbeat lookup.
  now = Time.now.to_i
  now - last_successful_heartbeat(queue_name)
end
last_successful_heartbeat(queue_name)
click to toggle source
# File lib/resque_stuck_queue.rb, line 237
# Returns, as an Integer, the time stored under the heartbeat key of
# +queue_name+. When nothing is stored yet, the key is seeded with the
# current time via manual_refresh(:first_time) and that time is returned.
def last_successful_heartbeat(queue_name)
  recorded = read_from_redis(heartbeat_key_for(queue_name))
  return recorded.to_i if recorded

  logger.info("manually refreshing #{queue_name} for :first_time")
  manual_refresh(queue_name, :first_time).to_i
end
last_triggered(queue_name)
click to toggle source
# File lib/resque_stuck_queue.rb, line 266
# Seconds since the triggered key of +queue_name+ was written, or nil when
# the key holds no value.
def last_triggered(queue_name)
  triggered_at = read_from_redis(triggered_key_for(queue_name))
  return if triggered_at.nil?

  Time.now.to_i - triggered_at.to_i
end
log_starting_thread(type)
click to toggle source
# File lib/resque_stuck_queue.rb, line 160
# Logs that the +type+ thread is starting, together with the value found
# under config[:<type>_interval].
def log_starting_thread(type)
  interval = config[:"#{type}_interval"]
  logger.info("Starting #{type} thread with interval of #{interval} seconds")
end
manual_refresh(queue_name, type)
click to toggle source
# File lib/resque_stuck_queue.rb, line 247
# Updates the redis bookkeeping for +queue_name+ according to +type+:
#
#   :triggered  - stores the current time under the triggered key, returns it
#   :recovered  - deletes the triggered key, returns nil
#   :first_time - stores the current time under the heartbeat key, returns it
#
# Any other type does nothing and returns nil.
def manual_refresh(queue_name, type)
  case type
  when :triggered
    now = Time.now.to_i
    redis.set(triggered_key_for(queue_name), now)
    now
  when :recovered
    redis.del(triggered_key_for(queue_name))
    nil
  when :first_time
    now = Time.now.to_i
    redis.set(heartbeat_key_for(queue_name), now)
    now
  end
end
max_wait_time()
click to toggle source
# File lib/resque_stuck_queue.rb, line 309
# Lag threshold used by the trigger/recover checks: config[:trigger_timeout]
# when set, TRIGGER_TIMEOUT otherwise.
def max_wait_time
  configured = config[:trigger_timeout]
  configured || TRIGGER_TIMEOUT
end
pretty_process_name()
click to toggle source
# File lib/resque_stuck_queue.rb, line 313
# Sets the process name ($0) to a rake-style command line that includes
# the inspected redis connection and the comma-separated queue list.
def pretty_process_name
  redis_description = redis.inspect
  queue_list = queues.join(",")
  $0 = "rake --trace resque:stuck_queue #{redis_description} QUEUES=#{queue_list}"
end
read_from_redis(keyname)
click to toggle source
# File lib/resque_stuck_queue.rb, line 165
# Returns whatever redis.get yields for +keyname+.
def read_from_redis(keyname)
  stored = redis.get(keyname)
  stored
end
setup_heartbeat_thread()
click to toggle source
# File lib/resque_stuck_queue.rb, line 194
# Spawns the heartbeat thread and registers it in @threads.
#
# While @running is set, the thread waits for the heartbeat interval and
# then enqueues heartbeat jobs.
def setup_heartbeat_thread
  heartbeat_thread = Thread.new do
    Thread.current.abort_on_exception = abort_on_exception
    log_starting_thread(:heartbeat)

    while @running
      # we want to go through resque jobs, because that's what we're trying to test here:
      # ensure that jobs get executed and the time is updated!
      wait_for_it(:heartbeat_interval)
      logger.info("Sending heartbeat jobs")
      enqueue_jobs
    end
  end

  @threads << heartbeat_thread
end
setup_warn_thread()
click to toggle source
# File lib/resque_stuck_queue.rb, line 208
# Spawns the warn thread and registers it in @threads, but only when
# config[:warn_interval] is set; otherwise does nothing and returns nil.
#
# While @running is set, the thread fires the :triggered handler for every
# queue whose forced trigger check passes, then waits for the warn interval.
def setup_warn_thread
  return unless config[:warn_interval]

  warn_thread = Thread.new do
    Thread.current.abort_on_exception = abort_on_exception
    log_starting_thread(:warn)

    while @running
      queues.each do |queue_name|
        trigger_handler(queue_name, :triggered) if should_trigger?(queue_name, true)
      end
      wait_for_it(:warn_interval)
    end
  end

  @threads << warn_thread
end
setup_watcher_thread()
click to toggle source
# File lib/resque_stuck_queue.rb, line 169
# Spawns the watcher thread and registers it in @threads.
#
# While @running is set, each pass tries to take the :resque_stuck_queue
# RedisMutex (created with :block => 0). If the lock is obtained, every
# queue is logged and then either triggered or recovered as its checks
# dictate; the lock is released in an ensure clause. Each pass ends by
# waiting for the watcher interval, whether or not the lock was obtained.
def setup_watcher_thread
  watcher_thread = Thread.new do
    Thread.current.abort_on_exception = abort_on_exception
    log_starting_thread(:watcher)

    while @running
      watcher_lock = RedisMutex.new(:resque_stuck_queue, :block => 0)

      if watcher_lock.lock
        begin
          queues.each do |qn|
            log_watcher_info(qn)

            if should_trigger?(qn)
              trigger_handler(qn, :triggered)
            elsif should_recover?(qn)
              trigger_handler(qn, :recovered)
            end
          end
        ensure
          watcher_lock.unlock
        end
      end

      wait_for_it(:watcher_interval)
    end
  end

  @threads << watcher_thread
end
should_recover?(queue_name)
click to toggle source
# File lib/resque_stuck_queue.rb, line 273
# Truthy when +queue_name+ has a recorded trigger and its lag time is now
# below max_wait_time. Returns nil when there is no recorded trigger.
def should_recover?(queue_name)
  triggered_ago = last_triggered(queue_name)
  triggered_ago && lag_time(queue_name) < max_wait_time
end
should_trigger?(queue_name, force_trigger = false)
click to toggle source
# File lib/resque_stuck_queue.rb, line 278
# Decides whether the :triggered handler should fire for +queue_name+.
#
# Returns nil while the lag time is below max_wait_time. Once the lag has
# reached it, returns true when +force_trigger+ is set or when no previous
# trigger is recorded, and false otherwise.
def should_trigger?(queue_name, force_trigger = false)
  return unless lag_time(queue_name) >= max_wait_time

  previous_trigger = last_triggered(queue_name)
  return true if force_trigger

  # if it hasn't been triggered before, do it.
  # if it already triggered in the past don't trigger again.
  # :recovered should clear out last_triggered so the cycle (trigger<->recover) continues
  previous_trigger.nil?
end
wait_for_it(type)
click to toggle source
# File lib/resque_stuck_queue.rb, line 297
# Sleeps for the interval named by +type+:
#
#   :heartbeat_interval - config[:heartbeat_interval] or HEARTBEAT_INTERVAL
#   :watcher_interval   - config[:watcher_interval] or WATCHER_INTERVAL
#   :warn_interval      - config[:warn_interval]
#
# Raises a RuntimeError for any other type.
def wait_for_it(type)
  seconds =
    case type
    when :heartbeat_interval
      config[:heartbeat_interval] || HEARTBEAT_INTERVAL
    when :watcher_interval
      config[:watcher_interval] || WATCHER_INTERVAL
    when :warn_interval
      config[:warn_interval]
    else
      raise 'Must sleep for :watcher_interval interval or :heartbeat_interval or :warn_interval interval!'
    end

  sleep seconds
end