class Fluent::Plugin::PrometheusInput

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_prometheus.rb, line 35
def initialize
  super
  @registry = ::Prometheus::Client.registry
  @secure = nil
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_prometheus.rb, line 41
def configure(conf)
  super

  # Get how many workers we have
  sysconf = if self.respond_to?(:owner) && owner.respond_to?(:system_config)
              owner.system_config
            elsif self.respond_to?(:system_config)
              self.system_config
            else
              nil
            end
  @num_workers = sysconf && sysconf.workers ? sysconf.workers : 1
  @secure = @transport_config.protocol == :tls || (@ssl && @ssl['enable'])

  @base_port = @port
  @port += fluentd_worker_id
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/in_prometheus.rb, line 59
def multi_workers_ready?
  true
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_prometheus.rb, line 115
def shutdown
  if @webrick_server
    @webrick_server.shutdown
    @webrick_server = nil
  end
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_prometheus.rb, line 63
def start
  super

  scheme = @secure ? 'https' : 'http'
  log.debug "listening prometheus http server on #{scheme}:://#{@bind}:#{@port}/#{@metrics_path} for worker#{fluentd_worker_id}"

  proto = @secure ? :tls : :tcp

  if @ssl && @ssl['enable'] && @ssl['extra_conf']
    start_webrick
    return
  end

  begin
    require 'async'
    require 'fluent/plugin/in_prometheus/async_wrapper'
    extend AsyncWrapper
  rescue LoadError => _
    # ignore
  end

  tls_opt = if @ssl && @ssl['enable']
              ssl_config = {}

              if (@ssl['certificate_path'] && @ssl['private_key_path'].nil?) || (@ssl['certificate_path'].nil? && @ssl['private_key_path'])
                raise Fluent::ConfigError.new('both certificate_path and private_key_path must be defined')
              end

              if @ssl['certificate_path']
                ssl_config['cert_path'] = @ssl['certificate_path']
              end

              if @ssl['private_key_path']
                ssl_config['private_key_path'] = @ssl['private_key_path']
              end

              if @ssl['ca_path']
                ssl_config['ca_path'] = @ssl['ca_path']
                # Only ca_path is insecure in fluentd
                # https://github.com/fluent/fluentd/blob/2236ad45197ba336fd9faf56f442252c8b226f25/lib/fluent/plugin_helper/cert_option.rb#L68
                ssl_config['insecure'] = true
              end

              ssl_config
            end

  http_server_create_http_server(:in_prometheus_server, addr: @bind, port: @port, logger: log, proto: proto, tls_opts: tls_opt) do |server|
    server.get(@metrics_path) { |_req| all_metrics }
    server.get(@aggregated_metrics_path) { |_req| all_workers_metrics }
  end
end

Private Instance Methods

all_metrics() click to toggle source
# File lib/fluent/plugin/in_prometheus.rb, line 186
def all_metrics
  [200, { 'Content-Type' => ::Prometheus::Client::Formats::Text::CONTENT_TYPE }, ::Prometheus::Client::Formats::Text.marshal(@registry)]
rescue => e
  [500, { 'Content-Type' => 'text/plain' }, e.to_s]
end
all_workers_metrics() click to toggle source
# File lib/fluent/plugin/in_prometheus.rb, line 192
def all_workers_metrics
  full_result = PromMetricsAggregator.new

  send_request_to_each_worker do |resp|
    if resp.code.to_s == '200'
      full_result.add_metrics(resp.body)
    end
  end

  [200, { 'Content-Type' => ::Prometheus::Client::Formats::Text::CONTENT_TYPE }, full_result.get_metrics]
rescue => e
  [500, { 'Content-Type' => 'text/plain' }, e.to_s]
end
do_request(host:, port:, secure:) { |http| ... } click to toggle source

might be replaced by AsyncWrapper if async gem is installed

# File lib/fluent/plugin/in_prometheus.rb, line 216
def do_request(host:, port:, secure:)
  http = Net::HTTP.new(host, port)

  if secure
    http.use_ssl = true
    # target is our child process. so it's secure.
    http.verify_mode = OpenSSL::SSL::VERIFY_NONE
  end

  http.start do
    yield(http)
  end
end
send_request_to_each_worker() { |get| ... } click to toggle source
# File lib/fluent/plugin/in_prometheus.rb, line 206
def send_request_to_each_worker
  bind = (@bind == '0.0.0.0') ? '127.0.0.1' : @bind
  [*(@base_port...(@base_port + @num_workers))].each do |worker_port|
    do_request(host: bind, port: worker_port, secure: @secure) do |http|
      yield(http.get(@metrics_path))
    end
  end
end
start_webrick() click to toggle source

For compatiblity because http helper can't support extra_conf option

# File lib/fluent/plugin/in_prometheus.rb, line 126
def start_webrick
  require 'webrick/https'
  require 'webrick'

  config = {
    BindAddress: @bind,
    Port: @port,
    MaxClients: 5,
    Logger: WEBrick::Log.new(STDERR, WEBrick::Log::FATAL),
    AccessLog: [],
  }
  if (@ssl['certificate_path'] && @ssl['private_key_path'].nil?) || (@ssl['certificate_path'].nil? && @ssl['private_key_path'])
    raise RuntimeError.new("certificate_path and private_key_path most both be defined")
  end

  ssl_config = {
    SSLEnable: true,
    SSLCertName: [['CN', 'nobody'], ['DC', 'example']]
  }

  if @ssl['certificate_path']
    cert = OpenSSL::X509::Certificate.new(File.read(@ssl['certificate_path']))
    ssl_config[:SSLCertificate] = cert
  end

  if @ssl['private_key_path']
    key = OpenSSL::PKey.read(@ssl['private_key_path'])
    ssl_config[:SSLPrivateKey] = key
  end

  ssl_config[:SSLCACertificateFile] = @ssl['ca_path'] if @ssl['ca_path']
  ssl_config = ssl_config.merge(@ssl['extra_conf']) if @ssl['extra_conf']
  config = ssl_config.merge(config)

  @log.on_debug do
    @log.debug("WEBrick conf: #{config}")
  end

  @webrick_server = WEBrick::HTTPServer.new(config)
  @webrick_server.mount_proc(@metrics_path) do |_req, res|
    status, header, body = all_metrics
    res.status = status
    res['Content-Type'] = header['Content-Type']
    res.body = body
    res
  end

  @webrick_server.mount_proc(@aggregated_metrics_path) do |_req, res|
    status, header, body = all_workers_metrics
    res.status = status
    res['Content-Type'] = header['Content-Type']
    res.body = body
    res
  end

  thread_create(:in_prometheus_webrick) do
    @webrick_server.start
  end
end