class MetricsCapacitor::Engine

Public Class Methods

new() click to toggle source
# File lib/metrics-capacitor/engine.rb, line 14
def initialize
  $0 = 'metrics-capacitor (engine)'
  Config.load!
  @exit_flag = false
  @pids = []
  @pids_kiq = []
  %w(TERM INT).each do |sig|
    Signal.trap(sig) do
      @pids_kiq.each { |pid| Process.kill('INT', pid) rescue true }
      @pids.each { |pid| Process.kill(sig, pid) rescue true }
      Process.waitall
      terminate_loggers
    end
  end
  init_logger
  log :info, "Engine warmed-up :-)"
end

Public Instance Methods

fork_processor(args = {}) click to toggle source
# File lib/metrics-capacitor/engine.rb, line 32
def fork_processor(args = {})
  log :debug, "Spawning #{args[:name]}"
  args[:proc_num] ||= 1
  args[:exit_on] ||= %w{INT TERM}
  args[:proc_num].times do |num|
    @logpipe["#{args[:name]}_#{num}".to_sym], logpipe = IO.pipe
    @pids << Process.fork do
      $0 = "metrics-capacitor (#{args[:name]})" if args[:name]
      @logpipe["#{args[:name]}_#{num}".to_sym].close
      remove_instance_variable(:@logpipe)
      p = Kernel.const_get("MetricsCapacitor::Processor::#{args[:name].capitalize}").new(logpipe)
      args[:exit_on].each { |sig| Signal.trap(sig) { p.shutdown! } }
      p.start!
    end
    log :debug, "#{args[:name].capitalize} spawned as PID #{@pids.last.to_s}"
    logpipe.close
  end
end
fork_scrubber() click to toggle source
# File lib/metrics-capacitor/engine.rb, line 51
def fork_scrubber
  log :debug, "Spawning scrubbers"
  Config.scrubber[:processes].times do |num|
    @logpipe["scrubber_#{num}".to_sym], logpipe = IO.pipe
    @pids_kiq << Process.fork do
      @logpipe["scrubber_#{num}".to_sym].close
      remove_instance_variable(:@logpipe)
      require_relative 'sidekiq'
      Sidekiq.configure_server do |config|
        Sidekiq::Logging.logger = ::Logger.new(logpipe)
        Sidekiq::Logging.logger.level = log_level
        Sidekiq::Logging.logger.progname = "scrubber"
        Sidekiq::Logging.logger.formatter = proc { |severity, _, progname, msg| "#{progname}##{Process.pid}|||#{severity}|||#{msg}\n" }
        config.redis = { url: Config.redis[:url] }
      end
      Sidekiq.configure_client do |config|
        config.redis = { url: Config.redis[:url] }
      end
      $TESTING = 0
      kiq = Sidekiq::CLI.instance
      kiq.parse(['-c', Config.scrubber[:threads].to_s, '-r', File.expand_path('..', __FILE__)+'/processor/scrubber.rb', '-q', 'scrubber'])
      kiq.run
    end
    logpipe.close
  end
end
init_logger() click to toggle source
# File lib/metrics-capacitor/engine.rb, line 89
def init_logger
  @logpipe = {}
  @logger = ::Logger.new(STDOUT)
  @logger.level = log_level
  @logger.formatter = proc { |severity, datetime, progname, msg| [datetime.to_s, progname, severity, "#{msg}\n"].join(" ") }
  @logger_threads = []
  @logger_semaphore = Mutex.new
end
log(severity = :info, msg) click to toggle source
# File lib/metrics-capacitor/engine.rb, line 78
def log(severity = :info, msg)
  s = Kernel.const_get("Logger::#{severity.to_s.upcase}")
  @logger_semaphore.synchronize do
    @logger.log s, msg, 'engine#' + Process.pid.to_s
  end
end
log_level() click to toggle source
# File lib/metrics-capacitor/engine.rb, line 85
def log_level
  Config.debug ? ::Logger::DEBUG : ::Logger::INFO
end
run!() click to toggle source
# File lib/metrics-capacitor/engine.rb, line 117
def run!
  log :info, 'Engine is starting up'
  fork_scrubber
  fork_processor name: 'writer', proc_num: Config.writer[:processes]
  fork_processor name: 'aggregator'
  fork_processor name: 'listener'
  spawn_logger_threads
  log :info, 'Engine has started :-)'
  # TODO: unix socket for control and status reporting ;)
  begin
    ::Process.waitall
    log :warn, 'Terminating!'
  rescue Interrupt
    retry
  end
  log :info, 'Terminating loggers'
  terminate_loggers
  log :warn, 'Engine is shutting down!'
end
spawn_logger_threads() click to toggle source
# File lib/metrics-capacitor/engine.rb, line 98
def spawn_logger_threads
  @logpipe.each do |name, pipe|
    @logger_threads << Thread.new do
      Thread.current[:name] = "logger-#{name}"
      while ! pipe.eof?
        msg = pipe.gets
        (progname,severity,message) = msg.split('|||')
        @logger_semaphore.synchronize do
          @logger.log Kernel.const_get("Logger::#{severity}"), message.chomp, progname
        end
      end
    end
  end
end
terminate_loggers() click to toggle source
# File lib/metrics-capacitor/engine.rb, line 113
def terminate_loggers
  @logger_threads.each(&:join)
end