class LogStash::Inputs::RemoteProc

Collecting PROCFS metrics through SSH.

Supported endpoints :

* /proc/cpuinfo
* /proc/stat
* /proc/meminfo
* /proc/loadavg
* /proc/vmstat
* /proc/diskstats
* /proc/net/dev
* /proc/net/wireless
* /proc/net/mounts
* /proc/net/crypto
* /proc/sysvipc/shm

The fallowing example shows how to retrieve system metrics from remote server and output the result to the standard output:

source,ruby

input {

remote_proc {
  servers => [
    { host => "remote.server.com" username => "medium" },
    { host => "h2.net" username => "poc" gateway_host => "h.gw.net" gateway_username => "user" }
  ]
  proc_list => ["cpuinfo", "stat", "meminfo", "diskstats"]
}

}


Example with `proc_list => [“_all”]` which is default.

source,ruby

input {

remote_proc {
  servers => [
    { host => "remote.server.com" username => "medium" },
    { host => "h2.net" username => "poc" gateway_host => "h.gw.net" gateway_username => "user" }
  ]
  proc_list => ["_all"]
}

}


Example with specific procfs prefix path and system reader for certain host. By default the system reader is 'cat' and the procfs prefix path is '/proc'.

source,ruby

input {

remote_proc {
  servers => [
    { host => "remote.server.com" username => "medium" system_reader => "dd bs=1 2>/dev/null" proc_prefix_path => "if=/proc"},
    { host => "h2.net" username => "poc" gateway_host => "h.gw.net" gateway_username => "user" }
  ]
  proc_list => ["stat", "meminfo"]
}

}



Example with specific server tags.

source,ruby

input {

remote_proc {
  servers => [
    { host => "remote.server.com" username => "medium" server_tags => "traefik, nginx, etcd"},
    { host => "h2.net" username => "poc" gateway_host => "h.gw.net" server_tags => "redis" }
  ]
  proc_list => ["stat", "meminfo"]
}

}


Constants

COMMANDS

Liste of commands for each `/proc` endpoints.

DEFAULT_PROC_LIST

By default call all procfs method

SERVER_OPTIONS

Describe valid keys and default values in `@servers` parameter.

Public Instance Methods

register() click to toggle source
# File lib/logstash/inputs/remote_proc.rb, line 153
def register
  @host = Socket.gethostname

  require 'net/ssh'
  require 'net/ssh/gateway'

  @ssh_sessions = []
  @ssh_gateways = []

  configure!
end
run(queue) click to toggle source
# File lib/logstash/inputs/remote_proc.rb, line 165
def run(queue)
  # we can abort the loop if stop? becomes true
  until stop?
    @ssh_sessions.each do |ssh|
      ssh.properties['_commands'].each do |method, command|
        ssh.open_channel do |chan|
          chan.exec(command) do |ch, success|
            ch[:result_host] = ssh.properties['host']
            ch[:result_port] = ssh.properties['port']
            ch[:result_server_tags] = ssh.properties['server_tags']
            unless success
              @logger.warn('CHANNEL_EXEC_UNSUCCESS',
                           command: command,
                           host: ch[:result_host],
                           port: ch[:result_port])
              next
            end
            ch[:result_data] = String.new('')
            ch[:result_error] = String.new('')
            # "on_data" called when the process writes to stdout
            ch.on_data { |_c, data| ch[:result_data] << data }
            ch.on_process do |_c|
              unless ch[:result_error].empty?
                ch[:result_error].chomp!
                ch[:result_error] = ch[:result_error].force_encoding('UTF-8')
                @logger.warn(ch[:result_error])
                next
              end
              next if ch[:result_data].empty?
              result = send("proc_#{method}",
                            ch[:result_data].force_encoding('UTF-8'))
              next if result.empty?
              event = LogStash::Event.new(
                method => result,
                host: @host,
                type: @type || "system-#{method}",
                metric_name: "system-#{method}",
                remote_host: ch[:result_host],
                remote_port: ch[:result_port],
                command: command,
                message: ch[:result_data],
                server_tags: ch[:result_server_tags]
              )
              decorate(event)
              queue << event
            end
            ch.on_open_failed do |c, code, desc|
              @logger.warn('CHANNEL_OPEN_FAILED',
                           host: ch[:result_host],
                           channel: c,
                           code: code,
                           description: desc)
            end
            # "on_extended_data", called when the process writes to stderr
            ch.on_extended_data do |_ch, _type, data|
              ch[:result_error] << data
            end
            ch.on_close(&:close)
          end
          chan.wait
        end
      end
    end # @ssh_sessions block
    @ssh_sessions.each(&:loop)
    Stud.stoppable_sleep(@interval) { stop? }
  end # until loop
