class Fluent::MySQLStatusInput
Attributes
queries[R]
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_mysql_status.rb, line 20 def initialize super @queries = [] end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_mysql_status.rb, line 25 def configure(conf) super conf.elements.select {|element| element.name == 'query' }.each do |element| tag = element['tag'] or raise ConfigError, "Missing 'tag' parameter on <query> directive" string, default = configure_query_string(element) interval = element['interval'] || 10 omit_variable_name_from_record, clump_records = [ 'omit_variable_name_from_record', 'clump_records', ].map do |key| configure_query_format_flag(element, key, default) end clumped_records_key = element['clumped_records_key'] || 'records' @queries << { :tag => tag, :string => string, :interval => Integer(interval), :omit_variable_name_from_record => omit_variable_name_from_record, :clump_records => clump_records, :clumped_records_key => clumped_records_key, } end end
shutdown()
click to toggle source
# File lib/fluent/plugin/in_mysql_status.rb, line 58 def shutdown @watcher.kill end
start()
click to toggle source
# File lib/fluent/plugin/in_mysql_status.rb, line 54 def start @watcher = Thread.new(&method(:watch)) end
watch()
click to toggle source
# File lib/fluent/plugin/in_mysql_status.rb, line 62 def watch client = ensure_connect() counter = generate_counter() loop do begin emit_queries(client, counter.next()) rescue => e $log.error e.message client = ensure_connect() end sleep 1 end end
Private Instance Methods
can_emit?(counter, interval)
click to toggle source
# File lib/fluent/plugin/in_mysql_status.rb, line 176 def can_emit?(counter, interval) return counter % interval == 0 end
configure_query_format_flag(conf, key, default)
click to toggle source
# File lib/fluent/plugin/in_mysql_status.rb, line 107 def configure_query_format_flag(conf, key, default) case conf[key] when nil default when 'true' true else false end end
configure_query_string(conf)
click to toggle source
# File lib/fluent/plugin/in_mysql_status.rb, line 78 def configure_query_string(conf) type = conf['type'] path = conf['path'] string = conf['string'] if type and path or type and string or path and string raise ConfigError, "'type', 'path' and 'string' parameter can't be defined together." end if type return case type when 'processlist' ['SHOW FULL PROCESSLIST', false] when 'open_tables' ['SHOW OPEN TABLES', false] when 'status' ['SHOW /*!50002 GLOBAL */ STATUS', true] else raise ConfigError, "Missing 'type' parameter on <query> directive" end elsif path return [File.read(path), false] elsif string return [string, false] else raise ConfigError, "Missing 'type' parameter on <query> directive" end end
emit(query, result)
click to toggle source
# File lib/fluent/plugin/in_mysql_status.rb, line 190 def emit(query, result) tag = make_tag(query[:tag]) time = Engine.now records = [] result.each do |row| record = {} row.each do |key, value| record[key] = format(value) end if query[:clump_records] records << record else router.emit(tag, time, record) end end if query[:clump_records] router.emit(tag, time, {query[:clumped_records_key] => records}) end end
emit_queries(client, counter)
click to toggle source
# File lib/fluent/plugin/in_mysql_status.rb, line 164 def emit_queries(client, counter) @queries.each do |query| next unless can_emit?(counter, query[:interval]) result = run_query(client, query[:string]) or next if query[:omit_variable_name_from_record] omit_and_emit(query, result) else emit(query, result) end end end
ensure_connect()
click to toggle source
# File lib/fluent/plugin/in_mysql_status.rb, line 118 def ensure_connect intervals = generate_intervals() begin return Mysql2::Client.new({ :host => @host, :port => @port, :username => @username, :password => @password, :database => @database, :encoding => @encoding, # :reconnect => true, }) rescue => e $log.error e.message sleep intervals.next() retry end end
format(value)
click to toggle source
# File lib/fluent/plugin/in_mysql_status.rb, line 239 def format(value) return Integer(value) rescue Float(value) rescue value end
generate_counter()
click to toggle source
# File lib/fluent/plugin/in_mysql_status.rb, line 151 def generate_counter return Enumerator.new do |yielder| counter = 0 loop do counter += 1 yielder << counter if 86400 <= counter counter = 0 end end end end
generate_intervals()
click to toggle source
# File lib/fluent/plugin/in_mysql_status.rb, line 137 def generate_intervals return Enumerator.new do |yielder| temp, interval = 0, 1 loop do if 55 <= interval yielder << 60 else temp, interval = interval, temp + interval yielder << interval end end end end
make_tag(tag_suffix)
click to toggle source
# File lib/fluent/plugin/in_mysql_status.rb, line 235 def make_tag(tag_suffix) return [@tag, tag_suffix].join('.') end
omit_and_emit(query, result)
click to toggle source
# File lib/fluent/plugin/in_mysql_status.rb, line 213 def omit_and_emit(query, result) tag = make_tag(query[:tag]) time = Engine.now record = {} result.each do |row| key = row.fetch('Variable_name') value = format(row.fetch('Value')) if query[:clump_records] record[key] = value else emit_tag = [tag, key].join('.') router.emit(emit_tag, time, value) end end if query[:clump_records] router.emit(tag, time, record) end end
run_query(client, query)
click to toggle source
# File lib/fluent/plugin/in_mysql_status.rb, line 180 def run_query(client, query) begin return client.query("#{query} /* #{@comment} */", :cast => false, :cache_rows => false) rescue Mysql2::Error => e raise if [nil, 1040, 1053, 2002, 2003, 2006, 2013].include?(e.error_number) $log.error %Q(query: "#{query}" reason: "#{e.message}") return nil end end