class Fluent::Plugin::KubeletMetadata

Constants

KUBELET_ERROR_BACKOFF_SECONDS
KUBELET_MAX_REQUESTS_PER_SECOND
POD_CACHE_SIZE
TAG_REGEX

rubocop:disable Layout/LineLength — tag pattern adapted from github.com/fabric8io/fluent-plugin-kubernetes_metadata_filter/blob/8f95b0e5fda922ef0576c7ce53d0c72f19a86754/lib/fluent/plugin/filter_kubernetes_metadata.rb#L52. Example of a matching tag: input.kubernetes.pod.var.log.containers.fluentd-mgj9v_default_vault-pki-auth-manager-26f3a7bad715d9d324fb3c818681ec01df14831f0585cb83f2df25a7386ee5f4.log

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_kubelet_metadata.rb, line 52
# Builds the per-instance state used by the filter:
# - a mutex guarding the shared request-throttle counter (@throttle), and
# - an LRU cache of pod labels keyed by [namespace, pod_name], bounded by POD_CACHE_SIZE.
def initialize
  super
  @throttle_mutex = Mutex.new
  @cache = ThreadsafeLruCache.new(POD_CACHE_SIZE)
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_kubelet_metadata.rb, line 58
# Applies the plugin configuration.
#
# After the parent class has processed +conf+, a configured @statsd value is
# resolved from a constant name to the constant itself (Object.const_get).
# Unless fluentd was started with --dry-run, the pod-label cache is warmed
# immediately so early records can be enriched without a kubelet round trip.
#
# @param conf the fluentd configuration element passed to the plugin
# @return the result of fill_cache, or nil when running with --dry-run
def configure(conf)
  super
  @statsd &&= Object.const_get(@statsd)
  return if ARGV.include?('--dry-run')

  fill_cache
end
filter(tag, _time, record) click to toggle source
# File lib/fluent/plugin/filter_kubelet_metadata.rb, line 64
# Enriches a log record with Kubernetes metadata parsed from its fluentd tag.
#
# When +tag+ does not match TAG_REGEX the record is returned untouched (same
# object). Otherwise a new hash is returned (via Hash#merge, so +record+ is not
# mutated). Any existing 'docker' / 'kubernetes' keys are replaced by:
#   'docker'     => { 'container_id' => <docker_id from the tag> }
#   'kubernetes' => { 'container_name', 'namespace_name', 'pod_name', 'labels' }
# Labels come from pod_labels; the pod/namespace/container tags are forwarded
# so a cache miss can be reported with enough context to debug.
#
# @param tag [String] fluentd tag, expected to embed pod, namespace, container and docker id
# @param _time event time (unused)
# @param record [Hash] the log record
# @return [Hash] the original record or an enriched copy
def filter(tag, _time, record)
  captures = tag.match(TAG_REGEX)&.named_captures
  return record unless captures

  namespace = captures.fetch('namespace')
  pod_name = captures.fetch('pod_name')
  container_name = captures.fetch('container_name')

  debug_tags = [
    "pod_name:#{pod_name}",
    "namespace:#{namespace}",
    "container:#{container_name}"
  ]
  labels = pod_labels([namespace, pod_name], tags: debug_tags)

  docker_section = { 'container_id' => captures.fetch('docker_id') }
  kubernetes_section = {
    'container_name' => container_name,
    'namespace_name' => namespace,
    'pod_name' => pod_name,
    'labels' => labels
  }
  record.merge('docker' => docker_section, 'kubernetes' => kubernetes_section)
end

Private Instance Methods

fill_cache() click to toggle source

Stores the labels of every pod the kubelet knows about in the cache. Only the labels are stored, because full pod objects are large.

# File lib/fluent/plugin/filter_kubelet_metadata.rb, line 102
# Refreshes the label cache from the kubelet pod list.
#
# Each pod's labels are stored under the key [namespace, name]. A pod without
# labels (or without metadata) is cached as an empty hash, so a later lookup
# for it counts as a hit instead of triggering another refresh.
# Only labels are cached because full pod objects are large.
#
# @return [Array<Hash>] the pod list returned by pods
def fill_cache
  pods.each do |pod|
    namespace = pod.dig("metadata", "namespace")
    name = pod.dig("metadata", "name")
    labels = pod.dig("metadata", "labels")
    @cache[[namespace, name]] = labels || {}
  end
end
http_get(url) click to toggle source
# File lib/fluent/plugin/filter_kubelet_metadata.rb, line 123
# Performs an authenticated GET request and returns the response body.
#
# The request carries the pod's service-account token as a Bearer token. The
# token file is re-read on every call. TLS is enabled for https URLs, with a
# 2s open timeout and a 5s read timeout.
#
# NOTE(review): certificate verification is disabled (VERIFY_NONE), presumably
# because the kubelet's serving certificate is not trusted by this pod — confirm
# this is only ever used against localhost.
#
# @param url [String] absolute URL to fetch
# @return [String] the response body when the status is 200
# @raise [RuntimeError] for any non-200 status (message includes code and body)
def http_get(url)
  uri = URI(url)
  client = Net::HTTP.new(uri.host, uri.port)
  client.use_ssl = uri.scheme == "https"
  client.verify_mode = OpenSSL::SSL::VERIFY_NONE
  client.open_timeout = 2
  client.read_timeout = 5

  get = Net::HTTP::Get.new(uri.request_uri)
  token = File.read("/var/run/secrets/kubernetes.io/serviceaccount/token")
  get['Authorization'] = "Bearer #{token}"

  response = client.start { |connection| connection.request(get) }
  return response.body if response.code == "200"

  raise "Error response #{response.code} -- #{response.body}"
