class Fluent::Plugin::RdsPgsqlLogInput
Constants
- LOG_REGEXP
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_rds_pgsql_log.rb, line 20 def configure(conf) super raise Fluent::ConfigError.new("region is required") unless @region if !has_iam_role? raise Fluent::ConfigError.new("access_key_id is required") if @access_key_id.nil? raise Fluent::ConfigError.new("secret_access_key is required") if @secret_access_key.nil? end raise Fluent::ConfigError.new("db_instance_identifier is required") unless @db_instance_identifier raise Fluent::ConfigError.new("pos_file is required") unless @pos_file raise Fluent::ConfigError.new("refresh_interval is required") unless @refresh_interval raise Fluent::ConfigError.new("tag is required") unless @tag begin options = { :region => @region, } if @access_key_id && @secret_access_key options[:access_key_id] = @access_key_id options[:secret_access_key] = @secret_access_key end @rds = Aws::RDS::Client.new(options) rescue => e log.warn "RDS Client error occurred: #{e.message}" end end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_rds_pgsql_log.rb, line 56 def shutdown super end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_rds_pgsql_log.rb, line 47 def start super # pos file touch File.open(@pos_file, File::RDWR|File::CREAT).close timer_execute(:poll_logs, @refresh_interval, repeat: true, &method(:input)) end
Private Instance Methods
event_time_of_row(record)
click to toggle source
# File lib/fluent/plugin/in_rds_pgsql_log.rb, line 181 def event_time_of_row(record) time = Time.parse(record["time"]) return Fluent::EventTime.from_time(time) end
get_and_parse_posfile()
click to toggle source
# File lib/fluent/plugin/in_rds_pgsql_log.rb, line 78 def get_and_parse_posfile begin # get & parse pos file log.debug "pos file get start" pos_last_written_timestamp = 0 pos_info = {} File.open(@pos_file, File::RDONLY) do |file| file.each_line do |line| pos_match = /^(\d+)$/.match(line) if pos_match pos_last_written_timestamp = pos_match[1].to_i log.debug "pos_last_written_timestamp: #{pos_last_written_timestamp}" end pos_match = /^(.+)\t(.+)$/.match(line) if pos_match pos_info[pos_match[1]] = pos_match[2] log.debug "log_file: #{pos_match[1]}, marker: #{pos_match[2]}" end end @pos_last_written_timestamp = pos_last_written_timestamp @pos_info = pos_info end rescue => e log.warn "pos file get and parse error occurred: #{e.message}" end end
get_logdata(logs)
click to toggle source
# File lib/fluent/plugin/in_rds_pgsql_log.rb, line 165 def get_logdata(logs) log_file_name = logs.context.params[:log_file_name] raw_records = [] begin logs.each do |log| # save got line's marker @pos_info[log_file_name] = log.marker raw_records += log.log_file_data.split("\n") end rescue => e log.warn e.message end return raw_records end
get_logfile(log_files)
click to toggle source
# File lib/fluent/plugin/in_rds_pgsql_log.rb, line 137 def get_logfile(log_files) begin log_files.each do |log_file| log_file.describe_db_log_files.each do |item| # save maximum written timestamp value @pos_last_written_timestamp = item[:last_written] if @pos_last_written_timestamp < item[:last_written] # log file download log_file_name = item[:log_file_name] marker = @pos_info.has_key?(log_file_name) ? @pos_info[log_file_name] : "0" log.debug "download log from rds: log_file_name=#{log_file_name}, marker=#{marker}" logs = @rds.download_db_log_file_portion( db_instance_identifier: @db_instance_identifier, log_file_name: log_file_name, marker: marker, ) raw_records = get_logdata(logs) #emit parse_and_emit(raw_records, log_file_name) unless raw_records.nil? end end rescue => e log.warn e.message end end
get_logfile_list()
click to toggle source
# File lib/fluent/plugin/in_rds_pgsql_log.rb, line 124 def get_logfile_list begin log.debug "get logfile-list from rds: db_instance_identifier=#{@db_instance_identifier}, pos_last_written_timestamp=#{@pos_last_written_timestamp}" @rds.describe_db_log_files( db_instance_identifier: @db_instance_identifier, file_last_written: @pos_last_written_timestamp, max_records: 10, ) rescue => e log.warn "RDS Client describe_db_log_files error occurred: #{e.message}" end end
has_iam_role?()
click to toggle source
# File lib/fluent/plugin/in_rds_pgsql_log.rb, line 69 def has_iam_role? begin ec2 = Aws::EC2::Client.new(region: @region) !ec2.config.credentials.nil? rescue => e log.warn "EC2 Client error occurred: #{e.message}" end end
input()
click to toggle source
# File lib/fluent/plugin/in_rds_pgsql_log.rb, line 62 def input get_and_parse_posfile log_files = get_logfile_list get_logfile(log_files) put_posfile end
parse_and_emit(raw_records, log_file_name)
click to toggle source
# File lib/fluent/plugin/in_rds_pgsql_log.rb, line 186 def parse_and_emit(raw_records, log_file_name) begin log.debug "raw_records.count: #{raw_records.count}" record = nil raw_records.each do |raw_record| log.debug "raw_record=#{raw_record}" line_match = LOG_REGEXP.match(raw_record) unless line_match # combine chain of log record["message"] << "\n" + raw_record unless record.nil? else # emit before record router.emit(@tag, event_time_of_row(record), record) unless record.nil? # set a record record = { "time" => line_match[:time], "host" => line_match[:host], "user" => line_match[:user], "database" => line_match[:database], "pid" => line_match[:pid], "message_level" => line_match[:message_level], "message" => line_match[:message], "log_file_name" => log_file_name, } end end # emit last record router.emit(@tag, event_time_of_row(record), record) unless record.nil? rescue => e log.warn e.message end end
put_posfile()
click to toggle source
# File lib/fluent/plugin/in_rds_pgsql_log.rb, line 108 def put_posfile # pos file write begin log.debug "pos file write" File.open(@pos_file, File::WRONLY|File::TRUNC) do |file| file.puts @pos_last_written_timestamp.to_s @pos_info.each do |log_file_name, marker| file.puts "#{log_file_name}\t#{marker}" end end rescue => e log.warn "pos file write error occurred: #{e.message}" end end