class Fluent::RdsMysqlSlowLog
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_rds_mysql_slow_log.rb, line 26
# Applies the plugin configuration.
#
# Delegates to the superclass first, then normalizes the server list,
# the database timezone and the encoding settings, in that order.
#
# @param conf the configuration object, forwarded unchanged to +super+
def configure(conf)
  super

  configure_servers
  configure_timezone
  configure_encoding
end
configure_encoding()
click to toggle source
# File lib/fluent/plugin/in_rds_mysql_slow_log.rb, line 51
# Validates and resolves the encoding-related settings.
#
# 'from_encoding' is only accepted together with 'encoding'. Whichever of
# @encoding / @from_encoding is set is replaced by the result of
# parse_encoding_param.
#
# @raise [ConfigError] when @from_encoding is set but @encoding is not
def configure_encoding
  if @from_encoding && !@encoding
    raise ConfigError, "'from_encoding' parameter must be specified with 'encoding' parameter."
  end

  if @encoding
    @encoding = parse_encoding_param(@encoding)
  end
  if @from_encoding
    @from_encoding = parse_encoding_param(@from_encoding)
  end
end
configure_servers()
click to toggle source
# File lib/fluent/plugin/in_rds_mysql_slow_log.rb, line 34
# Rewrites @servers in place into a list of [tag, Server] pairs.
#
# Each entry's tag is prefixed with "#{@tag_prefix}." when @tag_prefix is
# set; otherwise the entry's own tag is used as is. The Server is built from
# the entry's host, port, username and password.
def configure_servers
  @servers.map! do |entry|
    full_tag = if @tag_prefix
                 "#{@tag_prefix}.#{entry.tag}"
               else
                 entry.tag
               end
    connection = Server.new(entry.host, entry.port, entry.username, entry.password)
    [full_tag, connection]
  end
end
configure_timezone()
click to toggle source
# File lib/fluent/plugin/in_rds_mysql_slow_log.rb, line 41
# Replaces @database_timezone with the result of parse_timezone_param.
# Does nothing when no database timezone is configured.
def configure_timezone
  return unless @database_timezone

  @database_timezone = parse_timezone_param(@database_timezone)
end
emit_slow_log(tag, server)
click to toggle source
# File lib/fluent/plugin/in_rds_mysql_slow_log.rb, line 88
# Rotates the slow log on one server and emits the backed-up rows.
#
# Inside a single connection it calls mysql.rds_rotate_slow_log, reads every
# row of slow_log_backup, converts each row with #process and emits the
# collected events under +tag+. Any StandardError is logged (message and
# backtrace) instead of being propagated.
#
# @param tag the tag the event stream is emitted with
# @param server object whose #connect yields a client responding to #query
def emit_slow_log(tag, server)
  server.connect do |client|
    client.query('CALL mysql.rds_rotate_slow_log')

    stream = MultiEventStream.new
    rows = client.query('SELECT * FROM slow_log_backup')
    rows.each { |row| stream.add(*process(row)) }

    router.emit_stream(tag, stream)
  end
rescue => e
  log.error e.to_s
  log.error_backtrace
end
encode(str, dst, src)
click to toggle source
# File lib/fluent/plugin/in_rds_mysql_slow_log.rb, line 136
# Re-encodes or re-labels a string in place.
#
# @param str [String, nil] the string to modify; nil is passed through
# @param dst the target encoding
# @param src the source encoding; when given the bytes are transcoded from
#   +src+ to +dst+, otherwise the string is only tagged as +dst+
# @return [String, nil] +str+ itself (mutated), or nil when +str+ is nil
def encode(str, dst, src)
  return nil if str.nil?

  src ? str.encode!(dst, src) : str.force_encoding(dst)
end
extract_time(record)
click to toggle source
# File lib/fluent/plugin/in_rds_mysql_slow_log.rb, line 172
# Determines the event time for a record as an integer.
#
# Reads 'start_time' from the record; the key is left in place when
# @keep_time_key is set and removed from the record otherwise. Falls back
# to Engine.now when the record has no start time.
#
# @param record [Hash] the row being processed (may be mutated)
# @return [Integer] the chosen time converted with #to_i
def extract_time(record)
  time = if @keep_time_key
           record['start_time']
         else
           record.delete('start_time')
         end

  (time || Engine.now).to_i
