class Fluent::Plugin::KubernetesMetricsAggregatorInput

Public Instance Methods

close() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_kubernetes_metrics_aggregator.rb, line 166
# Shuts the plugin down: asks every registered watcher (if any were set up)
# to finish, then delegates to the superclass implementation.
def close
  if @watchers
    @watchers.each { |watcher| watcher.finish }
  end

  super
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_kubernetes_metrics_aggregator.rb, line 149
# Configures the plugin.
#
# After the superclass has processed +conf+, this creates the two mutexes
# used by the response processors to guard the per-node class-level maps,
# splits the configured tag around its wildcard and builds the Kubernetes
# client.
#
# @param conf the configuration element, forwarded unchanged to the superclass
def configure(conf)
  super

  @mutex_node_req_lim, @mutex_node_res_usage = Mutex.new, Mutex.new
  parse_tag
  initialize_client
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_kubernetes_metrics_aggregator.rb, line 157
# Starts the plugin and registers one periodic timer per scraper.
# All three timers fire every @interval and invoke, respectively, the
# limits/requests, node and resource-usage scrape methods.
def start
  super

  timer_execute(:limits_request_scraper, @interval, &method(:scrape_limits_requests_metrics))
  timer_execute(:node_scraper, @interval, &method(:scrape_node_metrics))
  timer_execute(:resource_usage_scraper, @interval, &method(:scrape_resource_usage_metrics))
end

Private Instance Methods

emit_limits_requests_metrics(tag, scraped_at, labels, limits_requests_metric) click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics_aggregator.rb, line 321
# Emits the four limit/request readings held by +limits_requests_metric+.
#
# One event is routed per reading, tagged "<tag>.cpu.limit",
# "<tag>.cpu.request", "<tag>.memory.limit" and "<tag>.memory.request".
# Each record is a copy of +labels+ with the reading stored under 'value'.
#
# @param tag [String] tag prefix for the emitted events
# @param scraped_at [Time] scrape time, converted to a Fluent::EventTime
# @param labels [Hash] labels copied into every record (not modified)
# @param limits_requests_metric object whose @cpu_limit, @cpu_request,
#   @memory_limit and @memory_request instance variables hold the readings
def emit_limits_requests_metrics(tag, scraped_at, labels, limits_requests_metric)
  emit_reading = lambda do |suffix, ivar|
    router.emit(tag + suffix,
                Fluent::EventTime.from_time(scraped_at),
                labels.merge('value' => limits_requests_metric.instance_variable_get(ivar)))
  end

  emit_reading.call('.cpu.limit', :@cpu_limit)
  emit_reading.call('.cpu.request', :@cpu_request)
  emit_reading.call('.memory.limit', :@memory_limit)
  emit_reading.call('.memory.request', :@memory_request)
end
emit_resource_usage_metrics(tag, scraped_at, labels, resource_usage_metric) click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics_aggregator.rb, line 341
# Emits the CPU and memory usage readings held by +resource_usage_metric+.
#
# Two events are routed, tagged "<tag>.cpu.usage" and "<tag>.memory.usage".
# Each record is a copy of +labels+ with the reading stored under 'value'.
#
# @param tag [String] tag prefix for the emitted events
# @param scraped_at [Time] scrape time, converted to a Fluent::EventTime
# @param labels [Hash] labels copied into every record (not modified)
# @param resource_usage_metric object whose @cpu_usage and @memory_usage
#   instance variables hold the readings
def emit_resource_usage_metrics(tag, scraped_at, labels, resource_usage_metric)
  emit_reading = lambda do |suffix, ivar|
    router.emit(tag + suffix,
                Fluent::EventTime.from_time(scraped_at),
                labels.merge('value' => resource_usage_metric.instance_variable_get(ivar)))
  end

  emit_reading.call('.cpu.usage', :@cpu_usage)
  emit_reading.call('.memory.usage', :@memory_usage)
end
generate_tag(item_name) click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics_aggregator.rb, line 178
# Builds the event tag for +item_name+.
#
# When the configured tag contained a wildcard (parse_tag set @tag_prefix),
# the result is prefix + item_name + suffix. Otherwise the configured tag is
# used as is.
#
# A copy of @tag is returned rather than @tag itself: callers in this plugin
# append metric suffixes to the result with `<<`, which would otherwise
# modify the configured tag in place (or raise if it is frozen).
#
# @param item_name [String] value substituted for the wildcard
# @return [String] a string the caller may freely modify
def generate_tag(item_name)
  return @tag.dup unless @tag_prefix

  [@tag_prefix, item_name, @tag_suffix].join
end
get_cpu_mult(cpu) click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics_aggregator.rb, line 274
# Returns the factor that converts a CPU quantity string to millicores:
# 1 when the string already ends in 'm', 1000 otherwise.
def get_cpu_mult(cpu)
  cpu[-1] == 'm' ? 1 : 1000
end
get_cpu_value(resource) click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics_aggregator.rb, line 280
# Converts a CPU quantity string such as "4", "0.5" or "3920m" to millicores.
#
# The numeric part is read with String#to_f so fractional core counts keep
# their decimal point, then scaled by get_cpu_mult. The previous
# implementation added the scaled value on top of the raw number
# (e.g. "100m" became 200) and dropped the decimal point ("0.5" became 5).
#
# @param resource [String] CPU quantity
# @return [Integer] millicores, rounded to the nearest integer
def get_cpu_value(resource)
  (resource.to_f * get_cpu_mult(resource)).round
end
get_memory_mult(memory) click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics_aggregator.rb, line 287
# Returns the factor that scales a memory quantity string to "mega" units.
#
# Binary suffixes are scaled relative to Mi (Ki = 1/1024, Gi = 1024, ...),
# decimal suffixes relative to M (k/K = 1/1000, G = 1000, ...); both Mi and M
# map to 1. A quantity without a recognised suffix is treated as plain bytes
# and scaled by 0.000001.
#
# The previous implementation compared the single character memory[-2] with
# two-letter suffixes, so those branches never matched: binary quantities
# only worked because their first letter hit the one-letter branches, and
# genuine one-letter quantities such as "2G" fell through to the bytes case.
#
# @param memory [String] memory quantity, e.g. "16393292Ki" or "2G"
# @return [Numeric] multiplier to apply to the numeric part of +memory+
def get_memory_mult(memory)
  # Trailing letters only; exponent forms such as "129e6" end in a digit and
  # therefore count as plain bytes.
  case memory[/[A-Za-z]+\z/]
  when 'Ki' then 1.0 / 1024
  when 'K', 'k' then 0.001
  when 'Mi', 'M' then 1
  when 'Gi' then 1024
  when 'G' then 1000
  when 'Ti' then 1_048_576 # 1024**2
  when 'T' then 1_000_000
  when 'Pi' then 1_073_741_824 # 1024**3
  when 'P' then 1_000_000_000
  when 'Ei' then 1_099_511_627_776 # 1024**4
  when 'E' then 1_000_000_000_000
  else 0.000001
  end
end
get_memory_value(resource) click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics_aggregator.rb, line 314
# Converts a memory quantity string to the "mega" unit used by this plugin.
#
# The numeric part is read with String#to_f (so "1.5Gi" is 1.5, not 15) and
# multiplied by the factor from get_memory_mult. The previous implementation
# added the scaled value on top of the raw number, so e.g. "512Mi" became
# 1024.
#
# @param resource [String] memory quantity, e.g. "16393292Ki"
# @return [Float] scaled value
def get_memory_value(resource)
  resource.to_f * get_memory_mult(resource)
end
handle_limits_requests_res(response) click to toggle source

Handles the response to the pod-list request sent to the Kubernetes API server (kube-apiserver).

# File lib/fluent/plugin/in_kubernetes_metrics_aggregator.rb, line 372
# Handles the HTTP response of the pod-list request.
#
# A 2xx response is timestamped (@scraped_at), parsed as JSON and passed to
# process_limits_requests_res. Any other status is logged as an error.
# Exceptions raised while parsing or processing are logged, not re-raised.
#
# @param response object responding to #code (numeric status) and #body
def handle_limits_requests_res(response)
  # Only a successful GET (2xx) carries a body worth processing.
  if response.code.between?(200, 299)
    @scraped_at = Time.now
    process_limits_requests_res MultiJson.load(response.body)
  else
    log.error "Expected 2xx from the Kubernetes API server, but got #{response.code}. Response body = #{response.body}"
  end
rescue StandardError => e
  # Use the rescued exception directly; $ERROR_INFO only exists when the
  # English library has been loaded.
  log.error "Failed to scrape limit metrics, error=#{e.message}, #{e.inspect}"
  log.error_backtrace
end
handle_node_response(response) click to toggle source

Handles the response to the node-list request sent to the Kubernetes API server (kube-apiserver).

# File lib/fluent/plugin/in_kubernetes_metrics_aggregator.rb, line 474
# Handles the HTTP response of the node-list request used for node metrics.
#
# A 2xx response is timestamped (@scraped_node_at), parsed as JSON and passed
# to process_node_response. Any other status is logged as an error.
# Exceptions raised while parsing or processing are logged, not re-raised.
#
# @param response object responding to #code (numeric status) and #body
def handle_node_response(response)
  # Only a successful GET (2xx) carries a body worth processing.
  if response.code.between?(200, 299)
    @scraped_node_at = Time.now
    process_node_response MultiJson.load(response.body)
  else
    log.error "Expected 2xx from the Kubernetes API server, but got #{response.code}. Response body = #{response.body}"
  end
rescue StandardError => e
  # Use the rescued exception directly; $ERROR_INFO only exists when the
  # English library has been loaded.
  log.error "Failed to scrape node metrics, error=#{e.message}, #{e.inspect}"
  log.error_backtrace
end
handle_resource_usage_response(response) click to toggle source

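Handles the response to the node-list request sent to the Kubernetes API server; the kubelet summary API of each listed node is queried afterwards, while that response is processed.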

# File lib/fluent/plugin/in_kubernetes_metrics_aggregator.rb, line 550
# Handles the HTTP response of the node-list request used for resource usage.
#
# A 2xx response is timestamped (@scraped_at), parsed as JSON and passed to
# process_resource_usage_res. Any other status is logged as an error.
# Exceptions raised while parsing or processing are logged, not re-raised.
#
# @param response object responding to #code (numeric status) and #body
def handle_resource_usage_response(response)
  # Only a successful GET (2xx) carries a body worth processing.
  if response.code.between?(200, 299)
    @scraped_at = Time.now
    process_resource_usage_res MultiJson.load(response.body)
  else
    log.error "Expected 2xx from the Kubernetes API server, but got #{response.code}. Response body = #{response.body}"
  end
rescue StandardError => e
  # Use the rescued exception directly; $ERROR_INFO only exists when the
  # English library has been loaded.
  log.error "Failed to scrape resource usage metrics, error=#{e.message}, #{e.inspect}"
  log.error_backtrace
end
init_with_kubeconfig(options = {}) click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics_aggregator.rb, line 184
# Builds @client from the kubeconfig file named by @kubeconfig.
#
# The endpoint, API version, SSL options and auth options all come from the
# kubeconfig's current context; +options+ supplies any further client
# settings (initialize_client passes the timeouts).
#
# The merged settings are splatted with `**` so they reach the client as
# keyword arguments. On Ruby 3 a hash passed positionally is no longer
# converted to keywords, so it would be treated as an extra positional
# argument.
#
# @param options [Hash] extra client options, keyed by symbol
# @return [Kubeclient::Client] the client stored in @client
def init_with_kubeconfig(options = {})
  config = Kubeclient::Config.read @kubeconfig
  current_context = config.context

  @client = Kubeclient::Client.new(
    current_context.api_endpoint,
    current_context.api_version,
    **options.merge(
      ssl_options: current_context.ssl_options,
      auth_options: current_context.auth_options
    )
  )
end
init_without_kubeconfig(_options = {}) click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics_aggregator.rb, line 198
# Builds @client without a kubeconfig file.
#
# The API URL comes from @kubernetes_url/@kubelet_port when a URL is
# configured, otherwise from the KUBERNETES_SERVICE_HOST/PORT environment
# variables. When @secret_dir exists, its ca.crt and token files are used as
# defaults for @ca_file and @bearer_token_file. The client is then created
# for API version 'v1' and validated.
#
# @param _options [Hash] accepted for symmetry with init_with_kubeconfig; unused
# @return the result of @client.api_valid?
# @raise [Fluent::ConfigError] when no URL can be determined or the API
#   validity check raises KubeException
def init_without_kubeconfig(_options = {})
  # mostly borrowed from Fluentd Kubernetes Metadata Filter Plugin
  kubernetes_url_final =
    if @kubernetes_url.nil?
      # Use Kubernetes default service account if we're in a pod.
      service_host = ENV['KUBERNETES_SERVICE_HOST']
      service_port = ENV['KUBERNETES_SERVICE_PORT']
      "https://#{service_host}:#{service_port}/api/" if service_host && service_port
    else
      "https://#{@kubernetes_url}:#{@kubelet_port}/api/"
    end

  if kubernetes_url_final.nil?
    raise Fluent::ConfigError, 'kubernetes url is not set in configuration, or environment variables'
  end

  # Use SSL certificate and bearer token from Kubernetes service account.
  if Dir.exist?(@secret_dir)
    default_ca_file = File.join(@secret_dir, 'ca.crt')
    default_token_file = File.join(@secret_dir, 'token')

    @ca_file = default_ca_file if @ca_file.nil? && File.exist?(default_ca_file)
    @bearer_token_file = default_token_file if @bearer_token_file.nil? && File.exist?(default_token_file)
  end

  certificate = @client_cert && OpenSSL::X509::Certificate.new(File.read(@client_cert))
  private_key = @client_key && OpenSSL::PKey::RSA.new(File.read(@client_key))
  verify_mode = @insecure_ssl ? OpenSSL::SSL::VERIFY_NONE : OpenSSL::SSL::VERIFY_PEER
  ssl_options = {
    client_cert: certificate,
    client_key:  private_key,
    ca_file:     @ca_file,
    verify_ssl:  verify_mode
  }

  auth_options = @bearer_token_file ? { bearer_token: File.read(@bearer_token_file) } : {}

  @client = Kubeclient::Client.new(
    kubernetes_url_final, 'v1',
    ssl_options: ssl_options,
    auth_options: auth_options
  )

  begin
    @client.api_valid?
  rescue KubeException => e
    raise Fluent::ConfigError, "Invalid Kubernetes API #{@api_version} endpoint #{kubernetes_url_final}: #{e.message}"
  end
end
initialize_client() click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics_aggregator.rb, line 251
# Creates the Kubernetes client, choosing the kubeconfig-based initialiser
# when @kubeconfig is set and the in-cluster/URL-based one otherwise.
# Both receive the same timeout options (10 for open, none for read).
def initialize_client
  client_options = { timeouts: { open: 10, read: nil } }

  return init_without_kubeconfig(client_options) if @kubeconfig.nil?

  init_with_kubeconfig(client_options)
end
limits_requests_api() click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics_aggregator.rb, line 353
# Returns the REST endpoint used to list pods for limits/requests metrics.
#
# The endpoint is built on first use (after making sure the client has run
# API discovery) and then reused. Previously the instance variable was
# assigned on every call but never read, so the endpoint was rebuilt and the
# debug line logged on each scrape.
#
# @return the '/pods' sub-resource of the client's REST client
def limits_requests_api
  @limits_requests_api ||=
    begin
      @client.discover unless @client.discovered
      @client.rest_client['/pods'].tap do |endpoint|
        log.debug("Use URL #{endpoint.url} for scraping limits requests metrics")
      end
    end
end
node_api() click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics_aggregator.rb, line 455
# Returns the REST endpoint used to list nodes for node metrics.
#
# The endpoint is built on first use (after making sure the client has run
# API discovery) and then reused. Previously the instance variable was
# assigned on every call but never read, so the endpoint was rebuilt and the
# debug line logged on each scrape.
#
# @return the '/nodes' sub-resource of the client's REST client
def node_api
  @node_api ||=
    begin
      @client.discover unless @client.discovered
      @client.rest_client['/nodes'].tap do |endpoint|
        log.debug("Use URL #{endpoint.url} for scraping node metrics")
      end
    end
end
parse_tag() click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics_aggregator.rb, line 174
# Splits a wildcard tag into @tag_prefix and @tag_suffix.
#
# Nothing is set when @tag contains no '*'. The split keeps empty trailing
# fields (limit -1), so a tag of just "*" yields an empty prefix and suffix
# instead of leaving both nil; generate_tag treats a nil prefix as "no
# wildcard" and would otherwise return the literal "*".
def parse_tag
  return unless @tag.include?('*')

  @tag_prefix, @tag_suffix = @tag.split('*', -1)
end
parse_time(metric_time) click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics_aggregator.rb, line 266
# Parses an ISO 8601 timestamp string and converts it to a Fluent::EventTime.
def parse_time(metric_time)
  parsed = Time.iso8601(metric_time)
  Fluent::EventTime.from_time(parsed)
end
process_limits_requests_res(response) click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics_aggregator.rb, line 385
# Turns a parsed pod-list response into limit/request metrics.
#
# For every pod in response['items'] this emits one set of limit/request
# events per container and one per pod, and adds the pod totals to the
# class-level per-namespace and per-node maps. Afterwards it emits one set of
# events per namespace and one for the whole cluster, then clears the
# per-namespace map. The per-node map is left filled; process_node_response
# reads it.
#
# @param response [Hash] parsed JSON body of the pod-list request
def process_limits_requests_res(response)
  # Overrides the timestamp the caller (handle_limits_requests_res) just set.
  @scraped_at = Time.now
  # The lock also guards the per-node map that process_node_response reads.
  @mutex_node_req_lim.synchronize do
    Array(response['items']).each do |pod_json|
      pod_namespace = pod_json['metadata']['namespace']
      pod_node_name = pod_json['spec']['nodeName']
      # Lazily create the accumulator for a namespace seen for the first time.
      if @@namespace_usage_metrics_map[pod_namespace].nil?
        namespace_usage_metrics = UsageMetricsUnit.new
        @@namespace_usage_metrics_map[pod_namespace] = namespace_usage_metrics
      end

      pod_containers = pod_json['spec']['containers']
      pod_usage_metrics = UsageMetricsUnit.new
      Array(pod_containers).each do |container_json|
        container_usage_metrics = UsageMetricsUnit.new
        # Quantities default to the string '0' when the container declares none.
        cpu_limit = '0'
        memory_limit = '0'
        cpu_request = '0'
        memory_request = '0'
        # NOTE(review): assumes every container has a 'resources' object;
        # a missing one would raise NoMethodError here - confirm against the API.
        unless container_json['resources']['limits'].nil?
          unless container_json['resources']['limits']['cpu'].nil?
            cpu_limit = container_json['resources']['limits']['cpu']
          end
          unless container_json['resources']['limits']['memory'].nil?
            memory_limit = container_json['resources']['limits']['memory']
          end
        end
        unless container_json['resources']['requests'].nil?
          unless container_json['resources']['requests']['cpu'].nil?
            cpu_request = container_json['resources']['requests']['cpu']
          end
          unless container_json['resources']['requests']['memory'].nil?
            memory_request = container_json['resources']['requests']['memory']
          end
        end
        container_usage_metrics.add_usage_metrics(cpu_limit, cpu_request, memory_limit, memory_request)
        container_labels = { 'name' => container_json['name'], 'image' => container_json['image'], 'node' => pod_json['spec']['nodeName'] }
        emit_limits_requests_metrics(generate_tag('container'), @scraped_at, container_labels, container_usage_metrics)
        # The same raw quantities are also accumulated into the pod total.
        pod_usage_metrics.add_usage_metrics(cpu_limit, cpu_request, memory_limit, memory_request)
      end

      pod_labels = { 'name' => pod_json['metadata']['name'], 'namespace' => pod_json['metadata']['namespace'], 'node' => pod_json['spec']['nodeName'] }
      emit_limits_requests_metrics(generate_tag('pod'), @scraped_at, pod_labels, pod_usage_metrics)
      # Pod totals are re-encoded as quantity strings ('m' / 'Mi' suffix) before
      # being added; presumably add_usage_metrics parses such strings and the
      # totals are held in millicores / Mi - TODO confirm in UsageMetricsUnit.
      @@namespace_usage_metrics_map[pod_namespace].add_usage_metrics(pod_usage_metrics.instance_variable_get(:@cpu_limit).to_s + ('m'), pod_usage_metrics.instance_variable_get(:@cpu_request).to_s + ('m'),
                                                                     pod_usage_metrics.instance_variable_get(:@memory_limit).to_s + ('Mi'), pod_usage_metrics.instance_variable_get(:@memory_request).to_s + ('Mi'))

      # Lazily create the accumulator for a node seen for the first time.
      if @@node_requests_limits_metrics_map[pod_node_name].nil?
        node_name_usage_metrics = UsageMetricsUnit.new
        @@node_requests_limits_metrics_map[pod_node_name] = node_name_usage_metrics
      end
      @@node_requests_limits_metrics_map[pod_node_name].add_usage_metrics(pod_usage_metrics.instance_variable_get(:@cpu_limit).to_s + ('m'), pod_usage_metrics.instance_variable_get(:@cpu_request).to_s + ('m'),
                                                                          pod_usage_metrics.instance_variable_get(:@memory_limit).to_s + ('Mi'), pod_usage_metrics.instance_variable_get(:@memory_request).to_s + ('Mi'))
      # Only drops the local reference; the variable is reassigned next iteration.
      pod_usage_metrics = nil
    end
  end
  # NOTE(review): from here on the namespace map is read and reset without
  # holding a lock.
  cluster_usage_metrics = UsageMetricsUnit.new
  @@namespace_usage_metrics_map.each do |key, value|
    # Each namespace total is folded into the cluster total, then emitted.
    cluster_usage_metrics.add_usage_metrics(value.instance_variable_get(:@cpu_limit).to_s + ('m'), value.instance_variable_get(:@cpu_request).to_s + ('m'),
                                            value.instance_variable_get(:@memory_limit).to_s + ('Mi'), value.instance_variable_get(:@memory_request).to_s + ('Mi'))
    emit_limits_requests_metrics(generate_tag('namespace'), @scraped_at, { 'name' => key }, value)
    # Rebinds the block-local variable only; the map entry is unaffected.
    value = nil
  end

  emit_limits_requests_metrics(generate_tag('cluster'), @scraped_at, { 'name' => @cluster_name }, cluster_usage_metrics)
  cluster_usage_metrics = nil

  # Start the next scrape with an empty per-namespace map.
  @@namespace_usage_metrics_map = nil
  @@namespace_usage_metrics_map = {}
end
process_node_response(response) click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics_aggregator.rb, line 487
# Turns a parsed node-list response into node metrics.
#
# For every node in response['items'] this emits CPU/memory capacity and
# allocatable values, followed by utilization and reservation figures derived
# from the per-node totals collected by the limits/requests and resource-usage
# scrapers. A node without collected totals is reported from a fresh (empty)
# metrics unit.
#
# Two defects of the previous version are fixed:
# * tags were built with `generate_tag('node') << suffix`, which appends to
#   the configured @tag in place when the tag has no wildcard; a fresh string
#   is built with `+` instead;
# * the per-node maps were cleared inside the node loop, so only the first
#   node of a response saw the collected totals; they are now cleared once,
#   after all nodes have been reported.
#
# @param response [Hash] parsed JSON body of the node-list request
def process_node_response(response)
  Array(response['items']).each do |node_json|
    node_name = node_json['metadata']['name']
    node_status = node_json['status']

    # Routes one node-level value under "<node tag><suffix>".
    emit_node_value = lambda do |suffix, value|
      router.emit(generate_tag('node') + suffix,
                  Fluent::EventTime.from_time(@scraped_node_at),
                  { 'node' => node_name, 'value' => value })
    end

    node_cpu_capacity = get_cpu_value(node_status['capacity']['cpu'])
    emit_node_value.call('.cpu.capacity', node_cpu_capacity)
    node_cpu_allocatable = get_cpu_value(node_status['allocatable']['cpu'])
    emit_node_value.call('.cpu.allocatable', node_cpu_allocatable)
    node_memory_capacity = get_memory_value(node_status['capacity']['memory'])
    emit_node_value.call('.memory.capacity', node_memory_capacity)
    node_memory_allocatable = get_memory_value(node_status['allocatable']['memory'])
    emit_node_value.call('.memory.allocatable', node_memory_allocatable)

    # Fall back to empty units when nothing has been collected for this node.
    node_req_lim = @mutex_node_req_lim.synchronize { @@node_requests_limits_metrics_map[node_name] }
    node_req_lim ||= UsageMetricsUnit.new
    node_res_usage = @mutex_node_res_usage.synchronize { @@node_resource_usage_metrics_map[node_name] }
    node_res_usage ||= ResourceUsageMetricsUnit.new

    # https://github.com/kubernetes/heapster/blob/c78cc312ab3901acfe5c2f95f7a621909c8455ad/metrics/processors/node_autoscaling_enricher.go#L62
    # NOTE(review): the utilization expressions multiply by the allocatable
    # amount while the reservation expressions divide by it; left unchanged -
    # confirm the intended formula.
    node_cpu_utilization = node_res_usage.instance_variable_get(:@cpu_usage).to_f / 1_000_000 * node_cpu_allocatable # converting from nano cores to milli core
    emit_node_value.call('.cpu.utilization', node_cpu_utilization)
    node_cpu_reservation = node_req_lim.instance_variable_get(:@cpu_request).to_f / node_cpu_allocatable
    emit_node_value.call('.cpu.reservation', node_cpu_reservation)
    node_memory_utilization = node_res_usage.instance_variable_get(:@memory_usage).to_f / 1_000_000 * node_memory_allocatable # converting from bytes to megabytes
    emit_node_value.call('.memory.utilization', node_memory_utilization)
    node_memory_reservation = node_req_lim.instance_variable_get(:@memory_request).to_f / node_memory_allocatable
    emit_node_value.call('.memory.reservation', node_memory_reservation)
  end

  # The collected per-node totals have been consumed; start the next cycle
  # with empty maps.
  @mutex_node_req_lim.synchronize { @@node_requests_limits_metrics_map = {} }
  @mutex_node_res_usage.synchronize { @@node_resource_usage_metrics_map = {} }
end
process_resource_usage_res(response) click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics_aggregator.rb, line 563
# Turns a parsed node-list response into resource-usage metrics.
#
# For every node in response['items'] this fetches the node's stats summary
# through the API server proxy and adds each pod's CPU and memory usage to
# the class-level per-namespace and per-node maps. Afterwards it emits one
# pair of usage events per namespace and one for the whole cluster, then
# clears the per-namespace map. The per-node map is left filled;
# process_node_response reads it.
#
# @param response [Hash] parsed JSON body of the node-list request
def process_resource_usage_res(response)
  # NOTE(review): this sets @scraped_node_at, but the events below are
  # emitted with @scraped_at (set by the caller) - confirm which is intended.
  @scraped_node_at = Time.now
  # The whole collection, including the HTTP requests, runs under the lock.
  @mutex_node_res_usage.synchronize do
    Array(response['items']).each do |node_json|
      node_name = node_json['metadata']['name']
      # Endpoint for this node's stats summary, reached via the API server proxy.
      node_rest_client =
        begin
          @client.discover unless @client.discovered
          @client.rest_client["/nodes/#{node_name}:#{@kubelet_port}/proxy/stats/summary"].tap do |endpoint|
            log.debug("Use URL #{endpoint.url} for scraping resource usage metrics")
          end
        end

      # An exception here (request or parse failure) propagates to the caller
      # and skips the remaining nodes.
      node_response = JSON.parse(node_rest_client.get(@client.headers))
      Array(node_response['pods']).each do |pod_json|
        # Pods lacking either the cpu or the memory section are skipped.
        unless pod_json['cpu'].nil? || pod_json['memory'].nil?
          # Missing readings count as 0; the CPU value is divided by 1_000_000
          # (integer division when the reading is an Integer).
          pod_cpu_usage = pod_json['cpu'].fetch('usageNanoCores', 0)/ 1_000_000
          pod_memory_usage = pod_json['memory'].fetch('usageBytes', 0)
          pod_namespace = pod_json['podRef']['namespace']
          pod_usage = ResourceUsageMetricsUnit.new
          pod_usage.add_resource_usage_metrics(pod_cpu_usage, pod_memory_usage)
          if @@namespace_resource_usage_metrics_map[pod_namespace].nil?
            # namespace_usage_metrics is never used: the first pod's unit itself
            # becomes the namespace accumulator.
            namespace_usage_metrics = ResourceUsageMetricsUnit.new
            @@namespace_resource_usage_metrics_map[pod_namespace] = pod_usage
          else
            @@namespace_resource_usage_metrics_map[pod_namespace].add_resource_usage_metrics(pod_cpu_usage, pod_memory_usage)
          end
          # Lazily create the accumulator for a node seen for the first time.
          if @@node_resource_usage_metrics_map[node_name].nil?
            node_name_usage_metrics = ResourceUsageMetricsUnit.new
            @@node_resource_usage_metrics_map[node_name] = node_name_usage_metrics
          end
          @@node_resource_usage_metrics_map[node_name].add_resource_usage_metrics(pod_cpu_usage, pod_memory_usage)
          # Only drops the local reference; the namespace map may still hold the unit.
          pod_usage = nil
        end
      end
    end
  end

  # NOTE(review): from here on the namespace map is read and reset without
  # holding a lock.
  cluster_usage_metrics = ResourceUsageMetricsUnit.new
  @@namespace_resource_usage_metrics_map.each do |key, value|
    # Each namespace total is folded into the cluster total, then emitted.
    cluster_usage_metrics.add_resource_usage_metrics(value.instance_variable_get(:@cpu_usage), value.instance_variable_get(:@memory_usage))
    emit_resource_usage_metrics(generate_tag('namespace'), @scraped_at, { 'name' => key }, value)
    # Rebinds the block-local variable only; the map entry is unaffected.
    value = nil
  end
  emit_resource_usage_metrics(generate_tag('cluster'), @scraped_at, { 'name' => @cluster_name }, cluster_usage_metrics)
  cluster_usage_metrics = nil

  # Start the next scrape with an empty per-namespace map.
  @@namespace_resource_usage_metrics_map = nil
  @@namespace_resource_usage_metrics_map = {}
end
resource_usage_api() click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics_aggregator.rb, line 531
# Returns the REST endpoint used to list nodes for resource-usage scraping.
#
# The endpoint is built on first use (after making sure the client has run
# API discovery) and then reused. Previously the instance variable was
# assigned on every call but never read, so the endpoint was rebuilt and the
# debug line logged on each scrape.
#
# @return the '/nodes' sub-resource of the client's REST client
def resource_usage_api
  @resource_usage_api ||=
    begin
      @client.discover unless @client.discovered
      @client.rest_client['/nodes'].tap do |endpoint|
        log.debug("Use URL #{endpoint.url} for scraping node metrics")
      end
    end
end
scrape_limits_requests_metrics() click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics_aggregator.rb, line 363
# Timer callback: requests the pod list and hands the response to
# handle_limits_requests_res. Any error raised while requesting is logged
# rather than propagated, so it does not escape the timer callback.
def scrape_limits_requests_metrics
  response = limits_requests_api.get(@client.headers)
  handle_limits_requests_res(response)
rescue StandardError => e
  # Use the rescued exception directly; $ERROR_INFO only exists when the
  # English library has been loaded.
  log.error "Failed to get limit metrics, error=#{e.message}, #{e.inspect}"
  log.error_backtrace
end
scrape_node_metrics() click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics_aggregator.rb, line 465
# Timer callback: requests the node list and hands the response to
# handle_node_response. Any error raised while requesting is logged rather
# than propagated, so it does not escape the timer callback.
def scrape_node_metrics
  response = node_api.get(@client.headers)
  handle_node_response(response)
rescue StandardError => e
  # Use the rescued exception directly; $ERROR_INFO only exists when the
  # English library has been loaded.
  log.error "Failed to get node metrics, error=#{e.message}, #{e.inspect}"
  log.error_backtrace
end
scrape_resource_usage_metrics() click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics_aggregator.rb, line 541
# Timer callback: requests the node list and hands the response to
# handle_resource_usage_response. Any error raised while requesting is logged
# rather than propagated, so it does not escape the timer callback.
def scrape_resource_usage_metrics
  response = resource_usage_api.get(@client.headers)
  handle_resource_usage_response(response)
rescue StandardError => e
  # Use the rescued exception directly; $ERROR_INFO only exists when the
  # English library has been loaded.
  log.error "Failed to get resource usage metrics, error=#{e.message}, #{e.inspect}"
  log.error_backtrace
end
underscore(camlcase) click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics_aggregator.rb, line 270
# Converts a camelCase name to snake_case by replacing every ASCII capital
# letter with an underscore followed by its lowercase form
# (so a leading capital yields a leading underscore).
def underscore(camlcase)
  camlcase.gsub(/[A-Z]/) { |capital| '_' + capital.downcase }
end