class Fluent::RdsMysqlSlowLog

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_rds_mysql_slow_log.rb, line 26
# Configures the plugin.
#
# Hands +conf+ to the superclass implementation first, then runs the
# server, timezone and encoding configuration steps, in that order.
#
# @param conf the configuration object; forwarded unchanged to +super+
def configure(conf)
  super

  configure_servers
  configure_timezone
  configure_encoding
end
configure_encoding() click to toggle source
# File lib/fluent/plugin/in_rds_mysql_slow_log.rb, line 51
# Validates the encoding settings and resolves them through
# #parse_encoding_param.
#
# @raise [ConfigError] when @from_encoding is set but @encoding is not
def configure_encoding
  encoding_missing = !@encoding
  if encoding_missing && @from_encoding
    raise ConfigError, "'from_encoding' parameter must be specified with 'encoding' parameter."
  end

  # Each setting is only resolved when it was actually provided.
  if @encoding
    @encoding = parse_encoding_param(@encoding)
  end
  if @from_encoding
    @from_encoding = parse_encoding_param(@from_encoding)
  end
end
configure_servers() click to toggle source
# File lib/fluent/plugin/in_rds_mysql_slow_log.rb, line 34
# Replaces every entry of @servers, in place, with a +[tag, Server]+ pair.
#
# The tag is the entry's own tag, prefixed with "<@tag_prefix>." when a
# prefix is set.
def configure_servers
  @servers.map! do |entry|
    tag = entry.tag
    tag = "#{@tag_prefix}.#{tag}" if @tag_prefix

    connection = Server.new(entry.host, entry.port, entry.username, entry.password)
    [tag, connection]
  end
end
configure_timezone() click to toggle source
# File lib/fluent/plugin/in_rds_mysql_slow_log.rb, line 41
# Resolves @database_timezone through #parse_timezone_param when it is set;
# does nothing otherwise.
def configure_timezone
  return unless @database_timezone

  @database_timezone = parse_timezone_param(@database_timezone)
end
emit_slow_log(tag, server) click to toggle source
# File lib/fluent/plugin/in_rds_mysql_slow_log.rb, line 88
# Rotates the slow log on one server, reads every row of slow_log_backup,
# and emits the processed rows as a single event stream under +tag+.
#
# Any StandardError raised along the way is logged, not propagated.
#
# @param tag the tag passed to +router.emit_stream+
# @param server an object whose #connect yields a client responding to #query
def emit_slow_log(tag, server)
  server.connect do |client|
    # NOTE(review): presumably the rotation is what fills slow_log_backup --
    # confirm against the RDS documentation.
    client.query('CALL mysql.rds_rotate_slow_log')

    stream = MultiEventStream.new
    rows = client.query('SELECT * FROM slow_log_backup')
    rows.each { |row| stream.add(*process(row)) }

    router.emit_stream(tag, stream)
  end
rescue => e
  log.error e.to_s
  log.error_backtrace
end
encode(str, dst, src) click to toggle source
# File lib/fluent/plugin/in_rds_mysql_slow_log.rb, line 136
# Re-encodes +str+ in place.
#
# @param str [String, nil] the string to modify; nil is passed through
# @param dst the target encoding
# @param src the source encoding, or nil/false to only relabel the bytes
# @return [String, nil] +str+ itself, or nil when +str+ is nil
def encode(str, dst, src)
  return nil if str.nil?
  # With a source encoding the bytes are transcoded from +src+ to +dst+.
  return str.encode!(dst, src) if src

  # Without one the bytes are kept and only tagged as +dst+.
  str.force_encoding(dst)
end
extract_time(record) click to toggle source
# File lib/fluent/plugin/in_rds_mysql_slow_log.rb, line 172
# Picks the event time for +record+ from its 'start_time' entry.
#
# The entry is left in the record when @keep_time_key is set and removed
# from it otherwise. When there is no start time, +Engine.now+ is used.
#
# @param record [Hash] the row being processed (may be mutated)
# @return [Integer] the chosen time converted with #to_i
def extract_time(record)
  started_at =
    if @keep_time_key
      record['start_time']
    else
      record.delete('start_time')
    end

  (started_at || Engine.now).to_i