end
on_timer()
click to toggle source
# File lib/fluent/plugin/in_rds_mysql_slow_log.rb, line 82
# Timer callback: collects and emits the slow log of every configured server.
#
# @servers is expected to hold [tag, server] pairs (as produced by
# configure_servers); each pair is handed to emit_slow_log in order.
def on_timer
  @servers.each do |tag, server|
    emit_slow_log(tag, server)
  end
end
parse_encoding_param(encoding)
click to toggle source
# File lib/fluent/plugin/in_rds_mysql_slow_log.rb, line 60
# Looks up an Encoding by name.
#
# @param encoding the encoding name to resolve
# @return [Encoding] the result of Encoding.find
# @raise [ConfigError] with the original message when Encoding.find raises
#   ArgumentError
def parse_encoding_param(encoding)
  begin
    Encoding.find(encoding)
  rescue ArgumentError => error
    raise ConfigError, error.message
  end
end
parse_timezone_param(timezone)
click to toggle source
# File lib/fluent/plugin/in_rds_mysql_slow_log.rb, line 45
# Resolves a timezone identifier through TZInfo.
#
# @param timezone the timezone identifier to look up
# @return the object returned by TZInfo::Timezone.get
# @raise [ConfigError] with the original message when TZInfo rejects the
#   identifier
def parse_timezone_param(timezone)
  TZInfo::Timezone.get(timezone)
# The exception class is fully qualified so the rescue does not depend on
# the enclosing scope mixing in TZInfo.
rescue TZInfo::InvalidTimezoneIdentifier => e
  raise ConfigError, e.message
end
process(record)
click to toggle source
# File lib/fluent/plugin/in_rds_mysql_slow_log.rb, line 104
# Converts one slow-log row into a [time, record] pair.
#
# The record is normalized in place: timestamp first, then string fields,
# durations and integer fields. The event time is extracted last, after the
# timestamp has been converted.
#
# @param record [Hash] the raw row (mutated)
# @return [Array] two elements: the event time and the processed record
def process(record)
  process_timestamp(record)
  process_string(record)
  process_time(record)
  process_integer(record)

  event_time = extract_time(record)
  [event_time, record]
end
process_integer(record)
click to toggle source
# File lib/fluent/plugin/in_rds_mysql_slow_log.rb, line 158
# Converts the integer-valued columns of a record with #to_i, in place.
#
# Fields whose value is nil/false (or that are absent) are left untouched.
#
# @param record [Hash] the row being processed (mutated)
def process_integer(record)
  %w[
    rows_sent
    rows_examined
    last_insert_id
    insert_id
    server_id
    thread_id
    rows_affected
  ].each do |field|
    value = record[field]
    record[field] = value.to_i if value
  end
end
process_string(record)
click to toggle source
# File lib/fluent/plugin/in_rds_mysql_slow_log.rb, line 122
# Normalizes the text columns (user_host, db, sql_text) of a record in place.
#
# When @null_empty_string is set, missing or empty values become nil.
# When @encoding is set, user_host and db are converted from UTF-8 to
# @encoding, and sql_text is passed to #encode with @from_encoding as its
# source encoding.
#
# @param record [Hash] the row being processed (mutated)
def process_string(record)
  if @null_empty_string
    %w[user_host db sql_text].each do |field|
      text = record[field] || ''
      record[field] = nil if text.empty?
    end
  end

  return unless @encoding

  %w[user_host db].each { |field| encode(record[field], @encoding, Encoding::UTF_8) }
  encode(record['sql_text'], @encoding, @from_encoding)
end
process_time(record)
click to toggle source
# File lib/fluent/plugin/in_rds_mysql_slow_log.rb, line 146
# Converts the duration columns (query_time, lock_time) to microseconds,
# in place. Fields without a value are left untouched.
#
# @param record [Hash] the row being processed (mutated)
def process_time(record)
  %w[query_time lock_time].each do |field|
    raw = record[field]
    record[field] = time_to_microseconds(raw) if raw
  end
end
process_timestamp(record)
click to toggle source
# File lib/fluent/plugin/in_rds_mysql_slow_log.rb, line 112
# Replaces the record's 'start_time' with the result of timestamp_to_time.
# A missing (nil/false) start time is left as is.
#
# @param record [Hash] the row being processed (mutated)
def process_timestamp(record)
  started = record['start_time']
  return started unless started

  record['start_time'] = timestamp_to_time(started)
