class Fluent::PgStatStatementsInput

Public Class Methods

desc(description) click to toggle source
# File lib/fluent/plugin/in_pg_stat.rb, line 11
def desc(description)
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_pg_stat.rb, line 33
def configure(conf)
  super

  @column_names = %w(
  calls total_time rows
  shared_blks_hit shared_blks_read shared_blks_dirtied shared_blks_written
  local_blks_hit local_blks_read local_blks_dirtied local_blks_written
  temp_blks_read temp_blks_written
  blk_read_time blk_write_time
  )
end
shutdown() click to toggle source
# File lib/fluent/plugin/in_pg_stat.rb, line 58
def shutdown
  @stop_flag = true
  $log.debug 'Waiting for thread to finish'
  @thread.join
end
start() click to toggle source
# File lib/fluent/plugin/in_pg_stat.rb, line 45
def start
  @conn = PG.connect(
    host: @host,
    dbname: 'postgres',
    user: @username,
    password: @password
  )
  @conn.type_map_for_results = PG::BasicTypeMapForResults.new @conn

  @stop_flag = false
  @thread = Thread.new(&method(:thread_main))
end
thread_main() click to toggle source
# File lib/fluent/plugin/in_pg_stat.rb, line 64
    def thread_main
      until @stop_flag
        sleep 60

        begin
          me = MultiEventStream.new
          begin
            old_metrics = JSON.parse(IO.read(@state_file))
          rescue => e
            $log.info e
            old_metrics = {}
          end
          new_metrics = {}

          # More than one records of the same query string can exist
          # because queries longer than xxx bbytes are truncated.
          # Treat them as identical.
          now = Engine.now
          @conn.exec(<<-EOS
          select query, #{@column_names.map{|col| "cast (sum(#{col}) as double precision) as #{col}"}.join(', ')} from pg_stat_statements group by query order by total_time desc
          EOS
                    ).each do |row|
            new_metrics['timestamp'] = now.to_f
            query = row['query']
            new_metrics[query] = {}

            @column_names.each do |col|
              new_metrics[query][col] = row[col]
              row[col] = if !old_metrics[query].nil? && old_metrics[query][col].is_a?(Numeric)
                           val = (row[col] - old_metrics[query][col]) * 60 / (new_metrics['timestamp'] - old_metrics['timestamp'])
                           val >= 0 ? val : nil
                         end
            end

            # Calculate average values
            if row['calls'].is_a?(Numeric) && row['calls'] != 0
              row['avg_rows'] = row['rows'] / row['calls'] if row['rows'].is_a? Numeric
              row['avg_time'] = row['total_time'] / row['calls'] if row['total_time'].is_a? Numeric
            end

            me.add(now, row)
          end
          @router.emit_stream(@tag, me)
          IO.write(@state_file, new_metrics.to_json)

        rescue => e
          log.error 'unexpected error', error: e.message, error_class: e.class
          log.error_backtrace e.backtrace
        end
      end
    end