class Fluent::AuroraSlowqueryLog
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_aurora_slowquerylog.rb, line 28 def initialize super @current_slowlog = nil @previous_slowlog = nil end
Public Instance Methods
configure(conf)
click to toggle source
This method is called before starting. 'conf' is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.
Calls superclass method
# File lib/fluent/plugin/in_aurora_slowquerylog.rb, line 37 def configure(conf) super end
create_rds_client()
click to toggle source
# File lib/fluent/plugin/in_aurora_slowquerylog.rb, line 70 def create_rds_client @rds_client = if @aws_access_key_id && @aws_secret_access_key Aws::RDS::Client.new( region: @region, access_key_id: @aws_access_key_id, secret_access_key: @aws_secret_access_key ) else # Use IAM Profile Aws::RDS::Client.new(region: @region) end end
emit_slowlogs(records)
click to toggle source
# File lib/fluent/plugin/in_aurora_slowquerylog.rb, line 217 def emit_slowlogs(records) es = MultiEventStream.new records.each do |record| es.add(record[:date], record) end unless es.empty? begin router.emit_stream(@tag, es) rescue end end end
exclude_useless_sql(parsed_log_data)
click to toggle source
Exclude useless SQL queries
# File lib/fluent/plugin/in_aurora_slowquerylog.rb, line 196 def exclude_useless_sql(parsed_log_data) responses = [] parsed_log_data.each do |record| if m = record[:sql].match(/^\/rdsdbbin\/oscar\/bin\/mysqld/) next elsif m = record[:sql].match(/^(.+); \/rdsdbbin\/oscar\/bin\/mysqld,.+ Argument/) responses << record responses.last[:sql] = m[1] elsif m = record[:sql].match(/^use .+; SET timestamp=\d+; (.+)/) responses << record responses.last[:sql] = m[1] elsif m = record[:sql].match(/^SET timestamp=\d+; (.+)/) responses << record responses.last[:sql] = m[1] else responses << record end end responses end
fetch_and_emit_log(log_file_name,marker,save)
click to toggle source
# File lib/fluent/plugin/in_aurora_slowquerylog.rb, line 101 def fetch_and_emit_log(log_file_name,marker,save) if save if marker fetched_file = @rds_client.download_db_log_file_portion( db_instance_identifier: @db_instance_identifier, log_file_name: log_file_name, marker: marker) save_state(fetched_file) records = parse_fetched_file(fetched_file) emit_slowlogs(records) else fetched_file = @rds_client.download_db_log_file_portion( db_instance_identifier: @db_instance_identifier, log_file_name: log_file_name) save_state(fetched_file) records = parse_fetched_file(fetched_file) emit_slowlogs(records) end else # Executed when log rotation occurs without pending data fetched_file = @rds_client.download_db_log_file_portion( db_instance_identifier: @db_instance_identifier, log_file_name: log_file_name, marker: marker) system("rm #{@aurora_state_file}") records = parse_fetched_file(fetched_file) emit_slowlogs(records) end end
fetch_aurora_slow_log()
click to toggle source
# File lib/fluent/plugin/in_aurora_slowquerylog.rb, line 82 def fetch_aurora_slow_log create_rds_client fetch_recent_slowlogs if File.exist?(@aurora_state_file) state = load_state if slowlog_rotated?(state) if state["additional_data_pending"] fetch_and_emit_log(@previous_slowlog, state["marker"],true) else fetch_and_emit_log(@previous_slowlog, state["marker"],false) end else fetch_and_emit_log(@current_slowlog, state["marker"],true) end else fetch_and_emit_log(@current_slowlog, false, true) end end
fetch_recent_slowlogs()
click to toggle source
# File lib/fluent/plugin/in_aurora_slowquerylog.rb, line 158 def fetch_recent_slowlogs log_files = [] unix_time_2hours_ago = (2.hours.ago.to_f * 1000).floor @rds_client.describe_db_log_files(db_instance_identifier: @db_instance_identifier, filename_contains: @filename_contains, file_last_written: unix_time_2hours_ago).each do |page| page.describe_db_log_files.each do |f| log_files << f end end sorted_slow_log_files = log_files.sort_by do |f| f.last_written end if sorted_slow_log_files.length >= 2 current,previous = sorted_slow_log_files[-1], sorted_slow_log_files[-2] @current_slowlog = current.log_file_name @previous_slowlog = previous.log_file_name elsif sorted_slow_log_files.length >= 1 current = sorted_slow_log_files[-1] @current_slowlog = current.log_file_name else raise "There is no slowlog. Please set log_output=FILE" end end
load_state()
click to toggle source
# File lib/fluent/plugin/in_aurora_slowquerylog.rb, line 140 def load_state begin YAML.load_file(@aurora_state_file) rescue SystemCallError => e puts %Q(class=[#{e.class}] message=[#{e.message}]) rescue IOError => e puts %Q(class=[#{e.class}] message=[#{e.message}]) end end
parse_fetched_file(fetched_file)
click to toggle source
# File lib/fluent/plugin/in_aurora_slowquerylog.rb, line 130 def parse_fetched_file(fetched_file) parsed_log_data= parse_query(fetched_file) exclude_useless_sql(parsed_log_data) end
parse_query(fetched_file)
click to toggle source
# File lib/fluent/plugin/in_aurora_slowquerylog.rb, line 135 def parse_query(fetched_file) myslog = MySlog.new myslog.parse(fetched_file.log_file_data) end
save_state(fetched_file)
click to toggle source
# File lib/fluent/plugin/in_aurora_slowquerylog.rb, line 184 def save_state(fetched_file) open(@aurora_state_file, 'w') do |f| state_keys={} state_keys["marker"] = fetched_file.marker state_keys["additional_data_pending"] = fetched_file.additional_data_pending state_keys["current_slowlog"] = @current_slowlog state_keys["previous_slowlog"] = @previous_slowlog YAML.dump(state_keys,f) end end
shutdown()
click to toggle source
This method is called when shutting down.
Calls superclass method
# File lib/fluent/plugin/in_aurora_slowquerylog.rb, line 51 def shutdown super @stop_flag = true $log.debug 'Waiting for thread to finish' @thread.join end
slowlog_rotated?(state)
click to toggle source
# File lib/fluent/plugin/in_aurora_slowquerylog.rb, line 150 def slowlog_rotated?(state) if @previous_slowlog == state["previous_slowlog"] false else true end end
start()
click to toggle source
This method is called when starting. Open sockets or files and create a thread here.
Calls superclass method
# File lib/fluent/plugin/in_aurora_slowquerylog.rb, line 43 def start super @stop_flag = false $log.debug 'Start aurora log collection thread' @thread = Thread.new(&method(:thread_main)) end
thread_main()
click to toggle source
# File lib/fluent/plugin/in_aurora_slowquerylog.rb, line 58 def thread_main until @stop_flag begin fetch_aurora_slow_log rescue => e log.error 'unexpected error', :error => e.message, :error_class => e.class log.error_backtrace e.backtrace end sleep @log_fetch_interval end end