class Fluent::Plugin::LightCoreFilter
Public Instance Methods
filter(tag, time, record)
click to toggle source
主处理
# File lib/fluent/plugin/filter_light_core.rb, line 70 def filter(tag, time, record) if ['app', 'service'].include? tag record = filter_app(tag, time, record) return notice('app', record) end if ['lb', 'hub'].include? tag record = filter_lb(tag, time, record) return notice('lb', record) end if ['mongo', 'secondary', 'arbiter'].include? tag record = filter_mongo(tag, time, record) return notice('mongo', record) end record end
filter_app(tag, time, record)
click to toggle source
Parse the application log
# File lib/fluent/plugin/filter_light_core.rb, line 92 def filter_app(tag, time, record) file = record['file'].split('/').last.split('_') # Parse log file name log = record['log'] # Get detailed log content # Set common items # record['environment'] = Socket.gethostname.split('-')[0] # dev | prd record['environment'] = ENV['FLUENTD_ENV'] # dev | prd record['cid'] = file[0] # container id record['cname'] = file[0].split('-')[1] # container name record['ctime'] = record['time'] # container time # Delete useless content record.delete('log') record.delete('file') record.delete('time') # Standard AP log [2020-07-06T09:21:51.121] [A] [INFO] xxxxx if /^\[\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{3}\] \[A\]/.match(log) item = /^\[(?<time>[^\]]*)\] \[(?<component>[^\]]*)\] \[(?<levle>[^\]]*)\] (?<line>[^\ ]*) - - (?<uid>[^\ ]*) - (?<message>.*)$/.match(log) record['time'] = item[:time] record['component'] = item[:component] record['levle'] = item[:levle] record['line'] = item[:line] record['uid'] = item[:uid] record['message'] = item[:message] return record end # Standard Access log [2020-07-06T09:21:51.121] [I] [GET] xxxxx if /^\[\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{3}\] \[I\]/.match(log) item = /^\[(?<time>[^\]]*)\] \[(?<component>[^\]]*)\] \[(?<method>[^\]]*)\] (?<url>[^\ ]*) (?<status>[^\ ]*) (?<size>[^\ ]*) (?<uid>[^\ ]*) (?<elapsed>[^\ ]*) (?<addr>[^\ ]*)$/.match(log) record['time'] = item[:time] record['component'] = item[:component] record['method'] = item[:method] record['url'] = item[:url] record['status'] = item[:status] record['size'] = format_str(item[:size]) record['uid'] = item[:uid] record['elapsed'] = format_str(item[:elapsed]) record['addr'] = item[:addr].gsub(/\n$/, '') return record end # Console log record['message'] = log.gsub(/\n$/, '') record end
filter_lb(tag, time, record)
click to toggle source
docs.nginx.com/nginx/admin-guide/monitoring/logging/
# File lib/fluent/plugin/filter_light_core.rb, line 147 def filter_lb(tag, time, record) file = record['file'].split('/').last.split('_') log = record['log'] record['environment'] = ENV['FLUENTD_ENV'] record['cid'] = file[0] record['cname'] = tag record['ctime'] = record['time'] record.delete('log') record.delete('file') record.delete('time') # access log if /^[^ ]+ [^ ]+ [^ ]+ \[\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\+\d{2}:\d{2}\]/.match(log) item = /^(?<remote>[^ ]*) (?<host>[^ ]*) (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^ ]*) +\S*)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)" "(?<elapsed>[^\"]*)" "(?<requestid>[^\"]*)" "(?<sessionid>[^\"]*)")?/.match(log) record['remote'] = item[:remote] record['host'] = item[:host] record['user'] = item[:user] record['time'] = item[:time] record['method'] = item[:method] record['path'] = item[:path] record['code'] = item[:code] record['size'] = format_str(item[:size]) record['referer'] = item[:referer] record['agent'] = item[:agent] record['elapsed'] = format_str(item[:elapsed]) record['requestid'] = item[:requestid] record['sessionid'] = item[:sessionid] return record end # error log if /^\d{4}\/\d{2}\/\d{2} \d{2}:\d{2}:\d{2} \[error\]/.match(log) item = /^(?<time>[^ ]+ [^ ]+) \[(?<level>.*)\] (?<pid>\d*)#(?<tid>[^:]*): \*(?<cid>\d*) (?<message>.*)$/.match(log) record['time'] = item[:time] record['level'] = item[:level] record['message'] = item[:message] record['process'] = item[:pid] # process id record['thread'] = item[:tid] # thread id record['counter'] = item[:cid] # counter detail = /request: "(?<method>[^ ]*) (?<path>[^"]*)"/.match(item[:message]) unless detail.nil? record['method'] = detail[:method] record['path'] = detail[:path] end detail = /referrer: "(?<referrer>[^"]*)"/.match(item[:message]) unless detail.nil? record['referrer'] = detail[:referrer] end return record end # other log record['message'] = log.gsub(/\n$/, '') record end
filter_mongo(tag, time, record)
click to toggle source
docs.mongodb.com/manual/reference/log-messages/
# File lib/fluent/plugin/filter_light_core.rb, line 213 def filter_mongo(tag, time, record) file = record['file'].split('/').last.split('_') log = record['log'] record['environment'] = ENV['FLUENTD_ENV'] record['cid'] = file[0] record['cname'] = tag record['ctime'] = record['time'] record.delete('log') record.delete('file') record.delete('time') # 旧版本日志格式解析 if /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}\+\d{4} [A-Z]/.match(log) item = /^(?<time>[^ ]*) (?<severity>[A-Z])\s* (?<component>(-|([^ ]*)))\s* \[(?<context>[^\]]*)\]\s* ((?<query>.*) protocol:op_query (?<querytime>[\d\.]+(?=ms))|(?<message>.*))/.match(log) record['time'] = item[:time] record['severity'] = item[:severity] record['component'] = item[:component] record['context'] = item[:context] if item[:message] record['message'] = item[:message] return record end if item[:querytime] record['querytime'] = format_str(item[:querytime]) end if item[:query] record['command'] = item[:query] query = /^command\s* (?<collection>[^ ]*) (?<command>.*)$/.match(item[:query]) unless query.nil? record['collection'] = query[:collection] record['command'] = query[:command] end end return record else # 版本4.4开始,默认日志为json格式 item = JSON.parse(log) record['time'] = item['t']['$date'] record['severity'] = item['s'] record['component'] = item['c'] record['context'] = item['ctx'] record['identifier'] = item['id'] record['message'] = item['msg'] attributes = item['attr'] if attributes record['querytime'] = attributes['durationMillis'] record['collection'] = attributes['ns'] record['command'] = attributes['command'] record['attr'] = attributes end record['tags'] = item['tags'] record['truncated'] = item['truncated'] record['size'] = item['size'] return record end record end
format_str(str)
click to toggle source
转数字
# File lib/fluent/plugin/filter_light_core.rb, line 366 def format_str(str) return str.to_i if (str =~ /^\d+$/) return str.to_f if (str =~ /^\d+\.\d+$/) str end
notice(tag, record)
click to toggle source
确认是否发送通知
# File lib/fluent/plugin/filter_light_core.rb, line 286 def notice(tag, record) unless @sentry return record end if tag == 'app' # 未使用 if @app_stream && record['stream'] send(record) if record['stream'] == @app_stream end # 未使用 if @app_message.length > 0 && record['message'] @app_message.each do |pattern| if pattern.match(record['message']) send(record) break end end end if @app_status.length > 0 && record['status'] message = 'Status code abnormal : ' + record['url'] send(tag, message, record) if @app_status.include?(record['status']) end if @app_elapsed > 0 && record['elapsed'] message = 'Slow process : ' + record['url'] send(tag, message, record) if record['elapsed'].to_f >= @app_elapsed end return record end if tag == 'lb' if @lb_stream && record['stream'] send(tag, 'Stderror', record) if record['stream'] == @lb_stream end if @lb_code.length > 0 && record['code'] message = 'Status code abnormal : ' + record['path'] send(tag, message, record) if @lb_code.include?(record['code']) end if @lb_elapsed > 0 && record['elapsed'] message = 'Slow request : ' + record['path'] send(tag, message, record) if record['elapsed'].to_f >= @lb_elapsed end return record end if tag == 'mongo' if @mongo_severity.length > 0 && record['severity'] message = 'Severity level abnormal : ' + record['severity'] send(tag, message, record) if @mongo_severity.include?(record['severity']) end if @mongo_querytime > 0 && record['querytime'] message = 'Slow query' message = message + ' : ' + record['collection'] if record['collection'] send(tag, message, record) if record['querytime'].to_f >= @mongo_querytime end return record end record end
send(tag, message, record)
click to toggle source
发送UDP请求
# File lib/fluent/plugin/filter_light_core.rb, line 360 def send(tag, message, record) log.debug('send udp notice') Sentry.capture_message(message, :extra => record, :tags => {'log' => tag}) end
shutdown()
click to toggle source
清理
Calls superclass method
# File lib/fluent/plugin/filter_light_core.rb, line 65 def shutdown super end
start()
click to toggle source
初始化 Sentry
Calls superclass method
# File lib/fluent/plugin/filter_light_core.rb, line 45 def start super if @sentry log.info('init sentry') Sentry.init do |config| config.dsn = @sentry_dsn # To activate performance monitoring, set one of these options. # We recommend adjusting the value in production: config.traces_sample_rate = 1 # config.background_worker_threads = 2 config.transport.timeout = 10 config.transport.open_timeout = 10 end end end