class Capacitor::Watcher
Public Class Methods
run()
click to toggle source
# File lib/capacitor/watcher.rb, line 60 def self.run watcher = new %w(INT TERM).each do |signal| trap signal do watcher.handle_signal(signal) end end watcher.loop_forever end
Public Instance Methods
commands_fetcher()
click to toggle source
# File lib/capacitor/watcher.rb, line 56 def commands_fetcher @commands_fetcher ||= CommandsFetcher.new end
handle_signal(signal)
click to toggle source
# File lib/capacitor/watcher.rb, line 96 def handle_signal(signal) case signal when 'INT', 'TERM' working? ? shut_down : shut_down! end end
log_level()
click to toggle source
# File lib/capacitor/watcher.rb, line 52 def log_level redis.get "log_level" end
logger()
click to toggle source
# File lib/capacitor/watcher.rb, line 72 def logger Capacitor.logger end
loop_forever()
click to toggle source
# File lib/capacitor/watcher.rb, line 7 def loop_forever logger.info "Capacitor listening..." redis.set "capacitor_start", Time.new.to_s loop do wait_for_batch loop_once end end
loop_once()
click to toggle source
# File lib/capacitor/watcher.rb, line 21 def loop_once @working = true start_time = Time.new Capacitor.log_level= log_level counts = commands_fetcher.retrieve_batch with_pause process_batch counts commands_fetcher.flush_batch instrument "capacitor.loop.time", Time.new - start_time, units:'seconds' instrument "capacitor.loop.object_counters", counts.length shut_down! if shut_down? ensure @working = false end
pause_time()
click to toggle source
# File lib/capacitor/watcher.rb, line 47 def pause_time time = redis.get "pause_time" time ? time.to_f : nil end
process(counter_id, count_delta)
click to toggle source
Public: Updates a counter on one model
# File lib/capacitor/watcher.rb, line 88 def process(counter_id, count_delta) updater = Updater.new(counter_id, count_delta) logger.debug updater.inspect if logger.debug? updater.update rescue Exception => e logger.error "#{counter_id} exception: #{e}" end
process_batch(counts)
click to toggle source
Public: update_counters on models
counts - {'classname:id:field_name' => increment, …}
Returns: nothing
# File lib/capacitor/watcher.rb, line 81 def process_batch(counts) counts.each do |counter_id, count_delta| process(counter_id, count_delta) end end
wait_for_batch()
click to toggle source
# File lib/capacitor/watcher.rb, line 17 def wait_for_batch commands_fetcher.block_on_incoming_signal_list end
with_pause() { || ... }
click to toggle source
# File lib/capacitor/watcher.rb, line 38 def with_pause yield if block_given? if time = pause_time logger.debug "Capacitor pausing for #{time}s" sleep time end end
Private Instance Methods
delay_warning_threshold()
click to toggle source
# File lib/capacitor/watcher.rb, line 121 def delay_warning_threshold 0.003 end
instrument(*args, &block)
click to toggle source
# File lib/capacitor/watcher.rb, line 129 def instrument(*args, &block) Metrics.instrument *args, &block end
redis()
click to toggle source
# File lib/capacitor/watcher.rb, line 125 def redis Capacitor.redis end
shut_down()
click to toggle source
# File lib/capacitor/watcher.rb, line 109 def shut_down @shut_down = true end
shut_down!()
click to toggle source
# File lib/capacitor/watcher.rb, line 113 def shut_down! exit(0) end
shut_down?()
click to toggle source
# File lib/capacitor/watcher.rb, line 117 def shut_down? @shut_down end
working?()
click to toggle source
# File lib/capacitor/watcher.rb, line 105 def working? @working end