class Fluent::Plugin::LightCoreFilter

Public Instance Methods

filter(tag, time, record) click to toggle source

主处理

# File lib/fluent/plugin/filter_light_core.rb, line 70
# Main entry point: route a record to the parser that matches its tag, then
# hand the parsed record to `notice` so a notification can be raised if needed.
#
# Tag groups and the notice category they are reported under:
#   'app', 'service'               -> filter_app,   notice category 'app'
#   'lb', 'hub'                    -> filter_lb,    notice category 'lb'
#   'mongo', 'secondary', 'arbiter' -> filter_mongo, notice category 'mongo'
#
# Any other tag leaves the record untouched.
#
# @param tag [String] tag of the incoming event
# @param time [Object] event time, passed through to the parser
# @param record [Hash] the event record
# @return [Hash] whatever `notice` returns for a handled tag, otherwise `record` itself
def filter(tag, time, record)
  case tag
  when 'app', 'service'
    notice('app', filter_app(tag, time, record))
  when 'lb', 'hub'
    notice('lb', filter_lb(tag, time, record))
  when 'mongo', 'secondary', 'arbiter'
    notice('mongo', filter_mongo(tag, time, record))
  else
    record
  end
end
filter_app(tag, time, record) click to toggle source

Parse the application log

# File lib/fluent/plugin/filter_light_core.rb, line 92
# Parse one application log record, mutating and returning `record`.
#
# Common fields are always set: 'environment' (from ENV['FLUENTD_ENV']),
# 'cid' (first '_'-separated piece of the log file's base name), 'cname'
# (second '-'-separated piece of 'cid') and 'ctime' (the record's original
# 'time'). The raw 'log', 'file' and 'time' keys are then removed.
#
# The log line is then interpreted as one of:
#   * AP log:     [2020-07-06T09:21:51.121] [A] [INFO] <line> - - <uid> - <message>
#   * access log: [2020-07-06T09:21:51.121] [I] [GET] <url> <status> <size> <uid> <elapsed> <addr>
#   * anything else: stored verbatim (minus the trailing newline) in 'message'.
#
# A line that carries the [A]/[I] prefix but does not match the full layout is
# treated as "anything else" instead of raising on a nil match. A missing
# 'file' or 'log' field is treated as an empty string.
#
# NOTE(review): the AP-log level is stored under the key 'levle' (sic). It is
# kept as-is because consumers of the record may already depend on that key.
#
# @param tag [String] event tag (unused here)
# @param time [Object] event time (unused here)
# @param record [Hash] event record; expected to carry 'file', 'log' and 'time'
# @return [Hash] the same `record`, enriched with the parsed fields
def filter_app(tag, time, record)
  file = record['file'].to_s.split('/').last.to_s.split('_') # pieces of the log file's base name
  log = record['log'].to_s                                   # raw log line
  cid = file[0]

  # Set common items
  record['environment'] = ENV['FLUENTD_ENV']                 # dev | prd
  record['cid'] = cid                                        # container id
  record['cname'] = cid ? cid.split('-')[1] : nil            # container name
  record['ctime'] = record['time']                           # container time

  # Delete useless content
  record.delete('log')
  record.delete('file')
  record.delete('time')

  # Standard AP log [2020-07-06T09:21:51.121] [A] [INFO] xxxxx
  if /^\[\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{3}\] \[A\]/.match(log)
    item = /^\[(?<time>[^\]]*)\] \[(?<component>[^\]]*)\] \[(?<levle>[^\]]*)\] (?<line>[^\ ]*) - - (?<uid>[^\ ]*) - (?<message>.*)$/.match(log)

    # Only trust the captures when the whole layout matched; otherwise fall
    # through to the console-log handling at the bottom.
    if item
      record['time'] = item[:time]
      record['component'] = item[:component]
      record['levle'] = item[:levle]
      record['line'] = item[:line]
      record['uid'] = item[:uid]
      record['message'] = item[:message]

      return record
    end
  end

  # Standard Access log [2020-07-06T09:21:51.121] [I] [GET] xxxxx
  if /^\[\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{3}\] \[I\]/.match(log)
    item = /^\[(?<time>[^\]]*)\] \[(?<component>[^\]]*)\] \[(?<method>[^\]]*)\] (?<url>[^\ ]*) (?<status>[^\ ]*) (?<size>[^\ ]*) (?<uid>[^\ ]*) (?<elapsed>[^\ ]*) (?<addr>[^\ ]*)$/.match(log)

    if item
      record['time'] = item[:time]
      record['component'] = item[:component]
      record['method'] = item[:method]
      record['url'] = item[:url]
      record['status'] = item[:status]
      record['size'] = format_str(item[:size])
      record['uid'] = item[:uid]
      record['elapsed'] = format_str(item[:elapsed])
      # The last capture may swallow the line's trailing newline; strip it.
      record['addr'] = item[:addr].gsub(/\n$/, '')

      return record
    end
  end

  # Console log (or a prefixed line whose layout was not recognised)
  record['message'] = log.gsub(/\n$/, '')
  record
end
filter_lb(tag, time, record) click to toggle source

Parse the load balancer (nginx) access and error logs. Log format reference: docs.nginx.com/nginx/admin-guide/monitoring/logging/

# File lib/fluent/plugin/filter_light_core.rb, line 147
# Parse one load-balancer log record, mutating and returning `record`.
#
# Common fields are always set: 'environment' (from ENV['FLUENTD_ENV']),
# 'cid' (first '_'-separated piece of the log file's base name), 'cname'
# (the tag) and 'ctime' (the record's original 'time'). The raw 'log',
# 'file' and 'time' keys are then removed.
#
# The log line is then interpreted as one of:
#   * access log: "<remote> <host> <user> [<ISO-8601 time>] "<request>" <code> <size> ..."
#   * error log:  "YYYY/MM/DD HH:MM:SS [error] <pid>#<tid>: *<counter> <message>"
#   * anything else: stored verbatim (minus the trailing newline) in 'message'.
#
# A line that carries the access/error prefix but does not match the full
# layout (for instance an [error] line without the "*<counter>" part) is
# treated as "anything else" instead of raising on a nil match. A missing
# 'file' or 'log' field is treated as an empty string.
#
# @param tag [String] event tag; stored as 'cname'
# @param time [Object] event time (unused here)
# @param record [Hash] event record; expected to carry 'file', 'log' and 'time'
# @return [Hash] the same `record`, enriched with the parsed fields
def filter_lb(tag, time, record)
  file = record['file'].to_s.split('/').last.to_s.split('_')
  log = record['log'].to_s

  record['environment'] = ENV['FLUENTD_ENV']
  record['cid'] = file[0]
  record['cname'] = tag
  record['ctime'] = record['time']

  record.delete('log')
  record.delete('file')
  record.delete('time')

  # access log
  if /^[^ ]+ [^ ]+ [^ ]+ \[\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\+\d{2}:\d{2}\]/.match(log)
    item = /^(?<remote>[^ ]*) (?<host>[^ ]*) (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^ ]*) +\S*)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)" "(?<elapsed>[^\"]*)" "(?<requestid>[^\"]*)" "(?<sessionid>[^\"]*)")?/.match(log)

    # Only trust the captures when the whole layout matched; otherwise fall
    # through to the "other log" handling at the bottom.
    if item
      record['remote'] = item[:remote]
      record['host'] = item[:host]
      record['user'] = item[:user]
      record['time'] = item[:time]
      record['method'] = item[:method]
      record['path'] = item[:path]          # nil when the request line has no path part
      record['code'] = item[:code]
      record['size'] = format_str(item[:size])
      record['referer'] = item[:referer]
      record['agent'] = item[:agent]
      record['elapsed'] = format_str(item[:elapsed])
      record['requestid'] = item[:requestid]
      record['sessionid'] = item[:sessionid]

      return record
    end
  end

  # error log
  if /^\d{4}\/\d{2}\/\d{2} \d{2}:\d{2}:\d{2} \[error\]/.match(log)
    item = /^(?<time>[^ ]+ [^ ]+) \[(?<level>.*)\] (?<pid>\d*)#(?<tid>[^:]*): \*(?<cid>\d*) (?<message>.*)$/.match(log)

    if item
      record['time'] = item[:time]
      record['level'] = item[:level]
      record['message'] = item[:message]
      record['process'] = item[:pid]     # process id
      record['thread'] = item[:tid]      # thread id
      record['counter'] = item[:cid]     # counter

      # Pull the request line and referrer out of the message when present.
      request = /request: "(?<method>[^ ]*) (?<path>[^"]*)"/.match(item[:message])
      if request
        record['method'] = request[:method]
        record['path'] = request[:path]
      end

      referrer = /referrer: "(?<referrer>[^"]*)"/.match(item[:message])
      record['referrer'] = referrer[:referrer] if referrer

      return record
    end
  end

  # other log (or a prefixed line whose layout was not recognised)
  record['message'] = log.gsub(/\n$/, '')
  record
end
filter_mongo(tag, time, record) click to toggle source

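Parse the MongoDB log (legacy plain-text format and the JSON format used from version 4.4). Log format reference: docs.mongodb.com/manual/reference/log-messages/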

# File lib/fluent/plugin/filter_light_core.rb, line 213
# Parse one MongoDB log record, mutating and returning `record`.
#
# Common fields are always set: 'environment' (from ENV['FLUENTD_ENV']),
# 'cid' (first '_'-separated piece of the log file's base name), 'cname'
# (the tag) and 'ctime' (the record's original 'time'). The raw 'log',
# 'file' and 'time' keys are then removed.
#
# Two log layouts are understood:
#   * legacy text: "<timestamp> <severity> <component> [<context>] <message>",
#     with slow "protocol:op_query ...ms" lines split further into
#     'command', 'collection' and 'querytime';
#   * JSON (the default from MongoDB 4.4): keys t/s/c/ctx/id/msg/attr/tags/
#     truncated/size are copied into the record.
#
# A line that fits neither layout (legacy prefix with an unexpected body,
# invalid JSON, or JSON that is not an object) is stored verbatim (minus the
# trailing newline) in 'message' instead of raising. A missing 'file' or
# 'log' field is treated as an empty string.
#
# @param tag [String] event tag; stored as 'cname'
# @param time [Object] event time (unused here)
# @param record [Hash] event record; expected to carry 'file', 'log' and 'time'
# @return [Hash] the same `record`, enriched with the parsed fields
def filter_mongo(tag, time, record)
  file = record['file'].to_s.split('/').last.to_s.split('_')
  log = record['log'].to_s

  record['environment'] = ENV['FLUENTD_ENV']
  record['cid'] = file[0]
  record['cname'] = tag
  record['ctime'] = record['time']

  record.delete('log')
  record.delete('file')
  record.delete('time')

  # Legacy (pre-JSON) log format
  if /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}\+\d{4} [A-Z]/.match(log)
    item = /^(?<time>[^ ]*) (?<severity>[A-Z])\s* (?<component>(-|([^ ]*)))\s* \[(?<context>[^\]]*)\]\s* ((?<query>.*) protocol:op_query (?<querytime>[\d\.]+(?=ms))|(?<message>.*))/.match(log)

    # Only trust the captures when the whole layout matched; otherwise fall
    # through to the plain-message handling at the bottom.
    if item
      record['time'] = item[:time]
      record['severity'] = item[:severity]
      record['component'] = item[:component]
      record['context'] = item[:context]

      # Ordinary line: the second alternative of the pattern matched.
      if item[:message]
        record['message'] = item[:message]
        return record
      end

      # Slow-query line: the first alternative of the pattern matched.
      record['querytime'] = format_str(item[:querytime]) if item[:querytime]

      if item[:query]
        record['command'] = item[:query]
        query = /^command\s* (?<collection>[^ ]*) (?<command>.*)$/.match(item[:query])
        if query
          record['collection'] = query[:collection]
          record['command'] = query[:command]
        end
      end

      return record
    end
  else
    # From version 4.4 the default log format is JSON
    begin
      item = JSON.parse(log)
    rescue JSON::ParserError
      item = nil # not JSON at all; handled as a plain message below
    end

    if item.is_a?(Hash)
      stamp = item['t']
      record['time'] = stamp.is_a?(Hash) ? stamp['$date'] : stamp
      record['severity'] = item['s']
      record['component'] = item['c']
      record['context'] = item['ctx']
      record['identifier'] = item['id']
      record['message'] = item['msg']

      attributes = item['attr']
      if attributes.is_a?(Hash)
        record['querytime'] = attributes['durationMillis']
        record['collection'] = attributes['ns']
        record['command'] = attributes['command']
      end
      record['attr'] = attributes if attributes

      record['tags'] = item['tags']
      record['truncated'] = item['truncated']
      record['size'] = item['size']

      return record
    end
  end

  # Unrecognised line: keep the raw text
  record['message'] = log.gsub(/\n$/, '')
  record
end
format_str(str) click to toggle source

转数字

# File lib/fluent/plugin/filter_light_core.rb, line 366
# Convert a purely numeric string to a number; leave everything else alone.
#
# "123" becomes the Integer 123 and "1.5" becomes the Float 1.5. A single
# trailing newline is tolerated. Any other value (including nil, non-String
# objects, signed numbers and strings such as "-") is returned unchanged.
#
# The patterns are anchored to the whole string (\A ... \Z) rather than to a
# line (^ ... $), so multi-line input such as "abc\n12" is not mistaken for a
# number and silently turned into 0.
#
# @param str [Object] value to convert
# @return [Integer, Float, Object] the numeric value, or `str` itself
def format_str(str)
  return str unless str.is_a?(String)
  return str.to_i if str =~ /\A\d+\Z/
  return str.to_f if str =~ /\A\d+\.\d+\Z/
  str
end
notice(tag, record) click to toggle source

确认是否发送通知

# File lib/fluent/plugin/filter_light_core.rb, line 286
# Decide whether a parsed record should trigger a notification, and send it.
#
# Nothing is sent unless @sentry is set. Otherwise the record is checked
# against the thresholds configured for its category:
#   'app'   - @app_stream, @app_message (patterns), @app_status, @app_elapsed
#   'lb'    - @lb_stream, @lb_code, @lb_elapsed
#   'mongo' - @mongo_severity, @mongo_querytime
# A record can trigger several notifications (one per rule that fires).
#
# Every notification goes through this class's three-argument
# `send(tag, message, record)`. Messages are built with interpolation so a
# missing 'url'/'path'/'collection' yields an empty suffix rather than a
# TypeError from String#+.
#
# @param tag [String] notice category: 'app', 'lb' or 'mongo'
# @param record [Hash] parsed record
# @return [Hash] `record`, unchanged
def notice(tag, record)
  return record unless @sentry

  case tag
  when 'app'
    # Marked as unused in the original source; kept working for completeness.
    if @app_stream && record['stream'] && record['stream'] == @app_stream
      send(tag, 'Stderror', record)
    end

    # Marked as unused in the original source; at most one notification is
    # sent no matter how many patterns match.
    if @app_message.length > 0 && record['message']
      matched = @app_message.any? { |pattern| pattern.match(record['message']) }
      send(tag, 'Message matched', record) if matched
    end

    if @app_status.length > 0 && record['status'] && @app_status.include?(record['status'])
      send(tag, "Status code abnormal : #{record['url']}", record)
    end

    if @app_elapsed > 0 && record['elapsed'] && record['elapsed'].to_f >= @app_elapsed
      send(tag, "Slow process : #{record['url']}", record)
    end
  when 'lb'
    if @lb_stream && record['stream'] && record['stream'] == @lb_stream
      send(tag, 'Stderror', record)
    end

    if @lb_code.length > 0 && record['code'] && @lb_code.include?(record['code'])
      send(tag, "Status code abnormal : #{record['path']}", record)
    end

    if @lb_elapsed > 0 && record['elapsed'] && record['elapsed'].to_f >= @lb_elapsed
      send(tag, "Slow request : #{record['path']}", record)
    end
  when 'mongo'
    if @mongo_severity.length > 0 && record['severity'] && @mongo_severity.include?(record['severity'])
      send(tag, "Severity level abnormal : #{record['severity']}", record)
    end

    if @mongo_querytime > 0 && record['querytime'] && record['querytime'].to_f >= @mongo_querytime
      message = 'Slow query'
      message = "#{message} : #{record['collection']}" if record['collection']
      send(tag, message, record)
    end
  end

  record
end
send(tag, message, record) click to toggle source

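Send the notification to Sentry via Sentry.capture_message. (The original description, 发送UDP请求 / "send UDP request", does not match the code shown below.)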

# File lib/fluent/plugin/filter_light_core.rb, line 360
# Report a notification message to Sentry.
#
# The record travels as the event's `extra` data and the notice category is
# attached as the 'log' tag.
#
# NOTE(review): this method is named `send`, so within this class it takes
# the place of Ruby's built-in Object#send; use `__send__` or `public_send`
# if dynamic dispatch is ever needed here.
#
# @param tag [String] notice category, attached as the 'log' tag
# @param message [String] message text of the Sentry event
# @param record [Hash] parsed record, attached as extra data
# @return [Object] whatever Sentry.capture_message returns
def send(tag, message, record)
  log.debug('send udp notice')
  Sentry.capture_message(message, extra: record, tags: { 'log' => tag })
end
shutdown() click to toggle source

清理

Calls superclass method
# File lib/fluent/plugin/filter_light_core.rb, line 65
# Cleanup hook. This override adds no behavior of its own; it only calls the
# superclass implementation.
def shutdown
  super
end
start() click to toggle source

初始化 Sentry

Calls superclass method
# File lib/fluent/plugin/filter_light_core.rb, line 45
# Startup hook: runs the superclass start-up, then initialises the Sentry
# client when notifications are enabled (@sentry is set).
#
# Sentry is configured with the DSN from @sentry_dsn, a traces sample rate of
# 1, and a 10-unit timeout / open timeout on its transport.
# NOTE(review): the timeout unit is presumably seconds — confirm against the
# Sentry SDK documentation.
def start
  super
  return unless @sentry

  log.info('init sentry')
  Sentry.init do |sentry_config|
    sentry_config.dsn = @sentry_dsn

    # Setting a traces sample rate activates performance monitoring;
    # the value should be tuned for production use.
    sentry_config.traces_sample_rate = 1

    # sentry_config.background_worker_threads = 2
    sentry_config.transport.timeout = 10
    sentry_config.transport.open_timeout = 10
  end
end