end
on_timer() click to toggle source
# File lib/fluent/plugin/in_rds_mysql_slow_log.rb, line 82
# Timer callback: collects and emits the slow log of every configured server.
#
# @servers holds +[tag, server]+ pairs; each pair is handed to
# #emit_slow_log in order.
def on_timer
  @servers.each do |tag, server|
    emit_slow_log(tag, server)
  end
end
parse_encoding_param(encoding) click to toggle source
# File lib/fluent/plugin/in_rds_mysql_slow_log.rb, line 60
# Looks up an encoding by name.
#
# @param encoding the encoding name handed to +Encoding.find+
# @return [Encoding] the matching encoding
# @raise [ConfigError] carrying the lookup's message when +Encoding.find+
#   raises ArgumentError
def parse_encoding_param(encoding)
  Encoding.find(encoding)
rescue ArgumentError => lookup_error
  # Re-raised as ConfigError so the failure surfaces as a configuration problem.
  raise ConfigError, lookup_error.message
end
parse_timezone_param(timezone) click to toggle source
# File lib/fluent/plugin/in_rds_mysql_slow_log.rb, line 45
# Looks up a timezone by identifier.
#
# @param timezone the identifier handed to +TZInfo::Timezone.get+
# @return the timezone object returned by +TZInfo::Timezone.get+
# @raise [ConfigError] carrying the lookup's message when the identifier is
#   rejected
def parse_timezone_param(timezone)
  TZInfo::Timezone.get(timezone)
# The exception class is fully qualified, like TZInfo::Timezone above, so the
# rescue does not depend on TZInfo being mixed into the enclosing scope.
rescue TZInfo::InvalidTimezoneIdentifier => e
  raise ConfigError, e.message
end
process(record) click to toggle source
# File lib/fluent/plugin/in_rds_mysql_slow_log.rb, line 104
# Normalizes one slow-log row in place and pairs it with its event time.
#
# @param record [Hash] the row to normalize (mutated by the steps below)
# @return [Array] +[time, record]+, where time comes from #extract_time
def process(record)
  # The timestamp step runs first and #extract_time last; the order of the
  # calls is kept exactly as it was.
  process_timestamp(record)
  process_string(record)
  process_time(record)
  process_integer(record)

  event_time = extract_time(record)
  [event_time, record]
end
process_integer(record) click to toggle source
# File lib/fluent/plugin/in_rds_mysql_slow_log.rb, line 158
# Converts the integer-valued columns of +record+ with #to_i, in place.
#
# Columns that are missing or nil are left exactly as they are.
#
# @param record [Hash] the row being processed
def process_integer(record)
  integer_fields = %w[
    rows_sent rows_examined last_insert_id insert_id
    server_id thread_id rows_affected
  ]

  integer_fields.each do |field|
    value = record[field]
    record[field] = value.to_i if value
  end
end
process_string(record) click to toggle source
# File lib/fluent/plugin/in_rds_mysql_slow_log.rb, line 122
# Normalizes the string columns 'user_host', 'db' and 'sql_text' of +record+.
#
# When @null_empty_string is set, missing or empty values become nil.
# When @encoding is set, each column is passed to #encode with @encoding as
# the target; 'sql_text' uses @from_encoding as its source, the other two
# use UTF-8.
#
# @param record [Hash] the row being processed
def process_string(record)
  if @null_empty_string
    %w[user_host db sql_text].each do |field|
      value = record[field]
      record[field] = nil if !value || value.empty?
    end
  end

  return unless @encoding

  # The return values are not used here; #encode works on the strings in place.
  encode(record['user_host'], @encoding, Encoding::UTF_8)
  encode(record['db'], @encoding, Encoding::UTF_8)
  encode(record['sql_text'], @encoding, @from_encoding)
end
process_time(record) click to toggle source
# File lib/fluent/plugin/in_rds_mysql_slow_log.rb, line 146
# Replaces the 'query_time' and 'lock_time' values of +record+ with the
# result of #time_to_microseconds, in place.
#
# Missing or nil values are left untouched.
#
# @param record [Hash] the row being processed
def process_time(record)
  %w[query_time lock_time].each do |field|
    raw = record[field]
    next unless raw

    record[field] = time_to_microseconds(raw)
  end
end
process_timestamp(record) click to toggle source
# File lib/fluent/plugin/in_rds_mysql_slow_log.rb, line 112
# Replaces the 'start_time' value of +record+ with the result of
# #timestamp_to_time, in place. A missing or nil value is left untouched.
#
# @param record [Hash] the row being processed
def process_timestamp(record)
  raw = record['start_time']
  raw && (record['start_time'] = timestamp_to_time(raw))
