module ViaqDataModelFilterSystemd

Constants

JOURNAL_FIELD_MAP_SYSTEMD_K
JOURNAL_FIELD_MAP_SYSTEMD_T

Map of journal fields to ViaQ data model fields.

JOURNAL_FIELD_MAP_SYSTEMD_U
JOURNAL_TIME_FIELDS

Public Instance Methods

process_journal_fields(tag, time, record, fmtr) click to toggle source
# File lib/fluent/plugin/filter_viaq_data_model_systemd.rb, line 71
# Rewrites a journal record in place.
#
# Steps, in order:
# 1. Copies the fields named in JOURNAL_FIELD_MAP_SYSTEMD_T/U/K into
#    record['systemd']['t'], ['u'] and ['k'] (a section is only created
#    when at least one of its source fields is present).
# 2. Sets record['level'] from normalize_level, passing the current
#    record['level'] and record['PRIORITY'].
# 3. Sets record['time'] from the first JOURNAL_TIME_FIELDS entry that has
#    a truthy value; the value is read as integer microseconds since the
#    epoch and rendered as a UTC RFC 3339 string with 6 fractional digits.
# 4. Fills 'message' and 'hostname' (and, for :sys_journal, 'docker')
#    according to fmtr.type; :k8s_journal additionally hands the record to
#    transform_eventrouter.
#
# @param tag    forwarded to transform_eventrouter for :k8s_journal
# @param time   not used by this method
# @param record hash-like journal record, mutated in place
# @param fmtr   object whose #type selects the :sys_journal or
#               :k8s_journal handling; any other type skips step 4
# @return the value of the selected fmtr.type branch (nil when no branch
#         matches)
def process_journal_fields(tag, time, record, fmtr)
  section_maps = {
    't' => JOURNAL_FIELD_MAP_SYSTEMD_T,
    'u' => JOURNAL_FIELD_MAP_SYSTEMD_U,
    'k' => JOURNAL_FIELD_MAP_SYSTEMD_K
  }

  # Gather every section before touching the record, then attach the
  # non-empty ones in t, u, k order.
  gathered = section_maps.map do |section, mapping|
    picked = {}
    mapping.each { |source, target| picked[target] = record[source] if record.key?(source) }
    [section, picked]
  end
  gathered.each do |section, picked|
    next if picked.empty?

    record['systemd'] ||= {}
    record['systemd'][section] = picked
  end

  record['level'] = normalize_level(record['level'], nil, record['PRIORITY'])

  # Only the first time field holding a truthy value is consulted.
  stamp_field = JOURNAL_TIME_FIELDS.find { |name| record[name] }
  if stamp_field
    seconds, micros = record[stamp_field].to_i.divmod(1000000)
    record['time'] = Time.at(seconds, micros).utc.to_datetime.rfc3339(6)
  end

  case fmtr.type
  when :sys_journal
    record['message'] = record['MESSAGE']
    # A journal hostname of 'localhost' is replaced by @docker_hostname
    # when that is set.
    use_docker_host = record['_HOSTNAME'].eql?('localhost') && @docker_hostname
    record['hostname'] = use_docker_host ? @docker_hostname : record['_HOSTNAME']

    # system, non-kubernetes containers
    container_fields = {
      'CONTAINER_ID_FULL' => 'container_id',
      'CONTAINER_ID' => 'container_id_short',
      'CONTAINER_NAME' => 'container_name'
    }
    docker = {}
    container_fields.each { |source, target| docker[target] = record[source] if record.key?(source) }
    unless docker.empty?
      existing = record['docker']
      record['docker'] = existing ? existing.merge(docker) : docker
    end
  when :k8s_journal
    record['message'] = record['message'] || record['MESSAGE'] || record['log']
    # Hostname preference: kubernetes 'host', then @docker_hostname, then
    # the journal's own _HOSTNAME.
    k8s_meta = record['kubernetes'] if record.key?('kubernetes')
    k8s_host = k8s_meta.fetch('host', nil) if k8s_meta.respond_to?(:fetch)
    record['hostname'] = k8s_host || @docker_hostname || record['_HOSTNAME']
    transform_eventrouter(tag, record, fmtr)
  end
end