class Fluent::Plugin::WatchObjectspaceInput

Public Instance Methods

check_threshold(record) click to toggle source
# File lib/fluent/plugin/in_watch_objectspace.rb, line 132
# Compares a freshly measured record against the baseline kept in @source.
#
# Nothing is checked when @threshold is not set. For each configured rate
# (res_of_top, then memsize_of_all) the baseline value multiplied by the rate
# is compared with the value in +record+. The first value that exceeds its
# limit marks the record with "memory_leaks" => true and raises.
#
# record - Hash holding at least the "res" / "memsize_of_all" entries that
#          correspond to the configured rates.
#
# Returns nil when no limit is exceeded.
# Raises RuntimeError describing the exceeded limit.
def check_threshold(record)
  return unless @threshold

  res_rate = @threshold.res_of_top
  if res_rate && @source["res"] * res_rate < record["res"]
    record["memory_leaks"] = true
    raise format("Memory usage is over than expected, threshold res_of_top rate <%f>: %f > %f * %f",
                 res_rate, record["res"], @source["res"], res_rate)
  end

  memsize_rate = @threshold.memsize_of_all
  if memsize_rate && @source["memsize_of_all"] * memsize_rate < record["memsize_of_all"]
    record["memory_leaks"] = true
    raise format("Memory usage is over than expected, threshold of memsize_of_all rate <%f>: %f > %f * %f",
                 memsize_rate, record["memsize_of_all"], @source["memsize_of_all"], memsize_rate)
  end

  nil
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_watch_objectspace.rb, line 49
# Validates the configuration and prepares the state used by the watcher.
#
# After delegating to the superclass this method:
# * requires every library named in @modules (when set),
# * refuses to run on Alpine Linux,
# * sets @warmup_time to now + @watch_delay,
# * resets the baseline record @source to an empty Hash,
# * enables GC::Profiler.
#
# conf - configuration object, passed unchanged to the superclass.
#
# Raises Fluent::ConfigError when a library in @modules cannot be required.
# Raises RuntimeError when the host is detected as Alpine Linux.
def configure(conf)
  super(conf)
  if @modules
    @modules.each do |mod|
      begin
        require mod
      rescue LoadError
        raise Fluent::ConfigError.new("BUG: module <#{mod}> can't be loaded")
      end
    end
  end
  if alpine_linux?
    # alpine's top doesn't support -p option because it uses Busybox
    # ps -q is also not supported. No better way to support it by default.
    raise RuntimeError, "BUG: alpine is not supported"
  end
  @warmup_time = Time.now + @watch_delay
  @source = {}
  GC::Profiler.enable
end

# Tells whether the os-release file identifies the host as Alpine Linux.
#
# A missing or unreadable file counts as "not Alpine" instead of raising, so
# hosts without /etc/os-release can still be configured. The ID entry is
# compared as a whole line, with optional quotes, so a last line without a
# trailing newline is recognised and keys such as VERSION_ID are ignored.
#
# os_release_path - path of the os-release file to inspect.
#
# Returns true or false.
private def alpine_linux?(os_release_path = "/etc/os-release")
  return false unless File.readable?(os_release_path)

  File.readlines(os_release_path).any? do |line|
    line.strip.match?(/\AID=["']?alpine["']?\z/)
  end
end
parse_top_result(content) click to toggle source
# File lib/fluent/plugin/in_watch_objectspace.rb, line 75
# Builds a record from the last two lines of +content+.
#
# The second-to-last line is read as whitespace-separated column names and
# the last line as the matching values. Only columns listed in @top_fields
# are kept; keys are the downcased column names. USER, S, TIME+ and COMMAND
# stay strings, PID, PR, NI, VIRT, RES and SHR go through to_i, and every
# other column goes through to_f.
#
# content - String holding the text to parse.
#
# Returns a Hash of downcased column name => value.
def parse_top_result(content)
  lines = content.split("\n")
  fields = lines[-2].split
  values = lines.last.split

  text_columns = ["USER", "S", "TIME+", "COMMAND"]
  integer_columns = ["PID", "PR", "NI", "VIRT", "RES", "SHR"]

  fields.each_with_index.each_with_object({}) do |(field, index), record|
    next unless @top_fields.include?(field)

    raw = values[index]
    record[field.downcase] =
      if text_columns.include?(field)
        raw
      elsif integer_columns.include?(field)
        raw.to_i
      else
        raw.to_f
      end
  end
end
refresh_watchers() click to toggle source
# File lib/fluent/plugin/in_watch_objectspace.rb, line 93
# Takes one measurement of the current process and emits it as an event.
#
# Returns immediately while the current time is before @warmup_time.
# Otherwise a record is built from the process id, ObjectSpace.memsize_of_all
# and the parsed output of `top` for this pid; GC::Profiler raw data is added
# when @gc_raw_data is set, and one instance count per class name in
# @watch_class. The first record taken becomes the baseline (@source). The
# record is then checked with check_threshold and emitted with @tag.
#
# Any StandardError raised while measuring, checking or emitting is not
# propagated: its message is written to $log and nothing is emitted.
def refresh_watchers
  return if Time.now < @warmup_time

  current_pid = Process.pid
  record = {
    "pid" => current_pid,
    "count" => {},
    "memory_leaks" => false,
    "memsize_of_all" => ObjectSpace.memsize_of_all
  }

  begin
    top_output = IO.popen("top -p #{current_pid} -b -n 1", &:read)
    record.merge!(parse_top_result(top_output))

    record["gc_raw_data"] = GC::Profiler.raw_data if @gc_raw_data

    if @watch_class
      counts = record["count"]
      @watch_class.each do |klass|
        # each_object with a block returns the number of objects visited.
        counts[klass.downcase.to_s] = ObjectSpace.each_object(Object.const_get(klass)) { |obj| obj }
      end
    end

    # The first measurement is kept as the baseline, with memsize re-measured.
    if @source.empty?
      record["memsize_of_all"] = ObjectSpace.memsize_of_all
      @source = record
    end

    check_threshold(record)
    stream = OneEventStream.new(Fluent::EventTime.now, record)
    router.emit_stream(@tag, stream)
  rescue => e
    $log.error(e.message)
  end
end
shutdown() click to toggle source
# File lib/fluent/plugin/in_watch_objectspace.rb, line 155
# Shutdown hook of the plugin.
#
# The plugin has no shutdown work of its own, so this only delegates to the
# superclass, as configure and start in this class already do, so that
# inherited shutdown handling is not skipped.
# NOTE(review): the superclass is not visible here; it is presumably a
# Fluentd plugin base class that defines #shutdown. Confirm.
def shutdown
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_watch_objectspace.rb, line 70
# Starts the plugin: runs the superclass start, then registers
# refresh_watchers as the callback of a timer named
# :execute_watch_objectspace with @watch_interval as its interval.
def start
  super
  watcher = method(:refresh_watchers)
  timer_execute(:execute_watch_objectspace, @watch_interval, &watcher)
end