class PrometheusExporter::Client
Constants
- MAX_QUEUE_SIZE
- MAX_SOCKET_AGE
Attributes
logger[R]
Public Class Methods
default()
click to toggle source
# File lib/prometheus_exporter/client.rb, line 46
# Returns the memoized default client, constructing one with the default
# constructor arguments the first time it is requested.
def self.default
  return @default if @default

  @default = new
end
default=(client)
click to toggle source
# File lib/prometheus_exporter/client.rb, line 50
# Replaces the memoized default client with +client+.
def self.default=(client)
  @default = client
end
new( host: ENV.fetch('PROMETHEUS_EXPORTER_HOST', 'localhost'), port: ENV.fetch('PROMETHEUS_EXPORTER_PORT', PrometheusExporter::DEFAULT_PORT), max_queue_size: nil, thread_sleep: 0.5, json_serializer: nil, custom_labels: nil, logger: Logger.new(STDERR), log_level: Logger::WARN )
click to toggle source
# File lib/prometheus_exporter/client.rb, line 59
# Builds a client. No connection is opened and no thread is started here;
# the socket and worker thread ivars start out as nil.
#
# host, port       - collector address; default to the PROMETHEUS_EXPORTER_HOST /
#                    PROMETHEUS_EXPORTER_PORT environment variables.
# max_queue_size   - queue limit; nil falls back to MAX_QUEUE_SIZE. Coerced
#                    with #to_i and must be greater than zero.
# thread_sleep     - seconds stored for the worker thread's pause.
# json_serializer  - :oj selects PrometheusExporter::OjCompat, anything else JSON.
# custom_labels    - labels stored for later merging into JSON payloads.
# logger/log_level - the logger to use; its level is set to log_level.
#
# Raises ArgumentError when the resolved queue size is not positive.
def initialize(
  host: ENV.fetch('PROMETHEUS_EXPORTER_HOST', 'localhost'),
  port: ENV.fetch('PROMETHEUS_EXPORTER_PORT', PrometheusExporter::DEFAULT_PORT),
  max_queue_size: nil,
  thread_sleep: 0.5,
  json_serializer: nil,
  custom_labels: nil,
  logger: Logger.new(STDERR),
  log_level: Logger::WARN
)
  # The logger is configured first so its level is applied even when the
  # queue size validation below raises.
  @logger = logger
  @logger.level = log_level

  queue_limit = (max_queue_size || MAX_QUEUE_SIZE).to_i
  raise ArgumentError, "max_queue_size must be larger than 0" if queue_limit <= 0

  # Destination and tuning.
  @host = host
  @port = port
  @max_queue_size = queue_limit
  @thread_sleep = thread_sleep
  @custom_labels = custom_labels

  if json_serializer == :oj
    @json_serializer = PrometheusExporter::OjCompat
  else
    @json_serializer = JSON
  end

  # Runtime state.
  @metrics = []
  @queue = Queue.new
  @mutex = Mutex.new
  @worker_thread = nil
  @socket = nil
  @socket_started = nil
  @socket_pid = nil
end
Public Instance Methods
custom_labels=(custom_labels)
click to toggle source
# File lib/prometheus_exporter/client.rb, line 98
# Replaces the labels that send_json merges into each payload.
def custom_labels=(custom_labels)
  @custom_labels = custom_labels
end
find_registered_metric(name, type: nil, help: nil)
click to toggle source
# File lib/prometheus_exporter/client.rb, line 108
# Returns the first registered metric whose name equals +name+, or nil when
# none matches. +type+ and +help+ further narrow the search only when a
# truthy value is given for them.
def find_registered_metric(name, type: nil, help: nil)
  @metrics.find do |candidate|
    same_type = !type || candidate.type == type
    same_help = !help || candidate.help == help
    same_name = candidate.name == name

    same_type && same_help && same_name
  end
