class Fluent::Plugin::DstatInput

Public Class Methods

desc(description) click to toggle source
# File lib/fluent/plugin/in_dstat.rb, line 24
def desc(description)
end
new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_dstat.rb, line 10
def initialize
  super

  require 'csv'
  @line_number = 0
  @first_keys = []
  @second_keys = []
  @data_array = []
  @last_time = Time.now
end

Public Instance Methods

check_dstat() click to toggle source
# File lib/fluent/plugin/in_dstat.rb, line 59
def check_dstat
  now = Time.now
  if now - @last_time > @delay * 3
    log.info "Process dstat(#{@pid}) is stopped. Last updated: #{@last_time}"
    restart
  end
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_dstat.rb, line 46
def configure(conf)
  super

  @command = "#{@dstat_path} #{@option} --nocolor --output #{@tmp_file} #{@delay} > /dev/null 2>&1"
  @hostname = `#{@hostname_command}`.chomp!

  begin
    `#{@dstat_path} --version`
  rescue Errno::ENOENT
    raise Fluent::ConfigError, "'#{@dstat_path}' command not found. Install dstat before run fluentd"
  end
end
receive_lines(lines) click to toggle source
# File lib/fluent/plugin/in_dstat.rb, line 105
def receive_lines(lines)
  lines.each do |line|
    next if line == ""
    case @line_number
    when 0..1
    when 2
      line.delete!("\"")
      @first_keys = CSV.parse_line(line)
      pre_key = ""
      @first_keys.each_with_index do |key, index|
        if key.nil? || key == ""
          @first_keys[index] = pre_key
        else
          @first_keys[index] = @first_keys[index].gsub(/\s/, '_')
        end
        pre_key = @first_keys[index]
      end
    when 3
      line.delete!("\"")
      @second_keys = line.split(',')
      @first_keys.each_with_index do |key, index|
        @data_array[index] = {}
        @data_array[index][:first] = key
        @data_array[index][:second] = @second_keys[index]
      end
    else
      values = line.split(',')
      data = Hash.new { |hash,key| hash[key] = Hash.new {} }
      values.each_with_index do |v, index|
        data[@first_keys[index]][@second_keys[index]] = v
      end
      record = {
        'hostname' => @hostname,
        'dstat' => data
      }
      router.emit(@tag, Fluent::Engine.now, record)
    end
    @line_number += 1
    @last_time = Time.now
  end

end
restart() click to toggle source
# File lib/fluent/plugin/in_dstat.rb, line 86
def restart
  Process.detach(@pid)
  begin
    Process.kill(:TERM, @pid)
  rescue Errno::ESRCH => e
    log.error "unexpected death of a child process", :error=>e.to_s
    log.error_backtrace
  end
  @dw.detach
  @tw.detach
  @line_number = 0

  @io = IO.popen(@command, "r")
  @pid = @io.pid
  @dw = DstatCSVWatcher.new(@tmp_file, &method(:receive_lines))
  event_loop_attach(@dw)
  @tw = timer_execute(:in_dstat_timer, 1, &method(:check_dstat))
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_dstat.rb, line 78
def shutdown
  Process.kill(:TERM, @pid)
  @dw.detach
  @tw.detach
  File.delete(@tmp_file)
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_dstat.rb, line 67
def start
  super
  system("mkfifo #{@tmp_file}")
  @io = IO.popen(@command, "r")
  @pid = @io.pid

  @dw = DstatCSVWatcher.new(@tmp_file, &method(:receive_lines))
  event_loop_attach(@dw)
  @tw = timer_execute(:in_dstat_timer, 1, &method(:check_dstat))
end