class Fluent::RdsErrorLogInput
Constants
- LOG_REGEXP
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_rds_error_log.rb, line 28 def configure(conf) super require 'aws-sdk' raise Fluent::ConfigError.new("region is required") unless @region if !has_iam_role? raise Fluent::ConfigError.new("access_key_id is required") if @access_key_id.nil? raise Fluent::ConfigError.new("secret_access_key is required") if @secret_access_key.nil? end raise Fluent::ConfigError.new("db_instance_identifier is required") unless @db_instance_identifier raise Fluent::ConfigError.new("pos_file is required") unless @pos_file raise Fluent::ConfigError.new("refresh_interval is required") unless @refresh_interval raise Fluent::ConfigError.new("tag is required") unless @tag end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_rds_error_log.rb, line 68 def shutdown super @watcher.terminate @thread.join end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_rds_error_log.rb, line 43 def start super # pos file touch File.open(@pos_file, File::RDWR|File::CREAT).close begin options = { :region => @region, } if @access_key_id && @secret_access_key options[:access_key_id] = @access_key_id options[:secret_access_key] = @secret_access_key end @rds = Aws::RDS::Client.new(options) rescue => e $log.warn "RDS Client error occurred: #{e.message}" end @loop = Coolio::Loop.new timer_trigger = TimerWatcher.new(@refresh_interval, true, &method(:input)) timer_trigger.attach(@loop) @thread = Thread.new(&method(:run)) end
Private Instance Methods
get_and_parse_posfile()
click to toggle source
# File lib/fluent/plugin/in_rds_error_log.rb, line 96 def get_and_parse_posfile begin # get & parse pos file $log.debug "pos file get start" pos_last_written_timestamp = 0 pos_info = {} pos_log_hash = {} File.open(@pos_file, File::RDONLY) do |file| file.each_line do |line| pos_match = /^timestamp: (\d+)$/.match(line) if pos_match pos_last_written_timestamp = pos_match[1].to_i $log.debug "pos_last_written_timestamp: #{pos_last_written_timestamp}" end pos_match = /^(.+)\t(.+)\t(.+)$/.match(line) if pos_match pos_info[pos_match[1]] = pos_match[2] pos_log_hash[pos_match[1]] = pos_match[3] $log.debug "log_file: #{pos_match[1]}, marker: #{pos_match[2]}, hash: #{pos_match[3]}" end end @pos_last_written_timestamp = pos_last_written_timestamp @pos_info = pos_info @pos_log_hash = pos_log_hash end rescue => e $log.warn "pos file get and parse error occurred: #{e.message}" end end
get_logdata(logs)
click to toggle source
# File lib/fluent/plugin/in_rds_error_log.rb, line 211 def get_logdata(logs) log_file_name = logs.context.params[:log_file_name] raw_records = [] begin logs.each do |log| # save got line's marker @pos_info[log_file_name] = log.marker raw_records += log.log_file_data.split("\n") end rescue => e $log.warn e.message end return raw_records end
get_logfile(log_files)
click to toggle source
# File lib/fluent/plugin/in_rds_error_log.rb, line 181 def get_logfile(log_files) begin log_files.each do |log_file| log_file.describe_db_log_files.each do |item| # save maximum written timestamp value @pos_last_written_timestamp = item[:last_written] if @pos_last_written_timestamp < item[:last_written] # log file download log_file_name = item[:log_file_name] next if log_file_name[%r{error/mysql-error-running.log}] || log_file_name[%r{slowquery/mysql-slowquery.log\.}] || log_file_name[%r{general/mysql-general.log\.}] marker = @pos_info.has_key?(log_file_name) ? @pos_info[log_file_name] : "0" marker = "0" if rotated?(log_file_name) $log.debug "download log from rds: log_file_name=#{log_file_name}, marker=#{marker}" logs = @rds.download_db_log_file_portion( db_instance_identifier: @db_instance_identifier, log_file_name: log_file_name, marker: marker, ) raw_records = get_logdata(logs) #emit parse_and_emit(raw_records, log_file_name) unless raw_records.nil? end end rescue => e $log.warn e.message end end
get_logfile_list()
click to toggle source
# File lib/fluent/plugin/in_rds_error_log.rb, line 146 def get_logfile_list begin $log.debug "get logfile-list from rds: db_instance_identifier=#{@db_instance_identifier}, pos_last_written_timestamp=#{@pos_last_written_timestamp}" log_files = @rds.describe_db_log_files( db_instance_identifier: @db_instance_identifier, file_last_written: @pos_last_written_timestamp, max_records: 10, ) @current_pos_log_hash = {} log_files.each do |log_file| log_file.describe_db_log_files.each do |item| log_file_name = item[:log_file_name] @current_pos_log_hash[log_file_name] = item.hash end end log_files rescue => e $log.warn "RDS Client describe_db_log_files error occurred: #{e.message}" end end
has_iam_role?()
click to toggle source
# File lib/fluent/plugin/in_rds_error_log.rb, line 87 def has_iam_role? begin ec2 = Aws::EC2::Client.new(region: @region) !ec2.config.credentials.nil? rescue => e $log.warn "EC2 Client error occurred: #{e.message}" end end
input()
click to toggle source
# File lib/fluent/plugin/in_rds_error_log.rb, line 80 def input get_and_parse_posfile log_files = get_logfile_list get_logfile(log_files) put_posfile end
parse_and_emit(raw_records, log_file_name)
click to toggle source
# File lib/fluent/plugin/in_rds_error_log.rb, line 227 def parse_and_emit(raw_records, log_file_name) begin $log.debug "raw_records.count: #{raw_records.count}" record = { "db_instance_identifier" => @db_instance_identifier, "region" => @region, "log_file_name" => log_file_name, } output_tag = @tag output_tag += ".#{log_file_name.gsub(/\/|\./, '_')}" if @output_per_file if log_file_name != "slowquery/mysql-slowquery.log" raw_records.each do |raw_record| $log.debug "raw_record=#{raw_record}" line_match = LOG_REGEXP.match(raw_record) next unless line_match record["raw"] = raw_record if @raw_output record["time"] = line_match[:time] record["message"] = line_match[:message] record["pid"] = line_match[:pid] if line_match[:pid] record["message_level"] = line_match[:message_level] if line_match[:message_level] router.emit(output_tag, Time.parse(line_match[:time] + ' +0000').to_i, record) end else myslog = MySlog.new myslog.divide(raw_records).each do |raw_record| $log.debug "raw_record=#{raw_record}" begin raw = raw_record.join("\n") if @raw_output record = record.merge(stringify_keys(myslog.parse_record(raw_record))) record["raw"] = raw if @raw_output if time = record.delete('date') time = time.to_i else time = Time.now.to_i end router.emit(output_tag, time, record) rescue => e $log.warn e.message end end end rescue => e $log.warn e.message end end
put_posfile()
click to toggle source
# File lib/fluent/plugin/in_rds_error_log.rb, line 129 def put_posfile # pos file write @pos_log_hash = @current_pos_log_hash begin $log.debug "pos file write" File.open(@pos_file, File::WRONLY|File::TRUNC) do |file| file.puts "timestamp: #{@pos_last_written_timestamp.to_s}" @pos_info.each do |log_file_name, marker| file.puts "#{log_file_name}\t#{marker}\t#{@pos_log_hash[log_file_name]}" end end rescue => e $log.warn "pos file write error occurred: #{e.message}" end end
rotated?(log_file)
click to toggle source
# File lib/fluent/plugin/in_rds_error_log.rb, line 169 def rotated?(log_file) utc_hour = (Time.now - 3600).utc.hour hash_target = "" case log_file when "error/mysql-error.log" then hash_target = "error/mysql-error-running.log" else hash_target = "#{log_file}.#{utc_hour}" end return @current_pos_log_hash[hash_target] && @pos_log_hash[hash_target] != @current_pos_log_hash[hash_target] end
run()
click to toggle source
# File lib/fluent/plugin/in_rds_error_log.rb, line 76 def run @loop.run end
stringify_keys(record)
click to toggle source
# File lib/fluent/plugin/in_rds_error_log.rb, line 278 def stringify_keys(record) result = {} record.each_key do |key| result[key.to_s] = record[key] end result end