class PgqPrometheus::Processor

Attributes

after_collect[RW]
before_collect[RW]
logger[RW]
on_error[RW]
sql_caller[RW]

Public Class Methods

new(labels = {}) click to toggle source
# File lib/pgq_prometheus/processor.rb, line 64
def initialize(labels = {})
  @metric_labels = labels || {}
end
running?() click to toggle source
# File lib/pgq_prometheus/processor.rb, line 53
def running?
  defined?(@thread) && @thread
end
start(client: nil, frequency: 30, labels: nil) click to toggle source
# File lib/pgq_prometheus/processor.rb, line 14
def start(client: nil, frequency: 30, labels: nil)
  raise ArgumentError, "#{name}.sql_caller must be defined" if sql_caller.nil?

  stop

  client ||= PrometheusExporter::Client.default
  metric_labels = labels&.dup || {}
  process_collector = new(metric_labels)

  @thread = Thread.new do
    wrap_thread_loop(name) do
      sql_caller.release_connection
      logger&.info { "Start #{name}" }
      while true
        begin
          before_collect&.call
          metrics = process_collector.collect
          metrics.each do |metric|
            client.send_json metric
          end
          after_collect&.call
        rescue => e
          STDERR.puts "#{self.class} Failed To Collect Stats #{e.class} #{e.message}"
          logger&.error { "#{e.class} #{e.message} #{e.backtrace.join("\n")}" }
          on_error&.call(e)
        end
        sleep frequency
      end
    end
  end

  true
end
stop() click to toggle source
# File lib/pgq_prometheus/processor.rb, line 48
def stop
  @thread&.kill
  @thread = nil
end
wrap_thread_loop(*tags) { || ... } click to toggle source
# File lib/pgq_prometheus/processor.rb, line 57
def wrap_thread_loop(*tags)
  return yield if logger.nil? || !logger.respond_to?(:tagged)

  logger.tagged(*tags) { yield }
end

Public Instance Methods

collect() click to toggle source
# File lib/pgq_prometheus/processor.rb, line 68
def collect
  metrics = []
  sql_caller.with_connection do

    sql_caller.queue_info.each do |queue_info|
      queue = queue_info[:queue_name]

      queue_metric_opts.each do |name, opts|
        value = opts[:apply].call(queue_info)
        labels = opts[:labels].merge(queue: queue)
        metrics << format_metric(name, value, labels)
      end

      sql_caller.consumer_info(queue).each do |consumer_info|
        consumer = consumer_info[:consumer_name]

        consumer_metric_opts.each do |name, opts|
          value = opts[:apply].call(consumer_info, queue_info)
          labels = opts[:labels].merge(queue: queue, consumer: consumer)
          metrics << format_metric(name, value, labels)
        end
      end
    end

    custom_metric_opts.each do |name, opts|
      value, labels = opts[:apply].call
      labels = (labels || {}).merge(opts[:labels])
      metrics << format_metric(name, value, labels)
    end
  end

  metrics
end

Private Instance Methods

consumer_metric_opts() click to toggle source
# File lib/pgq_prometheus/processor.rb, line 112
def consumer_metric_opts
  Config._metrics.select { |_, opts| opts[:from] == :consumer }
end
custom_metric_opts() click to toggle source
# File lib/pgq_prometheus/processor.rb, line 116
def custom_metric_opts
  Config._metrics.select { |_, opts| opts[:from].nil? }
end
format_metric(name, value, labels) click to toggle source
# File lib/pgq_prometheus/processor.rb, line 120
def format_metric(name, value, labels)
  {
      type: Config.type,
      name => value,
      metric_labels: labels.merge(@metric_labels)
  }
end
queue_metric_opts() click to toggle source
# File lib/pgq_prometheus/processor.rb, line 108
def queue_metric_opts
  Config._metrics.select { |_, opts| opts[:from] == :queue }
end