class Fluent::Plugin::KubeletMetadata
Constants
- KUBELET_ERROR_BACKOFF_SECONDS
- KUBELET_MAX_REQUESTS_PER_SECOND
- POD_CACHE_SIZE
- TAG_REGEX
rubocop:disable Layout/LineLength from github.com/fabric8io/fluent-plugin-kubernetes_metadata_filter/blob/8f95b0e5fda922ef0576c7ce53d0c72f19a86754/lib/fluent/plugin/filter_kubernetes_metadata.rb#L52 for example: input.kubernetes.pod.var.log.containers.fluentd-mgj9v_default_vault-pki-auth-manager-26f3a7bad715d9d324fb3c818681ec01df14831f0585cb83f2df25a7386ee5f4.log
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_kubelet_metadata.rb, line 52 def initialize super @cache = ThreadsafeLruCache.new(POD_CACHE_SIZE) @throttle_mutex = Mutex.new end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_kubelet_metadata.rb, line 58 def configure(conf) super @statsd = Object.const_get(@statsd) if @statsd fill_cache unless ARGV.include?('--dry-run') end
filter(tag, _time, record)
click to toggle source
# File lib/fluent/plugin/filter_kubelet_metadata.rb, line 64 def filter(tag, _time, record) return record unless match = tag.match(TAG_REGEX)&.named_captures labels = pod_labels( [match.fetch('namespace'), match.fetch('pod_name')], tags: [ "pod_name:#{match.fetch('pod_name')}", "namespace:#{match.fetch('namespace')}", "container:#{match.fetch('container_name')}" ] ) record.merge( 'docker' => { 'container_id' => match.fetch('docker_id') }, 'kubernetes' => { 'container_name' => match.fetch('container_name'), 'namespace_name' => match.fetch('namespace'), 'pod_name' => match.fetch('pod_name'), 'labels' => labels } ) end
Private Instance Methods
fill_cache()
click to toggle source
stores each pods labels that kubelet knows in the cache only storing the labels since pod objects are big
# File lib/fluent/plugin/filter_kubelet_metadata.rb, line 102 def fill_cache pods.each do |pod| @cache[[pod.dig("metadata", "namespace"), pod.dig("metadata", "name")]] = pod.dig("metadata", "labels") || {} end end
http_get(url)
click to toggle source
# File lib/fluent/plugin/filter_kubelet_metadata.rb, line 123 def http_get(url) uri = URI(url) http = Net::HTTP.new(uri.host, uri.port) http.use_ssl = (uri.scheme == "https") http.verify_mode = OpenSSL::SSL::VERIFY_NONE http.open_timeout = 2 http.read_timeout = 5 request = Net::HTTP::Get.new(uri.request_uri) request['Authorization'] = "Bearer #{File.read("/var/run/secrets/kubernetes.io/serviceaccount/token")}" response = http.start { http.request request } raise "Error response #{response.code} -- #{response.body}" unless response.code == "200" response.body end
inc(metric, **args)
click to toggle source
# File lib/fluent/plugin/filter_kubelet_metadata.rb, line 171 def inc(metric, **args) @statsd&.increment "fluentd.kubelet_metadata.#{metric}", **args end
pod_labels(key, **args)
click to toggle source
# File lib/fluent/plugin/filter_kubelet_metadata.rb, line 89 def pod_labels(key, **args) @cache[key] || begin inc "soft_miss" fill_cache @cache[key] || begin inc "hard_miss", **args # need tags here to be able to debug {} end end end
pods()
click to toggle source
/runningpods/ has much less data, but does not include initContainerStatuses
InitContainers are often not available when logs start coming in
Full kubelet api see stackoverflow.com/questions/35075195/is-there-api-documentation-for-kubelet-api
# File lib/fluent/plugin/filter_kubelet_metadata.rb, line 113 def pods retry_on_error backoff: KUBELET_ERROR_BACKOFF_SECONDS do throttle per_second: KUBELET_MAX_REQUESTS_PER_SECOND, throttled: [] do JSON.parse(http_get('https://localhost:10250/pods')).fetch("items") end end rescue StandardError [] end
retry_on_error(backoff:) { || ... }
click to toggle source
# File lib/fluent/plugin/filter_kubelet_metadata.rb, line 159 def retry_on_error(backoff:) yield rescue StandardError backoff_index ||= -1 backoff_index += 1 inc "kubelet_error", tags: ["error:#{$!.class}"] raise unless delay = backoff[backoff_index] sleep delay retry end
throttle(per_second:, throttled:) { || ... }
click to toggle source
# File lib/fluent/plugin/filter_kubelet_metadata.rb, line 141 def throttle(per_second:, throttled:) t = Process.clock_gettime(Process::CLOCK_MONOTONIC).to_i c = nil @throttle_mutex.synchronize do old_t, c = @throttle old_t == t ? c += 1 : c = 1 @throttle = [t, c] end if c > per_second inc "throttled" throttled else yield end end