end
process_queue()
click to toggle source
# File lib/prometheus_exporter/client.rb, line 142
# Drains the queue, writing every message to the collector socket as one
# HTTP chunk: the byte size in upper-case hex, CRLF, the message, CRLF.
#
# When a write fails the popped message is dropped, a warning is logged and
# the error is re-raised. The failed socket is closed explicitly before it is
# forgotten, so its descriptor is released right away rather than whenever
# the garbage collector finalizes it.
def process_queue
  while @queue.length > 0
    ensure_socket!

    begin
      message = @queue.pop
      @socket.write(message.bytesize.to_s(16).upcase)
      @socket.write("\r\n")
      @socket.write(message)
      @socket.write("\r\n")
    rescue => e
      logger.warn "Prometheus Exporter is dropping a message: #{e}"

      failed_socket = @socket
      @socket = nil
      @socket_started = nil
      begin
        # Close without sending the terminating chunk: the connection is
        # already suspect, and a failure here must not mask the original error.
        failed_socket&.close
      rescue StandardError
        nil
      end

      raise e
    end
  end
end
register(type, name, help, opts = nil)
click to toggle source
# File lib/prometheus_exporter/client.rb, line 102
# Creates a RemoteMetric bound to this client, appends it to the list of
# registered metrics and returns it.
def register(type, name, help, opts = nil)
  RemoteMetric
    .new(type: type, name: name, help: help, client: self, opts: opts)
    .tap { |created| @metrics << created }
end
send(str)
click to toggle source
# File lib/prometheus_exporter/client.rb, line 132
# Enqueues +str+ for delivery. When the queue grows beyond the configured
# maximum, a warning is logged and one message is popped off the queue and
# discarded. Finally makes sure the worker thread is running.
def send(str)
  @queue.push(str)

  over_limit = @queue.length > @max_queue_size
  if over_limit
    logger.warn "Prometheus Exporter client is dropping message cause queue is full"
    @queue.pop
  end

  ensure_worker_thread!
end
send_json(obj)
click to toggle source
# File lib/prometheus_exporter/client.rb, line 118
# Serializes +obj+ with the configured JSON serializer and enqueues it.
# When the client has custom labels they are added under :custom_labels;
# labels already present on +obj+ win over the client's on key collisions.
def send_json(obj)
  return send(@json_serializer.dump(obj)) unless @custom_labels

  merged_labels =
    if obj[:custom_labels]
      @custom_labels.merge(obj[:custom_labels])
    else
      @custom_labels
    end

  send(@json_serializer.dump(obj.merge(custom_labels: merged_labels)))
end
stop(wait_timeout_seconds: 0)
click to toggle source
# File lib/prometheus_exporter/client.rb, line 160
# Shuts the client down while holding the mutex: waits up to
# +wait_timeout_seconds+ for the queue to empty, kills the worker thread and
# polls until it is no longer alive, then closes the socket.
def stop(wait_timeout_seconds: 0)
  @mutex.synchronize do
    wait_for_empty_queue_with_timeout(wait_timeout_seconds)

    @worker_thread&.kill
    # Thread#kill is not waited on above; poll until the thread has really exited.
    sleep 0.001 while @worker_thread&.alive?
    @worker_thread = nil

    close_socket!
  end
end
Private Instance Methods
close_socket!()
click to toggle source
# File lib/prometheus_exporter/client.rb, line 199
# Finishes the chunked request (zero-length chunk) and closes the socket.
#
# The socket is closed and the socket state is reset on every path:
# * Errno::EPIPE (the peer already went away) is swallowed, but the local
#   socket is still closed so its descriptor is not leaked.
# * Any other error still propagates to the caller, but no longer leaves
#   @socket pointing at a broken, unclosed connection.
#
# Returns nil.
def close_socket!
  socket = @socket

  if socket && !socket.closed?
    begin
      socket.write("0\r\n")
      socket.write("\r\n")
      socket.flush
    ensure
      # Runs whether or not the terminating chunk could be written.
      socket.close
    end
  end

  nil
