class Fluent::Plugin::MultiTypeParserFilter

Constants

FAILED_RESULT
REPLACE_CHAR

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_multi_type_parser.rb, line 39
# Builds the filter with an empty parser list; #configure appends one
# parser per usable <parse> element.
def initialize
  super()
  @parsers = []
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_multi_type_parser.rb, line 44
# Validates the plugin configuration and builds one parser for every
# <parse> element (carrying an '@type') found inside the <parsers> section.
#
# @param conf configuration element; its child +elements+ are scanned for a
#   section named "parsers"
# @raise [Fluent::ConfigError] when there is no <parsers> section or the
#   section has no child elements
def configure(conf)
  super

  # When several <parsers> sections are present, the last one wins.
  parsers_section = conf.elements.select { |element| element.name == 'parsers' }.last
  parsers_config = parsers_section ? parsers_section.elements : nil

  if parsers_config.nil? || parsers_config.empty?
    raise Fluent::ConfigError, "section <parsers> is required."
  end

  parsers_config.each do |section|
    # Anything that is not a <parse> element naming a parser type is ignored.
    next unless section.name == 'parse' && section.has_key?('@type')

    parser = Fluent::Plugin.new_parser(section['@type'], parent: self)
    parser.configure(section)
    @parsers << parser
  end
end
filter_with_time(tag, time, record) click to toggle source
# File lib/fluent/plugin/filter_multi_type_parser.rb, line 70
# Reads the value stored under @key_name and offers it to each configured
# parser in order, returning the first result whose time and record are
# both non-nil.
#
# @param tag event tag, forwarded to the parsers and to error events
# @param time event time, used as the fallback/reserved time
# @param record event record; indexed with @key_name
# @return the (time, record) pair of the first successful parser; when the
#   key is missing, (time, handle_parsed(...)) if @reserve_data is set,
#   otherwise FAILED_RESULT; FAILED_RESULT when every parser fails
def filter_with_time(tag, time, record)
  raw_value = record[@key_name]
  if raw_value.nil?
    if @emit_invalid_record_to_error
      router.emit_error_event(tag, time, record, ArgumentError.new("#{@key_name} does not exist"))
    end
    return time, handle_parsed(tag, record, time, {}) if @reserve_data

    # Nothing to parse: stop here instead of handing nil to every parser.
    return FAILED_RESULT
  end

  @parsers.each do |parser|
    begin
      # NOTE(review): parse_record receives the raw field value as its
      # `record` argument, not the event record — confirm that is intended,
      # since parse_record forwards that argument to handle_parsed.
      t, r = parse_record(parser, tag, time, raw_value)
      return t, r unless t.nil? || r.nil?
    rescue => e
      # A failing parser must not prevent the remaining parsers from trying.
      log.warn("parse failed #{e.message}") #unless @suppress_parse_error_log
    end
  end

  # No parser produced a result. Without this line the method would return
  # the value of @parsers.each, i.e. the array of parser objects.
  FAILED_RESULT
end

Private Instance Methods

handle_parsed(tag, record, t, values) click to toggle source
# File lib/fluent/plugin/filter_multi_type_parser.rb, line 143
# Shapes the parsed values into the record that will be emitted.
#
# Steps, each controlled by an instance variable:
# * @inject_key_prefix - prepended to every parsed key
# * @hash_value_field  - nests the parsed values under this single key
# * @reserve_data      - merges the result into the given record (or
#   returns the record untouched when there is no result)
#
# +tag+ and +t+ are accepted but not used here.
def handle_parsed(tag, record, t, values)
  parsed = values
  if values && @inject_key_prefix
    parsed = values.each_with_object({}) do |(key, value), prefixed|
      prefixed[@inject_key_prefix + key] = value
    end
  end

  result = @hash_value_field ? { @hash_value_field => parsed } : parsed
  return result unless @reserve_data

  result ? record.merge(result) : record
end
parse_record(parser, tag, time, record) click to toggle source
# File lib/fluent/plugin/filter_multi_type_parser.rb, line 97
# Runs a single parser over the given input and converts its outcome into a
# (time, record) pair.
#
# @param parser object responding to +parse(text) { |time, values| ... }+
# @param tag event tag, forwarded to handle_parsed and error events
# @param time event time; used when @reserve_time is set or the parser
#   yields no time
# @param record the input handed to the parser; it is also forwarded to
#   handle_parsed and to error events.
#   NOTE(review): filter_with_time passes the raw field value here, so with
#   @reserve_data set handle_parsed will call +merge+ on that value rather
#   than on the event record — confirm and fix together with the caller.
# @return (time, record) on success; (time, handle_parsed(...)) when the
#   parser yields no values and @reserve_data is set; FAILED_RESULT otherwise
# @raise [Fluent::Plugin::Parser::ParserError] on parse failure when
#   @emit_invalid_record_to_error is set
# @raise [ArgumentError] when the parser raises one that is not an
#   invalid-byte-sequence error being handled via @replace_invalid_sequence
def parse_record(parser, tag, time, record)
  # Parsed text lives in its own local so it can be scrubbed and re-parsed.
  raw_value = record
  scrubbed = false

  begin
    parser.parse(raw_value) do |t, values|
      if values
        t = time if @reserve_time || t.nil?
        return t, handle_parsed(tag, record, t, values)
      end

      if @emit_invalid_record_to_error
        router.emit_error_event(tag, time, record, Fluent::Plugin::Parser::ParserError.new("pattern not match with data '#{raw_value}'"))
      end
      return time, handle_parsed(tag, record, time, {}) if @reserve_data

      return FAILED_RESULT
    end

    # The parser returned without yielding: report failure rather than
    # leaking the parser's own return value.
    FAILED_RESULT
  rescue Fluent::Plugin::Parser::ParserError
    raise if @emit_invalid_record_to_error

    FAILED_RESULT
  rescue ArgumentError => e
    raise unless @replace_invalid_sequence
    raise unless e.message.start_with?("invalid byte sequence in")
    # Retry at most once; scrubbed input that still fails is a real error.
    raise if scrubbed

    raw_value = raw_value.scrub(REPLACE_CHAR)
    scrubbed = true
    retry
  rescue => e
    raise Fluent::Plugin::Parser::ParserError, "parse failed #{e.message}" if @emit_invalid_record_to_error

    FAILED_RESULT
  end
end