end
inc(metric, **args) click to toggle source
# File lib/fluent/plugin/filter_kubelet_metadata.rb, line 171
# Increments the statsd counter "fluentd.kubelet_metadata.<metric>".
# Does nothing (returns nil) when no statsd client is configured.
#
# @param metric [String] metric suffix
# @param args extra keyword options forwarded to the client's increment (e.g. tags:)
def inc(metric, **args)
  return if @statsd.nil?

  @statsd.increment("fluentd.kubelet_metadata.#{metric}", **args)
end
pod_labels(key, **args) click to toggle source
# File lib/fluent/plugin/filter_kubelet_metadata.rb, line 89
# Returns the cached labels for a pod, refreshing the cache once on a miss.
#
# - Hit: the cached labels are returned with no side effects.
# - Soft miss: "soft_miss" is counted, the whole cache is refilled via
#   fill_cache, and the lookup is retried.
# - Hard miss (still absent after the refill): "hard_miss" is counted with the
#   caller-supplied options (tags, so the miss can be debugged) and an empty
#   hash is returned.
#
# @param key [Array] [namespace, pod_name]
# @param args keyword options forwarded to the hard_miss metric
# @return [Hash] the pod's labels, or {} when unknown
def pod_labels(key, **args)
  cached = @cache[key]
  return cached if cached

  inc "soft_miss"
  fill_cache
  refreshed = @cache[key]
  return refreshed if refreshed

  inc "hard_miss", **args # tags let us debug which pod was missing
  {}
end
pods() click to toggle source

The /runningpods/ endpoint returns much less data, but it does not include initContainerStatuses.

Init containers are often not yet available when their logs start coming in, so the full /pods endpoint is queried instead.

For the full kubelet API, see stackoverflow.com/questions/35075195/is-there-api-documentation-for-kubelet-api

# File lib/fluent/plugin/filter_kubelet_metadata.rb, line 113
# Fetches the pod list from the local kubelet's /pods endpoint.
#
# The request is rate-limited to KUBELET_MAX_REQUESTS_PER_SECOND. A throttled
# call yields an empty list. Failed requests are retried with the delays in
# KUBELET_ERROR_BACKOFF_SECONDS. If every attempt fails (HTTP error, bad JSON,
# missing "items"), the error is swallowed and an empty list is returned. This
# is deliberately best-effort; failures are still counted by retry_on_error.
#
# @return [Array<Hash>] the parsed "items" array, or [] on throttle/failure
def pods
  items = retry_on_error(backoff: KUBELET_ERROR_BACKOFF_SECONDS) do
    throttle(per_second: KUBELET_MAX_REQUESTS_PER_SECOND, throttled: []) do
      response_body = http_get('https://localhost:10250/pods')
      JSON.parse(response_body).fetch("items")
    end
  end
  items
rescue StandardError
  []
end
retry_on_error(backoff:) { || ... } click to toggle source
# File lib/fluent/plugin/filter_kubelet_metadata.rb, line 159
# Runs the block, retrying after StandardError with the given backoff delays.
#
# On each failure the "kubelet_error" metric is counted, tagged with the
# exception class. The next delay is then taken from +backoff+: if one
# exists, sleep for it and run the block again; once the delays are exhausted,
# the most recent error is re-raised. With all-non-nil delays the block runs
# at most backoff.size + 1 times.
#
# The counter lives outside the begin/rescue so it clearly survives `retry`,
# and the exception is bound to a named variable instead of the global `$!`.
#
# @param backoff [Array<Numeric>] seconds to sleep before each retry
# @return the block's value from the first successful attempt
# @raise the last StandardError once +backoff+ is exhausted
def retry_on_error(backoff:)
  attempt = 0
  begin
    yield
  rescue StandardError => e
    inc "kubelet_error", tags: ["error:#{e.class}"]
    delay = backoff[attempt]
    raise unless delay

    attempt += 1
    sleep delay
    retry
  end
end
throttle(per_second:, throttled:) { || ... } click to toggle source
# File lib/fluent/plugin/filter_kubelet_metadata.rb, line 141
# Simple fixed-window rate limiter keyed on whole monotonic seconds.
#
# Under @throttle_mutex, the [second, count] pair in @throttle is advanced: the
# count is incremented within the same second and reset to 1 in a new second.
# If this call's count exceeds +per_second+, the "throttled" metric is counted
# and +throttled+ is returned without running the block. Otherwise the
# block's value is returned.
#
# @param per_second [Integer] maximum calls allowed per one-second window
# @param throttled value to return when the call is rejected
# @return the block's value, or +throttled+
def throttle(per_second:, throttled:)
  current_second = Process.clock_gettime(Process::CLOCK_MONOTONIC).to_i

  count = @throttle_mutex.synchronize do
    previous_second, previous_count = @throttle
    in_window = previous_second == current_second ? previous_count + 1 : 1
    @throttle = [current_second, in_window]
    in_window
  end

  return yield if count <= per_second

  inc "throttled"
  throttled
end