class PgqPrometheus::Processor
Attributes
after_collect[RW]
before_collect[RW]
logger[RW]
on_error[RW]
sql_caller[RW]
Public Class Methods
new(labels = {})
click to toggle source
# File lib/pgq_prometheus/processor.rb, line 64 def initialize(labels = {}) @metric_labels = labels || {} end
running?()
click to toggle source
# File lib/pgq_prometheus/processor.rb, line 53 def running? defined?(@thread) && @thread end
start(client: nil, frequency: 30, labels: nil)
click to toggle source
# File lib/pgq_prometheus/processor.rb, line 14 def start(client: nil, frequency: 30, labels: nil) raise ArgumentError, "#{name}.sql_caller must be defined" if sql_caller.nil? stop client ||= PrometheusExporter::Client.default metric_labels = labels&.dup || {} process_collector = new(metric_labels) @thread = Thread.new do wrap_thread_loop(name) do sql_caller.release_connection logger&.info { "Start #{name}" } while true begin before_collect&.call metrics = process_collector.collect metrics.each do |metric| client.send_json metric end after_collect&.call rescue => e STDERR.puts "#{self.class} Failed To Collect Stats #{e.class} #{e.message}" logger&.error { "#{e.class} #{e.message} #{e.backtrace.join("\n")}" } on_error&.call(e) end sleep frequency end end end true end
stop()
click to toggle source
# File lib/pgq_prometheus/processor.rb, line 48 def stop @thread&.kill @thread = nil end
wrap_thread_loop(*tags) { || ... }
click to toggle source
# File lib/pgq_prometheus/processor.rb, line 57 def wrap_thread_loop(*tags) return yield if logger.nil? || !logger.respond_to?(:tagged) logger.tagged(*tags) { yield } end
Public Instance Methods
collect()
click to toggle source
# File lib/pgq_prometheus/processor.rb, line 68 def collect metrics = [] sql_caller.with_connection do sql_caller.queue_info.each do |queue_info| queue = queue_info[:queue_name] queue_metric_opts.each do |name, opts| value = opts[:apply].call(queue_info) labels = opts[:labels].merge(queue: queue) metrics << format_metric(name, value, labels) end sql_caller.consumer_info(queue).each do |consumer_info| consumer = consumer_info[:consumer_name] consumer_metric_opts.each do |name, opts| value = opts[:apply].call(consumer_info, queue_info) labels = opts[:labels].merge(queue: queue, consumer: consumer) metrics << format_metric(name, value, labels) end end end custom_metric_opts.each do |name, opts| value, labels = opts[:apply].call labels = (labels || {}).merge(opts[:labels]) metrics << format_metric(name, value, labels) end end metrics end
Private Instance Methods
consumer_metric_opts()
click to toggle source
# File lib/pgq_prometheus/processor.rb, line 112 def consumer_metric_opts Config._metrics.select { |_, opts| opts[:from] == :consumer } end
custom_metric_opts()
click to toggle source
# File lib/pgq_prometheus/processor.rb, line 116 def custom_metric_opts Config._metrics.select { |_, opts| opts[:from].nil? } end
format_metric(name, value, labels)
click to toggle source
# File lib/pgq_prometheus/processor.rb, line 120 def format_metric(name, value, labels) { type: Config.type, name => value, metric_labels: labels.merge(@metric_labels) } end
queue_metric_opts()
click to toggle source
# File lib/pgq_prometheus/processor.rb, line 108 def queue_metric_opts Config._metrics.select { |_, opts| opts[:from] == :queue } end