class Fluent::ViaqDataModelFilter

Constants

DOT_REPLACE_CHAR_UNUSED

sentinel value for undefined_dot_replace_char meaning '.' characters in undefined field names are not replaced (see configure)

NORMAL_LEVELS

well known level values and their canonical normalized forms - see github.com/ViaQ/elasticsearch-templates/blob/master/namespaces/default.yml#L63

NUM_FIELDS_UNLIMITED

sentinel value for undefined_max_num_fields meaning there is no limit on the number of undefined fields (see handle_undefined_fields)

PRIORITY_LEVELS

numeric levels for the PRIORITY field
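
Both level tables are lookup hashes consumed by normalize_level below. A rough, hypothetical sketch of their shape (the names and entries here are illustrative only, not the actual constant definitions):

# Illustrative only - the authoritative entries are in the plugin source and
# the linked template.
NORMAL_LEVELS_EXAMPLE   = { 'err' => 'err', 'error' => 'err', 'warn' => 'warning', 'warning' => 'warning' }
PRIORITY_LEVELS_EXAMPLE = { 0 => 'emerg', 3 => 'err', 4 => 'warning', 6 => 'info', 7 => 'debug' }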

Public Instance Methods

add_elasticsearch_index_name_field(tag, time, record)
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 389
def add_elasticsearch_index_name_field(tag, time, record)
  found = false
  @elasticsearch_index_names.each do |ein|
    if ein.matcher.match(tag)
      found = true
      return unless ein.enabled
      if ein.name_type == :operations_full || ein.name_type == :project_full || ein.name_type == :audit_full
        field_name = @elasticsearch_index_name_field
        need_time = true
      else
        field_name = @elasticsearch_index_prefix_field
        need_time = false
      end

      case ein.name_type
      when :audit_full, :audit_prefix
        prefix = ".audit"
      when :operations_full, :operations_prefix
        prefix = ".operations"
      when :project_full, :project_prefix
        name, uuid = nil
        unless record['kubernetes'].nil?
          k8s = record['kubernetes']
          name = k8s['namespace_name']
          uuid = k8s['namespace_id']
          if name.nil?
            log.error("record cannot use elasticsearch index name type #{ein.name_type}: record is missing kubernetes.namespace_name field: #{record}")
          end
          if uuid.nil?
            log.error("record cannot use elasticsearch index name type #{ein.name_type}: record is missing kubernetes.namespace_id field: #{record}")
          end
        else
          log.error("record cannot use elasticsearch index name type #{ein.name_type}: record is missing kubernetes field: #{record}")
        end
        if name.nil? || uuid.nil?
          name = @orphaned_namespace_name
        end
        prefix = name == @orphaned_namespace_name ? @orphaned_namespace_name : "project.#{name}.#{uuid}"
      end

      if ENV['CDM_DEBUG']
        unless tag == ENV['CDM_DEBUG_IGNORE_TAG']
          log.error("prefix #{prefix} need_time #{need_time} time #{record[@dest_time_name]}")
        end
      end

      if need_time
        ts = DateTime.parse(record[@dest_time_name])
        record[field_name] = prefix + "." + ts.strftime("%Y.%m.%d")
      else
        record[field_name] = prefix
      end
      if ENV['CDM_DEBUG']
        unless tag == ENV['CDM_DEBUG_IGNORE_TAG']
          log.error("record[#{field_name}] = #{record[field_name]}")
        end
      end

      break
    end
  end
  unless found
    if ENV['CDM_DEBUG']
      unless tag == ENV['CDM_DEBUG_IGNORE_TAG']
        log.error("no match for tag #{tag}")
      end
    end
  end
end
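
As a sketch of the result for a project-type index name, assuming the default destination time field @timestamp and a default index name field of viaq_index_name (both assumptions; field values are made up):

# Hypothetical input record matched by an <elasticsearch_index_name> section
# with name_type project_full:
record = {
  'kubernetes' => { 'namespace_name' => 'myproject', 'namespace_id' => 'abc-123' },
  '@timestamp' => '2017-07-27T17:27:46.216527+00:00'
}
# after add_elasticsearch_index_name_field(tag, time, record):
#   record['viaq_index_name'] == 'project.myproject.abc-123.2017.07.27'
# With name_type project_prefix the prefix field would instead be set to
# 'project.myproject.abc-123' (no date suffix).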
add_pipeline_metadata(tag, time, record)
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 374
def add_pipeline_metadata (tag, time, record)
  record['pipeline_metadata'] = {} unless record.key?('pipeline_metadata')
  pipeline_type = @pipeline_type.to_s
  # this will catch the case where pipeline_type doesn't exist, or is not a Hash
  record['pipeline_metadata'][pipeline_type] = {} unless record['pipeline_metadata'][pipeline_type].respond_to?(:fetch)
  record['pipeline_metadata'][pipeline_type]['ipaddr4'] = @ipaddr4
  if @ipaddr6
    record['pipeline_metadata'][pipeline_type]['ipaddr6'] = @ipaddr6
  end
  record['pipeline_metadata'][pipeline_type]['inputname'] = 'fluent-plugin-systemd'
  record['pipeline_metadata'][pipeline_type]['name'] = 'fluentd'
  record['pipeline_metadata'][pipeline_type]['received_at'] = Time.now.utc.to_datetime.rfc3339(6)
  record['pipeline_metadata'][pipeline_type]['version'] = @pipeline_version
end
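
The resulting structure looks roughly like this, assuming the pipeline_type is collector (an assumption; the values shown are placeholders):

# record['pipeline_metadata'] after this method:
# {
#   'collector' => {
#     'ipaddr4'     => '10.128.0.5',                    # ENV['IPADDR4'] or 127.0.0.1
#     'ipaddr6'     => 'fe80::1',                       # present only when ENV['IPADDR6'] is set
#     'inputname'   => 'fluent-plugin-systemd',
#     'name'        => 'fluentd',
#     'received_at' => '2017-07-27T17:27:46.216527+00:00',
#     'version'     => '1.14.6 1.6.0'                   # FLUENTD_VERSION + ' ' + DATA_VERSION
#   }
# }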
check_for_match_and_format(tag, time, record)
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 350
def check_for_match_and_format(tag, time, record)
  return unless @formatters
  return if @formatter_cache_nomatch[tag]
  fmtr = @formatter_cache[tag]
  unless fmtr
    idx = @formatters.index{|fmtr| fmtr.matcher.match(tag)}
    if idx && (fmtr = @formatters[idx]).enabled
      @formatter_cache[tag] = fmtr
    else
      @formatter_cache_nomatch[tag] = true
      return
    end
  end
  fmtr.fmtr_func.call(tag, time, record, fmtr)

  if record[@dest_time_name].nil? && record['time'].nil?
    record['time'] = Time.at(time).utc.to_datetime.rfc3339(6)
  end

  if fmtr.fmtr_remove_keys
    fmtr.fmtr_remove_keys.each{|k| record.delete(k)}
  end
end
configure(conf)
Calls superclass method
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 164
def configure(conf)
  super
  @keep_fields = {}
  @default_keep_fields.each{|xx| @keep_fields[xx] = true}
  @extra_keep_fields.each{|xx| @keep_fields[xx] = true}
  @keep_empty_fields_hash = {}
  @keep_empty_fields.each do |xx|
    @keep_empty_fields_hash[xx] = true
    @keep_fields[xx] = true
  end
  if @use_undefined && @keep_fields.key?(@undefined_name)
    raise Fluent::ConfigError, "Do not put [#{@undefined_name}] in default_keep_fields or extra_keep_fields"
  end
  if (@rename_time || @rename_time_if_missing) && @use_undefined && !@keep_fields.key?(@src_time_name)
    raise Fluent::ConfigError, "Field [#{@src_time_name}] must be listed in default_keep_fields or extra_keep_fields"
  end
  @undefined_dot_replace_char = nil if @undefined_dot_replace_char == DOT_REPLACE_CHAR_UNUSED
  if @formatters
    @formatters.each do |fmtr|
      matcher = ViaqMatchClass.new(fmtr.tag, nil)
      fmtr.instance_eval{ @params[:matcher] = matcher }
      if fmtr.remove_keys
        fmtr.instance_eval{ @params[:fmtr_remove_keys] = fmtr.remove_keys.split(',') }
      else
        fmtr.instance_eval{ @params[:fmtr_remove_keys] = nil }
      end
      case fmtr.type
      when :sys_journal, :k8s_journal
        fmtr_func = method(:process_journal_fields)
      when :sys_var_log
        fmtr_func = method(:process_sys_var_log_fields)
      when :k8s_json_file
        fmtr_func = method(:process_k8s_json_file_fields)
      end
      fmtr.instance_eval{ @params[:fmtr_func] = fmtr_func }
      proc_k8s_ev = fmtr.process_kubernetes_events.nil? ? @process_kubernetes_events : fmtr.process_kubernetes_events
      fmtr.instance_eval{ @params[:process_kubernetes_events] = proc_k8s_ev }
    end
    @formatter_cache = {}
    @formatter_cache_nomatch = {}
  end
  begin
    @docker_hostname = File.open('/etc/docker-hostname') { |f| f.readline }.rstrip
  rescue
    @docker_hostname = ENV['NODE_NAME'] || nil
  end
  @ipaddr4 = ENV['IPADDR4'] || '127.0.0.1'
  @ipaddr6 = nil

  if ENV['IPADDR6'] && ENV['IPADDR6'].length > 0
    @ipaddr6 = ENV['IPADDR6']
  end
  
  @pipeline_version = (ENV['FLUENTD_VERSION'] || 'unknown fluentd version') + ' ' + (ENV['DATA_VERSION'] || 'unknown data version')
  # create the elasticsearch index name tag matchers
  unless @elasticsearch_index_names.empty?
    @elasticsearch_index_names.each do |ein|
      matcher = ViaqMatchClass.new(ein.tag, nil)
      ein.instance_eval{ @params[:matcher] = matcher }
    end
  end
end
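
The values consumed here come from the plugin configuration. A minimal, hypothetical configuration sketch - assuming the filter is registered as viaq_data_model and that the parameter names mirror the instance variables used above (both assumptions; tags and values are illustrative):

<filter **>
  @type viaq_data_model
  extra_keep_fields docker,kubernetes
  keep_empty_fields message
  use_undefined true
  undefined_name undefined
  rename_time true
  src_time_name time
  dest_time_name @timestamp
  <formatter>
    tag "kubernetes.var.log.containers.**"
    type k8s_json_file
    remove_keys log,stream
  </formatter>
  <elasticsearch_index_name>
    tag "journal.system** system.var.log**"
    name_type operations_full
  </elasticsearch_index_name>
  <elasticsearch_index_name>
    tag "**"
    name_type project_full
  </elasticsearch_index_name>
</filter>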
delempty(thing)

recursively delete empty fields and empty lists/hashes from thing

# File lib/fluent/plugin/filter_viaq_data_model.rb, line 242
def delempty(thing)
  if thing.respond_to?(:delete_if)
    if thing.kind_of? Hash
      thing.delete_if{|k,v| v.nil? || isempty(delempty(v)) || isempty(v)}
    else # assume single element iterable
      thing.delete_if{|elem| elem.nil? || isempty(delempty(elem)) || isempty(elem)}
    end
  end
  thing
end
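
For example, given a hypothetical nested structure:

delempty({ 'a' => '', 'b' => { 'c' => [], 'd' => 'x' }, 'e' => nil, 'f' => 0 })
# => { 'b' => { 'd' => 'x' }, 'f' => 0 }
# Empty strings, empty collections and nils are removed at every level; 0
# survives because it does not respond to empty? (see isempty below).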
filter(tag, time, record)
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 509
def filter(tag, time, record)
  if ENV['CDM_DEBUG']
    unless tag == ENV['CDM_DEBUG_IGNORE_TAG']
      log.error("input #{time} #{tag} #{record}")
    end
  end

  check_for_match_and_format(tag, time, record)
  add_pipeline_metadata(tag, time, record)
  handle_undefined_fields(tag, time, record)
  # remove the field from record if it is not in the list of fields to keep and
  # it is empty
  record.delete_if{|k,v| !@keep_empty_fields_hash.key?(k) && (v.nil? || isempty(delempty(v)) || isempty(v))}
  # probably shouldn't remove everything . . .
  log.warn("Empty record! tag [#{tag}] time [#{time}]") if record.empty?
  # rename the time field
  if (@rename_time || @rename_time_if_missing) && record.key?(@src_time_name)
    val = record.delete(@src_time_name)
    unless @rename_time_if_missing && record.key?(@dest_time_name)
      record[@dest_time_name] = val
    end
  end

  if !@elasticsearch_index_names.empty?
    add_elasticsearch_index_name_field(tag, time, record)
  elsif ENV['CDM_DEBUG']
    unless tag == ENV['CDM_DEBUG_IGNORE_TAG']
      log.error("not adding elasticsearch index name or prefix")
    end
  end
  if ENV['CDM_DEBUG']
    unless tag == ENV['CDM_DEBUG_IGNORE_TAG']
      log.error("output #{time} #{tag} #{record}")
    end
  end
  record
end
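
As an end-to-end sketch, assume rename_time true with src_time_name time and dest_time_name @timestamp, no matching formatter, and the undefined-field handling options disabled (all assumptions; field values are illustrative):

before = { 'message' => 'hello', 'level' => 'err', 'time' => '2017-07-27T17:27:46.216527+00:00', 'extra' => '' }
# filter(tag, time, before) would return roughly:
# {
#   'message'           => 'hello',
#   'level'             => 'err',
#   'pipeline_metadata' => { 'collector' => { ... } },   # see add_pipeline_metadata
#   '@timestamp'        => '2017-07-27T17:27:46.216527+00:00'
# }
# 'extra' is dropped because it is empty and not listed in keep_empty_fields.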
handle_undefined_fields(tag, time, record)
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 475
def handle_undefined_fields(tag, time, record)
  if @undefined_to_string || @use_undefined || @undefined_dot_replace_char || (@undefined_max_num_fields > NUM_FIELDS_UNLIMITED)
    # undefined contains all of the fields not in keep_fields
    undefined_keys = record.keys - @keep_fields.keys
    return if undefined_keys.empty?
    if @undefined_max_num_fields > NUM_FIELDS_UNLIMITED && undefined_keys.length > @undefined_max_num_fields
      undefined = {}
      undefined_keys.each{|k|undefined[k] = record.delete(k)}
      record[@undefined_name] = JSON.dump(undefined)
    else
      if @use_undefined
        record[@undefined_name] = {}
        modify_hsh = record[@undefined_name]
      else
        modify_hsh = record
      end
      undefined_keys.each do |k|
        origk = k
        if @use_undefined
          modify_hsh[k] = record.delete(k)
        end
        if @undefined_dot_replace_char && k.index('.')
          newk = k.gsub('.', @undefined_dot_replace_char)
          modify_hsh[newk] = modify_hsh.delete(k)
          k = newk
        end
        if @undefined_to_string && !modify_hsh[k].is_a?(String)
          modify_hsh[k] = JSON.dump(modify_hsh[k])
        end
      end
    end
  end
end
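
For illustration, suppose use_undefined is true, undefined_name is 'undefined', undefined_dot_replace_char is '_', the keep fields contain only 'message', and the number of undefined keys is within undefined_max_num_fields (all assumptions):

record = { 'message' => 'hello', 'foo' => 1, 'bar.baz' => 2 }
# after handle_undefined_fields(tag, time, record):
# { 'message' => 'hello', 'undefined' => { 'foo' => 1, 'bar_baz' => 2 } }
# If undefined_max_num_fields were exceeded instead, all undefined keys would
# be collapsed into a single JSON string stored under 'undefined'.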
isempty(thing)

if thing doesn't respond to empty? then assume it isn't empty, e.g. 0.respond_to?(:empty?) == false - the Fixnum 0 is not empty

# File lib/fluent/plugin/filter_viaq_data_model.rb, line 237
def isempty(thing)
  thing.respond_to?(:empty?) && thing.empty?
end
normalize_level(level, newlevel, priority=nil)
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 293
def normalize_level(level, newlevel, priority=nil)
  # if the record already has a level field, and it looks like one of our well
  # known values, convert it to the canonical normalized form - otherwise,
  # preserve the value in string format
  retlevel = nil
  if !level.nil?
    unless (retlevel = NORMAL_LEVELS[level]) ||
           (level.respond_to?(:downcase) && (retlevel = NORMAL_LEVELS[level.downcase]))
      retlevel = level.to_s # don't know what it is - just convert to string
    end
  elsif !priority.nil?
    retlevel = PRIORITY_LEVELS[priority]
  else
    retlevel = NORMAL_LEVELS[newlevel]
  end
  retlevel || 'unknown'
end
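
For illustration, assuming NORMAL_LEVELS contains the usual syslog-style names and PRIORITY_LEVELS follows the standard syslog numbering (assumptions - see the constants above):

normalize_level('ERROR', nil)   # => 'err'      well known value, matched after downcasing
normalize_level('sev9', nil)    # => 'sev9'     unknown value - preserved as a string
normalize_level(nil, nil, 3)    # => 'err'      PRIORITY 3 (syslog err)
normalize_level(nil, 'info')    # => 'info'     falls back to the caller-supplied newlevel
normalize_level(nil, nil)       # => 'unknown'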
process_k8s_json_file_fields(tag, time, record, fmtr = nil)
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 328
def process_k8s_json_file_fields(tag, time, record, fmtr = nil)
  record['message'] = record['message'] || record['log']
  record['level'] = normalize_level(record['level'], nil)
  if record.key?('kubernetes') && record['kubernetes'].respond_to?(:fetch) && \
     (k8shost = record['kubernetes'].fetch('host', nil))
    record['hostname'] = k8shost
  elsif @docker_hostname
    record['hostname'] = @docker_hostname
  end
  if record[@dest_time_name].nil? # skip if the record already has e.g. @timestamp
    unless record['time'].nil?
      # convert from string - parses a wide variety of formats
      rectime = Time.parse(record['time'])
    else
      # convert from time_t
      rectime = Time.at(time)
    end
    record['time'] = rectime.utc.to_datetime.rfc3339(6)
  end
  transform_eventrouter(tag, record, fmtr)
end
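
For illustration (a hypothetical container log record; field values are made up):

record = { 'log' => 'hello world', 'stream' => 'stdout', 'time' => '2017-07-27T17:27:46.216527+00:00' }
# after process_k8s_json_file_fields(tag, time, record):
#   record['message'] == 'hello world'   copied from 'log'
#   record['level']   == 'unknown'       no level field was present
#   record['time']    == '2017-07-27T17:27:46.216527+00:00'   parsed and re-emitted as RFC 3339 UTC
#   record['hostname'] comes from kubernetes.host, or from @docker_hostname when that is missing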
process_sys_var_log_fields(tag, time, record, fmtr = nil)
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 311
def process_sys_var_log_fields(tag, time, record, fmtr = nil)
  record['systemd'] = {"t" => {"PID" => record['pid']}, "u" => {"SYSLOG_IDENTIFIER" => record['ident']}}
  if record[@dest_time_name].nil? # skip if the record already has e.g. @timestamp
    # handle the case where the time reported in /var/log/messages is for a previous year
    timeobj = Time.at(time)
    if timeobj > Time.now
      timeobj = Time.new((timeobj.year - 1), timeobj.month, timeobj.day, timeobj.hour, timeobj.min, timeobj.sec, timeobj.utc_offset)
    end
    record['time'] = timeobj.utc.to_datetime.rfc3339(6)
  end
  if record['host'].eql?('localhost') && @docker_hostname
    record['hostname'] = @docker_hostname
  else
    record['hostname'] = record['host']
  end
end
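
For illustration (a hypothetical /var/log/messages record; field values are made up):

record = { 'host' => 'localhost', 'pid' => '1234', 'ident' => 'sshd', 'message' => 'session opened' }
# after process_sys_var_log_fields(tag, time, record):
#   record['systemd']  == { 't' => { 'PID' => '1234' }, 'u' => { 'SYSLOG_IDENTIFIER' => 'sshd' } }
#   record['time']     is derived from the time argument as RFC 3339 UTC, rolled back one
#                      year if the parsed time would otherwise be in the future
#   record['hostname'] is @docker_hostname (because host is 'localhost'), otherwise record['host']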
shutdown()
Calls superclass method
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 231
def shutdown
  super
end
start()
Calls superclass method
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 227
def start
  super
end
transform_eventrouter(tag, record, fmtr)
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 459
def transform_eventrouter(tag, record, fmtr)
  return if fmtr.nil? || !fmtr.process_kubernetes_events
  if record.key?("event") && record["event"].respond_to?(:key?)
    if record.key?("verb")
      record["event"]["verb"] = record.delete("verb")
    end
    record["kubernetes"] = {} unless record.key?("kubernetes")
    record["kubernetes"]["event"] = record.delete("event")
    if record["kubernetes"]["event"].key?('message')
      ((record['pipeline_metadata'] ||= {})[@pipeline_type.to_s] ||= {})['original_raw_message'] = record['message']
    end
    record['message'] = record["kubernetes"]["event"].delete("message")
    record['time'] = record["kubernetes"]["event"]["metadata"].delete("creationTimestamp") 
  end
end
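
For illustration, a hypothetical eventrouter record processed by a formatter with process_kubernetes_events enabled (assuming pipeline_type collector; values are made up):

record = {
  'verb'    => 'ADDED',
  'event'   => { 'message' => 'Pod started', 'metadata' => { 'creationTimestamp' => '2017-07-27T17:27:46Z' } },
  'message' => 'raw log line'
}
# after transform_eventrouter(tag, record, fmtr):
# {
#   'kubernetes'        => { 'event' => { 'verb' => 'ADDED', 'metadata' => {} } },
#   'message'           => 'Pod started',
#   'time'              => '2017-07-27T17:27:46Z',
#   'pipeline_metadata' => { 'collector' => { 'original_raw_message' => 'raw log line' } }
# }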