end
run()
click to toggle source
# File lib/fluent/plugin/in_rds_mysql_slow_log.rb, line 75
# Runs the event loop; used as the body of the thread created in #start.
# Any StandardError raised by the loop is logged (message and backtrace)
# rather than propagated.
def run
  @loop.run
rescue => e
  log.error e.to_s
  log.error_backtrace
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_rds_mysql_slow_log.rb, line 177
# Stops the plugin: after the superclass shutdown, detaches every watcher
# from the event loop, stops the loop and waits for the worker thread to
# finish.
def shutdown
  super

  @loop.watchers.each { |watcher| watcher.detach }
  @loop.stop
  @thread.join
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_rds_mysql_slow_log.rb, line 66
# Starts the plugin: after the superclass start, creates the event loop,
# attaches a timer built from @emit_interval whose callback is #on_timer,
# and runs the loop (#run) in a new thread stored in @thread.
def start
  super

  timer_callback = method(:on_timer)
  @loop = Coolio::Loop.new
  @timer = TimerWatcher.new(@emit_interval, true, log, &timer_callback)
  @loop.attach(@timer)

  loop_runner = method(:run)
  @thread = Thread.new(&loop_runner)
end
time_to_microseconds(time)
click to toggle source
# File lib/fluent/plugin/in_rds_mysql_slow_log.rb, line 152
# Converts a textual duration "[-]H:MM:SS[.ffffff]" into microseconds.
#
# The sign is taken from the text itself, so a value such as "-00:00:01"
# stays negative even though its hour component is zero. A fractional part
# shorter than six digits is scaled as a decimal fraction of a second
# (".5" is 500_000 microseconds); digits beyond the sixth are ignored.
# Parsing uses a Regexp only, so no scanf library is required.
#
# @param time [String] the duration text
# @return [Integer] the signed duration in microseconds
# @raise [ArgumentError] when +time+ does not start with a recognizable
#   duration
def time_to_microseconds(time)
  match = /\A\s*([-+])?(\d+):(\d{1,2}):(\d{1,2})(?:\.(\d{1,6}))?/.match(time)
  raise ArgumentError, "invalid time value: #{time.inspect}" unless match

  sign, hour, min, sec, fraction = match.captures
  # Right-pad so that ".5" means half a second rather than 5 microseconds.
  usec = fraction ? fraction.ljust(6, '0').to_i : 0

  total = hour.to_i * 3_600_000_000 +
          min.to_i * 60_000_000 +
          sec.to_i * 1_000_000 +
          usec
  sign == '-' ? -total : total
end
timestamp_to_time(timestamp)
click to toggle source
# File lib/fluent/plugin/in_rds_mysql_slow_log.rb, line 116
# Converts a textual timestamp "YYYY-MM-DD HH:MM:SS[.ffffff]" into a Time.
#
# The fields are first assembled with Time.utc. When @database_timezone is
# set, that value is then passed through its #local_to_utc; otherwise it is
# returned directly. A fractional part shorter than six digits is scaled as
# a decimal fraction of a second; digits beyond the sixth are ignored.
# Parsing uses a Regexp only, so no scanf library is required.
#
# @param timestamp [String] the timestamp text
# @return the Time built by Time.utc, or whatever
#   @database_timezone.local_to_utc returns for it
# @raise [ArgumentError] when +timestamp+ does not start with a recognizable
#   timestamp
def timestamp_to_time(timestamp)
  pattern = /\A\s*(\d{1,4})-(\d{1,2})-(\d{1,2})\s+(\d{1,2}):(\d{1,2}):(\d{1,2})(?:\.(\d{1,6}))?/
  match = pattern.match(timestamp)
  raise ArgumentError, "invalid timestamp value: #{timestamp.inspect}" unless match

  *date_time, fraction = match.captures
  # Right-pad so that ".5" means half a second rather than 5 microseconds.
  usec = fraction ? fraction.ljust(6, '0').to_i : 0

  t = Time.utc(*date_time.map(&:to_i), usec)
  @database_timezone ? @database_timezone.local_to_utc(t) : t
end