rescue Errno::EPIPE
  nil
ensure
  @socket = nil
  @socket_started = nil
end
close_socket_if_old!()
click to toggle source
# File lib/prometheus_exporter/client.rb, line 214
# Closes the socket once it has been open for longer than MAX_SOCKET_AGE
# seconds. Does nothing when the socket was opened under a different process
# id or when there is no open socket with a recorded start time.
def close_socket_if_old!
  return unless @socket_pid == Process.pid
  return unless @socket && @socket_started

  expires_at = @socket_started + MAX_SOCKET_AGE
  close_socket! if expires_at < Time.now.to_f
end
ensure_socket!()
click to toggle source
# File lib/prometheus_exporter/client.rb, line 220
# Makes sure @socket is a connection opened by the current process with the
# chunked POST /send-metrics request header already written.
#
# Returns nil. On any error the socket state is reset and the error is
# re-raised; a socket opened by this process is closed first, so a failure
# while writing the header no longer leaves a half-initialized connection
# open.
def ensure_socket!
  # if process was forked socket may be owned by parent
  # leave it alone and reset
  if @socket_pid != Process.pid
    @socket = nil
    @socket_started = nil
    @socket_pid = nil
  end

  close_socket_if_old!

  unless @socket
    @socket = TCPSocket.new @host, @port
    @socket.write("POST /send-metrics HTTP/1.1\r\n")
    @socket.write("Transfer-Encoding: chunked\r\n")
    @socket.write("Host: #{@host}\r\n")
    @socket.write("Connection: Close\r\n")
    @socket.write("Content-Type: application/octet-stream\r\n")
    @socket.write("\r\n")
    @socket_started = Time.now.to_f
    @socket_pid = Process.pid
  end

  nil
rescue => e
  # A parent-owned socket was already dropped by the pid check above, so any
  # socket still referenced here belongs to this process and is safe to close.
  begin
    @socket&.close
  rescue StandardError
    nil
  end

  @socket = nil
  @socket_started = nil
  @socket_pid = nil
  raise e
end
ensure_worker_thread!()
click to toggle source
# File lib/prometheus_exporter/client.rb, line 181
# Starts the background worker thread unless one is already alive. The
# liveness check is repeated under the mutex so that concurrent callers do
# not each start a thread. The worker alternates between worker_loop and
# sleeping for @thread_sleep seconds.
#
# A ThreadError whose message mentions "can't alloc thread" is logged;
# any other ThreadError is re-raised.
def ensure_worker_thread!
  return if @worker_thread&.alive?

  @mutex.synchronize do
    # Another caller may have started the thread while we waited for the lock.
    return if @worker_thread&.alive?

    @worker_thread = Thread.new do
      loop do
        worker_loop
        sleep @thread_sleep
      end
    end
  end
rescue ThreadError => e
  raise unless e.message.match?(/can't alloc thread/)

  logger.error "Prometheus Exporter, failed to send message ThreadError #{e}"
end
wait_for_empty_queue_with_timeout(timeout_seconds)
click to toggle source
# File lib/prometheus_exporter/client.rb, line 250
# Blocks until the queue is empty or more than +timeout_seconds+ have passed
# on the monotonic clock, polling every 50ms. Returns nil.
def wait_for_empty_queue_with_timeout(timeout_seconds)
  monotonic_now = -> { ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) }
  deadline = monotonic_now.call + timeout_seconds

  while @queue.length.positive?
    break if deadline < monotonic_now.call

    sleep(0.05)
  end
end
worker_loop()
click to toggle source
# File lib/prometheus_exporter/client.rb, line 174
# One pass of the worker: closes the socket if it is too old, then drains the
# queue. Any StandardError is logged rather than raised.
def worker_loop
  begin
    close_socket_if_old!
    process_queue
  rescue StandardError => failure
    logger.error "Prometheus Exporter, failed to send message #{failure}"
  end
end