class MetricsCapacitor::Engine
Public Class Methods
new()
click to toggle source
# File lib/metrics-capacitor/engine.rb, line 14 def initialize $0 = 'metrics-capacitor (engine)' Config.load! @exit_flag = false @pids = [] @pids_kiq = [] %w(TERM INT).each do |sig| Signal.trap(sig) do @pids_kiq.each { |pid| Process.kill('INT', pid) rescue true } @pids.each { |pid| Process.kill(sig, pid) rescue true } Process.waitall terminate_loggers end end init_logger log :info, "Engine warmed-up :-)" end
Public Instance Methods
fork_processor(args = {})
click to toggle source
# File lib/metrics-capacitor/engine.rb, line 32 def fork_processor(args = {}) log :debug, "Spawning #{args[:name]}" args[:proc_num] ||= 1 args[:exit_on] ||= %w{INT TERM} args[:proc_num].times do |num| @logpipe["#{args[:name]}_#{num}".to_sym], logpipe = IO.pipe @pids << Process.fork do $0 = "metrics-capacitor (#{args[:name]})" if args[:name] @logpipe["#{args[:name]}_#{num}".to_sym].close remove_instance_variable(:@logpipe) p = Kernel.const_get("MetricsCapacitor::Processor::#{args[:name].capitalize}").new(logpipe) args[:exit_on].each { |sig| Signal.trap(sig) { p.shutdown! } } p.start! end log :debug, "#{args[:name].capitalize} spawned as PID #{@pids.last.to_s}" logpipe.close end end
fork_scrubber()
click to toggle source
# File lib/metrics-capacitor/engine.rb, line 51 def fork_scrubber log :debug, "Spawning scrubbers" Config.scrubber[:processes].times do |num| @logpipe["scrubber_#{num}".to_sym], logpipe = IO.pipe @pids_kiq << Process.fork do @logpipe["scrubber_#{num}".to_sym].close remove_instance_variable(:@logpipe) require_relative 'sidekiq' Sidekiq.configure_server do |config| Sidekiq::Logging.logger = ::Logger.new(logpipe) Sidekiq::Logging.logger.level = log_level Sidekiq::Logging.logger.progname = "scrubber" Sidekiq::Logging.logger.formatter = proc { |severity, _, progname, msg| "#{progname}##{Process.pid}|||#{severity}|||#{msg}\n" } config.redis = { url: Config.redis[:url] } end Sidekiq.configure_client do |config| config.redis = { url: Config.redis[:url] } end $TESTING = 0 kiq = Sidekiq::CLI.instance kiq.parse(['-c', Config.scrubber[:threads].to_s, '-r', File.expand_path('..', __FILE__)+'/processor/scrubber.rb', '-q', 'scrubber']) kiq.run end logpipe.close end end
init_logger()
click to toggle source
# File lib/metrics-capacitor/engine.rb, line 89 def init_logger @logpipe = {} @logger = ::Logger.new(STDOUT) @logger.level = log_level @logger.formatter = proc { |severity, datetime, progname, msg| [datetime.to_s, progname, severity, "#{msg}\n"].join(" ") } @logger_threads = [] @logger_semaphore = Mutex.new end
log(severity = :info, msg)
click to toggle source
# File lib/metrics-capacitor/engine.rb, line 78 def log(severity = :info, msg) s = Kernel.const_get("Logger::#{severity.to_s.upcase}") @logger_semaphore.synchronize do @logger.log s, msg, 'engine#' + Process.pid.to_s end end
log_level()
click to toggle source
# File lib/metrics-capacitor/engine.rb, line 85 def log_level Config.debug ? ::Logger::DEBUG : ::Logger::INFO end
run!()
click to toggle source
# File lib/metrics-capacitor/engine.rb, line 117 def run! log :info, 'Engine is starting up' fork_scrubber fork_processor name: 'writer', proc_num: Config.writer[:processes] fork_processor name: 'aggregator' fork_processor name: 'listener' spawn_logger_threads log :info, 'Engine has started :-)' # TODO: unix socket for control and status reporting ;) begin ::Process.waitall log :warn, 'Terminating!' rescue Interrupt retry end log :info, 'Terminating loggers' terminate_loggers log :warn, 'Engine is shutting down!' end
spawn_logger_threads()
click to toggle source
# File lib/metrics-capacitor/engine.rb, line 98 def spawn_logger_threads @logpipe.each do |name, pipe| @logger_threads << Thread.new do Thread.current[:name] = "logger-#{name}" while ! pipe.eof? msg = pipe.gets (progname,severity,message) = msg.split('|||') @logger_semaphore.synchronize do @logger.log Kernel.const_get("Logger::#{severity}"), message.chomp, progname end end end end end
terminate_loggers()
click to toggle source
# File lib/metrics-capacitor/engine.rb, line 113 def terminate_loggers @logger_threads.each(&:join) end