end
stop() click to toggle source
# File lib/logstash/inputs/remote_proc.rb, line 233
def stop
  @ssh_sessions.each(&:close)
  @ssh_gateways.each(&:shutdown!) unless @ssh_gateways.empty?
end

Private Instance Methods

configure!() click to toggle source

Prepare all server configuration

# File lib/logstash/inputs/remote_proc.rb, line 258
def configure!
  @servers.each do |s|
    prepare_servers!(s)
    session_options = { properties: s }
    session_options[:port] = s['port'] if s['port']
    session_options[:password] = s['password'] if s['password']
    if s['ssh_private_key']
      session_options[:auth_methods] = ['publickey']
      session_options[:keys] = [s['ssh_private_key']]
    end
    if s['gateway_host']
      gw_opts = { port: s['gateway_port'] }
      gw_opts[:password] = s['gateway_password'] if s['gateway_password']
      if s['gateway_ssh_private_key']
        gw_opts[:auth_methods] = ['publickey']
        gw_opts[:keys] = s['gateway_ssh_private_key']
      end
      gw = Net::SSH::Gateway.new(s['gateway_host'],
                                 s['gateway_username'],
                                 gw_opts)
      @ssh_gateways << gw
      @ssh_sessions << gw.ssh(s['host'],
                              s['username'],
                              session_options)
    else
      @ssh_sessions << Net::SSH.start(s['host'],
                                      s['username'],
                                      session_options)
    end
  end
end
prepare_servers!(server) click to toggle source

Return only valide property keys.

# File lib/logstash/inputs/remote_proc.rb, line 241
def prepare_servers!(server)
  server.select! { |k| SERVER_OPTIONS.include?(k) }
  server.merge!(SERVER_OPTIONS) { |_key, old, _new| old }
  cmds = if (@proc_list - ['_all']).empty?
           COMMANDS.dup
         else
           COMMANDS.select { |k, _| @proc_list.include?(k.to_s) }
         end
  # Replace 'system_reader' and 'proc_prefix_path' for each host command
  server['_commands'] = cmds.each do |k, v|
    cmds[k] = v % { system_reader: server['system_reader'],
                    proc_prefix_path: server['proc_prefix_path'] }
  end
  server['_commands'].freeze
end
proc_cpuinfo(data) click to toggle source

Process CPUINFO data.

# File lib/logstash/inputs/remote_proc.rb, line 522
def proc_cpuinfo(data)
  return {} unless data
  cpuinfo = {} # TODO(fenicks): change to array
  num_cpu = 0
  data.each_line do |line|
    line.strip!
    next if line.empty?
    m = /([^\n\t:]+)\s*:\s+(.+)$/.match(line)
    next unless m
    # Apply filters
    value = m[2] # needed to permit assignation and computation
    num_cpu += 1 if m[1].eql?('processor')
    value = m[2].split(/\s+/) if m[1] == 'flags'
    value = m[2].to_i if ['processor',
                          'physical id',
                          'siblings',
                          'core id',
                          'cpu cores',
                          'apicid',
                          'initial apicid',
                          'cpuid level',
                          'clflush size',
                          'cache size',
                          'cache_alignment'].include?(m[1])
    value = m[2].to_f if ['bogomips',
                          'cpu MHz'].include?(m[1])
    value = m[2].to_i * 1000 if m[2] =~ /\skb$/i
    index = num_cpu - 1
    cpuinfo[index] = {} unless cpuinfo.include?(index)
    cpuinfo[index][m[1]] = value
  end
  cpuinfo
end
proc_crypto(data) click to toggle source

Process CRYPTO data

# File lib/logstash/inputs/remote_proc.rb, line 320
def proc_crypto(data)
  return {} unless data
  crypto = {}
  current_crypto = ''
  data.each_line do |line|
    line.strip!
    next if line.empty?
    t = line.split(/\s+:\s+/)
    next if t.empty? || t.length != 2
    if 'name'.eql?(t[0])
      current_crypto = t[1]
      crypto[current_crypto] = {}
      next
    end
    crypto[current_crypto][t[0]] = t[1] unless current_crypto.empty?
  end
  crypto
end
proc_diskstats(data) click to toggle source

Process DISKSTATS data. www.kernel.org/doc/Documentation/ABI/testing/procfs-diskstats www.kernel.org/doc/Documentation/iostats.txt