end
run() click to toggle source
# File lib/fluent/plugin/in_rds_mysql_slow_log.rb, line 75
# Runs the event loop held in @loop.
#
# Any StandardError that escapes the loop is logged, not propagated.
def run
  @loop.run
rescue => e
  log.error e.to_s
  log.error_backtrace
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_rds_mysql_slow_log.rb, line 177
# Stops the plugin.
#
# After the superclass shutdown, detaches every watcher from the event loop,
# stops the loop, and then waits for the thread stored in @thread to finish.
def shutdown
  super

  @loop.watchers.each(&:detach)
  @loop.stop
  # Blocks until the thread has finished.
  @thread.join
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_rds_mysql_slow_log.rb, line 66
# Starts the plugin.
#
# After the superclass start, creates a Coolio event loop, attaches a timer
# watcher whose callback is #on_timer, and starts a new thread whose body is
# #run.
def start
  super

  @loop = Coolio::Loop.new
  # NOTE(review): the second argument (true) is presumably a "repeat" flag --
  # confirm against TimerWatcher's definition.
  @timer = TimerWatcher.new(@emit_interval, true, log, &method(:on_timer))
  @loop.attach(@timer)
  # Thread.new returns right away, so #start does not wait on #run.
  @thread = Thread.new(&method(:run))
end
time_to_microseconds(time) click to toggle source
# File lib/fluent/plugin/in_rds_mysql_slow_log.rb, line 152
# Converts a TIME string such as "00:00:01.234567" into a signed number of
# microseconds.
#
# The fractional part is optional and may have fewer than six digits; it is
# right-padded, so ".5" means 500_000 microseconds. A leading "-" negates
# the whole value, including when the hour component is zero. Anything after
# the recognized value is ignored.
#
# @param time [String] a value of the form "[-]H:MM:SS[.ffffff]"
# @return [Integer] the duration in microseconds
# @raise [ArgumentError] if +time+ does not start with such a value
def time_to_microseconds(time)
  match = /\A\s*(-)?(\d+):(\d{1,2}):(\d{1,2})(?:\.(\d{1,6}))?/.match(time)
  raise ArgumentError, "invalid time value: #{time.inspect}" unless match

  negative, hour, min, sec, fraction = match.captures
  # Pad the fraction to six digits so that it is always read as microseconds.
  usec = (fraction || '').ljust(6, '0').to_i
  total = hour.to_i * 3_600_000_000 + min.to_i * 60_000_000 + sec.to_i * 1_000_000 + usec

  # The sign comes from the text, not from the parsed hour, because -0 == 0.
  negative ? -total : total
end
timestamp_to_time(timestamp) click to toggle source
# File lib/fluent/plugin/in_rds_mysql_slow_log.rb, line 116
# Parses a timestamp string such as "2015-03-04 05:06:07.000008" into a Time.
#
# The fractional part is optional and may have fewer than six digits; it is
# right-padded, so ".5" means 500_000 microseconds. The fields are first read
# as a UTC wall-clock time; when @database_timezone is set, that value is
# then passed through its #local_to_utc.
#
# @param timestamp [String] "YYYY-MM-DD HH:MM:SS[.ffffff]"
# @return [Time] the parsed time, or whatever #local_to_utc returns for it
# @raise [ArgumentError] if +timestamp+ does not start with such a value, or
#   if Time.utc rejects the fields
def timestamp_to_time(timestamp)
  pattern = /\A\s*(\d{1,4})-(\d{1,2})-(\d{1,2})\s+(\d{1,2}):(\d{1,2}):(\d{1,2})(?:\.(\d{1,6}))?/
  match = pattern.match(timestamp)
  raise ArgumentError, "invalid timestamp value: #{timestamp.inspect}" unless match

  *fields, fraction = match.captures
  # Pad the fraction to six digits so that it is always read as microseconds.
  usec = (fraction || '').ljust(6, '0').to_i
  t = Time.utc(*fields.map(&:to_i), usec)

  # NOTE(review): #local_to_utc may reject ambiguous or nonexistent local
  # times around DST changes, depending on the TZInfo version -- confirm.
  @database_timezone ? @database_timezone.local_to_utc(t) : t
end