# File lib/logstash/inputs/remote_proc.rb, line 418
def proc_diskstats(data)
  return {} unless data
  diskstats = {}
  data.each_line do |line|
    line.strip!
    t = line.split(/\s+/)
    next if t.empty? || t.length < 14
    diskstats[t[2]] = {} # device name
    diskstats[t[2]]['major number'] = t[0].to_i
    diskstats[t[2]]['minor number'] = t[1].to_i
    diskstats[t[2]]['reads completed'] = t[3].to_i
    diskstats[t[2]]['reads merged'] = t[4].to_i
    diskstats[t[2]]['sectors read'] = t[5].to_i
    diskstats[t[2]]['time spent reading ms'] = t[6].to_i
    diskstats[t[2]]['writes completed'] = t[7].to_i
    diskstats[t[2]]['writes merged'] = t[8].to_i
    diskstats[t[2]]['sectors written'] = t[9].to_i
    diskstats[t[2]]['time spent writing ms'] = t[10].to_i
    diskstats[t[2]]['io in progress'] = t[11].to_i
    diskstats[t[2]]['io time spent ms'] = t[12].to_i
    diskstats[t[2]]['io weighted time spent ms'] = t[13].to_i
  end
  diskstats
end
proc_loadavg(data) click to toggle source

Process LOADAVG data.

# File lib/logstash/inputs/remote_proc.rb, line 455
def proc_loadavg(data)
  return {} unless data
  m = %r{([^\s]+)\s+([^\s]+)\s+([^\s]+)\s+([^\s]+)\/([^\s]+)\s+([^\s$]+)}.match(data)
  next unless m
  loadavg = {}
  if m && m.length >= 6
    loadavg.merge!('1minute' => m[1].to_f,
                   '5minutes' => m[2].to_f,
                   '15minutes' => m[3].to_f,
                   'running_processes' => m[4].to_i,
                   'total_processes' => m[5].to_i,
                   'last_running_pid' => m[6].to_i)
  end
  loadavg
end
proc_meminfo(data) click to toggle source

Process MEMINFO data.

# File lib/logstash/inputs/remote_proc.rb, line 472
def proc_meminfo(data)
  return {} unless data
  meminfo = {}
  data.each_line do |line|
    m = /([^\n\t:]+)\s*:\s+(\d+)(\skb)?$/i.match(line)
    next unless m
    meminfo[m[1]] = m[2].to_i
    meminfo[m[1]] *= 1000 if m[3] # m[3] is not nil if `/KB/i` is found
  end
  meminfo
end
proc_mounts(data) click to toggle source

Process MOUNTS data

# File lib/logstash/inputs/remote_proc.rb, line 340
def proc_mounts(data)
  return {} unless data
  mounts = {}
  data.each_line do |line|
    line.strip!
    t = line.split(/\s+/)
    next if t.empty? || t.length < 6
    # mounted device name
    device = {}
    device['mountPoint'] = t[1]
    device['fsType'] = t[2]
    device['fsOptions'] = t[3].split(/,/)
    device['dump'] = t[4]
    device['pass'] = t[5]
    mounts[t[0]] = [] unless mounts.include?(t[0])
    mounts[t[0]] << device
  end
  mounts
end
proc_netdev(data) click to toggle source

Process NETDEV data.

# File lib/logstash/inputs/remote_proc.rb, line 386
def proc_netdev(data)
  return {} unless data
  lines = data.lines
  netdev = {}
  lines.drop(2).each do |l|
    l.strip!
    t = l.split(/[:\s]+/)
    next if t.empty? || t.length < 17
    netdev[t[0]] = {}
    netdev[t[0]]['rxbytes'] = t[1].to_i
    netdev[t[0]]['rxpackets'] = t[2].to_i
    netdev[t[0]]['rxerrs'] = t[3].to_i
    netdev[t[0]]['rxdrop'] = t[4].to_i
    netdev[t[0]]['rxfifo'] = t[5].to_i
    netdev[t[0]]['rxframe'] = t[6].to_i
    netdev[t[0]]['rxcompressed'] = t[7].to_i
    netdev[t[0]]['rxmulticast'] = t[8].to_i
    netdev[t[0]]['txbytes'] = t[9].to_i
    netdev[t[0]]['txpackets'] = t[10].to_i
    netdev[t[0]]['txerrs'] = t[11].to_i
    netdev[t[0]]['txdrop'] = t[12].to_i
    netdev[t[0]]['txfifo'] = t[13].to_i
    netdev[t[0]]['txcolls'] = t[14].to_i
    netdev[t[0]]['txcarrier'] = t[15].to_i
    netdev[t[0]]['txcompressed'] = t[16].to_i
  end
  netdev
end
proc_netwireless(data) click to toggle source

Process NETWIRELESS data.

# File lib/logstash/inputs/remote_proc.rb, line 361
def proc_netwireless(data)
  return {} unless data
  lines = data.lines
  netwireless = {}
  lines.drop(2).each do |l|
    l.strip!
    t = l.split(/[:\s]+/)
    next if t.empty? || t.length < 11 # Last column WE22 is often empty
    netwireless[t[0]] = {}
    netwireless[t[0]]['status'] = t[1].to_i
    netwireless[t[0]]['linkQuality'] = t[2].to_i
    netwireless[t[0]]['levelQuality'] = t[3].to_i
    netwireless[t[0]]['noiseQuality'] = t[4].to_i
    netwireless[t[0]]['nwidDiscarded'] = t[5].to_i
    netwireless[t[0]]['cryptDiscarded'] = t[6].to_i
    netwireless[t[0]]['fragDiscarded'] = t[7].to_i
    netwireless[t[0]]['retryDiscarded'] = t[8].to_i
    netwireless[t[0]]['miscDiscarded'] = t[9].to_i
    netwireless[t[0]]['beaconMissed'] = t[10].to_i
    netwireless[t[0]]['we22'] = t[11].to_i
  end
  netwireless
end
proc_stat(data) click to toggle source

Process STAT data. lxr.free-electrons.com/source/Documentation/filesystems/proc.txt#L1294

# File lib/logstash/inputs/remote_proc.rb, line 486
def proc_stat(data)
  return {} unless data
  stat = {}
  data.each_line do |line|
    m = /^(cpu[0-9]*|intr|ctxt|btime|processes|procs_running|procs_blocked|softirq)\s+(.*)$/i.match(line)
    next unless m
    if m[1] =~ /^cpu[0-9]*/i
      m_sub = m[2].split(/\s+/)
      if m_sub.length >= 7
        stat[m[1]] = {
          user: m_sub[0].to_i,
          nice: m_sub[1].to_i,
          system: m_sub[2].to_i,
          idle: m_sub[3].to_i,
          iowait: m_sub[4].to_i,
          irq: m_sub[5].to_i,
          softirq: m_sub[6].to_i,
          steal: m_sub[7].to_i,
          guest: m_sub[8].to_i,
          guest_nice: m_sub[9].to_i
        }
      end
    elsif m[1] =~ /^(ctxt|btime|processes|procs_running|procs_blocked)/i
      stat[m[1]] = m[2].to_i
    elsif m[1] =~ /^(intr|softirq)/i
      m_sub = m[2].split(/\s+/)
      next if m_sub.empty?
      total = m_sub.shift.to_i
      stat[m[1]] = { total: total }
      stat[m[1]][:subsequents] = m_sub.map!(&:to_i)
    end
  end
  stat
end
proc_sysvipcshm(data) click to toggle source

Process SYSVIPCSHM data

# File lib/logstash/inputs/remote_proc.rb, line 291
def proc_sysvipcshm(data)
  return {} unless data
  lines = data.lines
  sysvipcshm = {}
  lines.drop(1).each do |l|
    l.strip!
    t = l.split(/\s+/)
    next if t.empty? || t.length < 16
    sysvipcshm[t[1]] = {} # shmid
    sysvipcshm[t[1]]['key'] = t[0]
    sysvipcshm[t[1]]['perms'] = t[2]
    sysvipcshm[t[1]]['size'] = t[3]
    sysvipcshm[t[1]]['cpid'] = t[4]
    sysvipcshm[t[1]]['lpid'] = t[5]
    sysvipcshm[t[1]]['nattch'] = t[6]
    sysvipcshm[t[1]]['uid'] = t[7]
    sysvipcshm[t[1]]['gid'] = t[8]
    sysvipcshm[t[1]]['cuid'] = t[9]
    sysvipcshm[t[1]]['cgid'] = t[10]
    sysvipcshm[t[1]]['atime'] = t[11]
    sysvipcshm[t[1]]['dtime'] = t[12]
    sysvipcshm[t[1]]['ctime'] = t[13]
    sysvipcshm[t[1]]['rss'] = t[14]
    sysvipcshm[t[1]]['swap'] = t[15]
  end
  sysvipcshm
end
proc_vmstat(data) click to toggle source

Process VMSTAT data.

# File lib/logstash/inputs/remote_proc.rb, line 444
def proc_vmstat(data)
  return {} unless data
  vmstat = {}
  data.each_line do |line|
    m = /([^\s]+)\s+(\d+)/.match(line)
    vmstat[m[1]] = m[2].to_i if m && m.length >= 3
  end
  vmstat
end