class Fluent::Plugin::KubernetesMetricsInput
Public Instance Methods
close()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 102 def close @watchers.each &:finish if @watchers super end
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 76 def configure(conf) super if @use_rest_client raise Fluentd::ConfigError, 'node_name is required' if @node_name.nil? || @node_name.empty? else raise Fluentd::ConfigError, 'node_names is required' if @node_names.nil? || @node_names.empty? end parse_tag initialize_client end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 89 def start super timer_execute :metric_scraper, @interval, &method(:scrape_metrics) timer_execute :cadvisor_metric_scraper, @interval, &method(:scrape_cadvisor_metrics) # It is done to optionally fetch from 'stats' for k8s version <1.21 if is_stats_endpoint_available? timer_execute :stats_metric_scraper, @interval, &method(:scrape_stats_metrics) else log.info "'/stats' endpoint is not available. It has been deprecated since k8s v1.15, disabled since v1.18, and removed in v1.21 and onwards" end end
Private Instance Methods
cadvisor_proxy_api(node)
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 289 def cadvisor_proxy_api(node) @cadvisor_api = begin @client.discover unless @client.discovered @client.rest_client["/nodes/#{node}:#{@kubelet_port}/proxy/metrics/cadvisor"].tap do |endpoint| log.info("Use URL #{endpoint.url} for scraping metrics") end end end
cadvisor_request_options()
click to toggle source
This method is used to set the options for sending a request to the cadvisor api
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 262 def cadvisor_request_options options = { method: 'get', url: @cadvisor_url } options = options.merge(set_ssl_options) options end
emit_cadvisor_metrics(metrics)
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 647 def emit_cadvisor_metrics(metrics) metrics = metrics.split("\n") metrics.each do |metric| next if metric[0] == '#' or not container_name = grep_using_regex(metric, /container(?:_name)?="([^"]*)"/) next if container_name.empty? metric_str, metric_val = metric.split(' ') metric_val = metric_val.to_f if metric_val.is_a? String first_occur = metric_str.index('{') metric_name = metric_str[0..first_occur - 1] pod_name = grep_using_regex(metric, /pod(?:_name)?="([^"]*)"/).to_s image_name = grep_using_regex(metric, /image="([^"]*)"/).to_s namespace = grep_using_regex(metric, /namespace="([^"]*)"/).to_s metric_labels = { 'pod_name' => pod_name, 'image' => image_name, 'namespace' => namespace, 'value' => metric_val, 'node' => @node_name } if container_name=="POD" tag = 'pod' tag = generate_tag("#{tag}#{metric_name.tr('_', '.')}") tag = tag.gsub('container', '') else container_label = { 'container_name' => container_name } metric_labels.merge!(container_label) tag = generate_tag(metric_name.tr('_', '.').to_s) end router.emit tag, @scraped_at_cadvisor, metric_labels end end
emit_container_metrics(pod_labels, container)
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 560 def emit_container_metrics(pod_labels, container) tag = 'container' labels = pod_labels.merge 'container-name' => container['name'] unless container['startTime'].nil? emit_uptime tag: tag, start_time: container['startTime'], labels: labels emit_cpu_metrics tag: tag, metrics: container['cpu'], labels: labels unless container['cpu'].nil? emit_memory_metrics tag: tag, metrics: container['memory'], labels: labels unless container['memory'].nil? emit_fs_metrics tag: "#{tag}.rootfs", metrics: container['rootfs'], labels: labels unless container['rootfs'].nil? emit_fs_metrics tag: "#{tag}.logs", metrics: container['logs'], labels: labels unless container['logs'].nil? end end
emit_cpu_metrics(tag:, metrics:, labels:)
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 314 def emit_cpu_metrics(tag:, metrics:, labels:) unless metrics['time'].nil? time = parse_time metrics['time'] if usage_rate = metrics['usageNanoCores'] router.emit generate_tag("#{tag}.cpu.usage_rate"), time, labels.merge('value' => usage_rate / 1_000_000.0) end if usage = metrics['usageNanoCores'] router.emit generate_tag("#{tag}.cpu.usage"), time, labels.merge('value' => usage) end end end
emit_cpu_metrics_stats(tag:, metrics:, labels:, time:)
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 417 def emit_cpu_metrics_stats(tag:, metrics:, labels:, time:) if cpu_usage_total = metrics['usage']['total'] router.emit generate_tag("#{tag}.cpu.usage.total"), time, labels.merge('value' => cpu_usage_total / 1_000_000.0) end if cpu_usage_user = metrics['usage']['user'] router.emit generate_tag("#{tag}.cpu.usage.user"), time, labels.merge('value' => cpu_usage_user / 1_000_000.0) end if cpu_usage_system = metrics['usage']['system'] router.emit generate_tag("#{tag}.cpu.usage.system"), time, labels.merge('value' => cpu_usage_system / 1_000_000.0) end if cpu_cfs_periods = metrics['cfs']['periods'] router.emit generate_tag("#{tag}.cpu.cfs.periods"), time, labels.merge('value' => cpu_cfs_periods) end if cpu_cfs_throttled_periods = metrics['cfs']['throttled_periods'] router.emit generate_tag("#{tag}.cpu.cfs.throttled_periods"), time, labels.merge('value' => cpu_cfs_throttled_periods) end if cpu_cfs_throttled_time = metrics['cfs']['throttled_time'] router.emit generate_tag("#{tag}.cpu.cfs.throttled_time"), time, labels.merge('value' => cpu_cfs_throttled_time) end if cpu_load_average = metrics['load_average'] router.emit generate_tag("#{tag}.cpu.load_average"), time, labels.merge('value' => cpu_load_average) end end
emit_diskio_metrics_stats(tag:, metrics:, labels:, time:)
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 442 def emit_diskio_metrics_stats(tag:, metrics:, labels:, time:) %w[io_service_bytes io_serviced io_queued sectors io_service_time io_wait_time io_merged io_time].each do |metric_name| next unless current_io_metric = metrics[metric_name] current_io_metric.each do |device| if diskio_io_service_bytes_major = device['major'] router.emit generate_tag("#{tag}.diskio".concat(metric_name).concat('.major.')), time, labels.merge('device' => device['device'], 'value' => diskio_io_service_bytes_major) end if diskio_io_service_bytes_minor = device['minor'] router.emit generate_tag("#{tag}.diskio".concat(metric_name).concat('.minor.')), time, labels.merge('device' => device['device'], 'value' => diskio_io_service_bytes_minor) end device_stats = device['stats'] unless device['stats'].nil? device_stats.each do |device_stat| device_key, device_value = device_stat unless device_stat.nil? router.emit generate_tag("#{tag}.diskio.".concat(metric_name).concat('.stats.').concat(device_key)), time, labels.merge('device' => device['device'], 'value' => device_value) end end end end
emit_filesystem_metrics_stats(tag:, metrics:, labels:, time:)
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 507 def emit_filesystem_metrics_stats(tag:, metrics:, labels:, time:) metrics.each do |file_system| device = file_system['device'] type = file_system['type'] file_system.each do |file_metric| file_key, file_value = file_metric unless file_metric.nil? unless %w[device type has_inodes].include? file_key router.emit generate_tag("#{tag}.filesystem.".concat(file_key)), time, labels.merge('device' => device, 'type' => type, 'value' => file_value) end end end end
emit_fs_metrics(tag:, metrics:, labels:)
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 351 def emit_fs_metrics(tag:, metrics:, labels:) unless metrics['time'].nil? time = parse_time metrics['time'] %w[availableBytes capacityBytes usedBytes inodesFree inodes inodesUsed].each do |metric_name| if value = metrics[metric_name] router.emit generate_tag("#{tag}.#{underscore metric_name}"), time, labels.merge('value' => value) end end end end
emit_memory_metrics(tag:, metrics:, labels:)
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 326 def emit_memory_metrics(tag:, metrics:, labels:) unless metrics['time'].nil? time = parse_time metrics['time'] %w[availableBytes usageBytes workingSetBytes rssBytes pageFaults majorPageFaults].each do |name| if value = metrics[name] router.emit generate_tag("#{tag}.memory.#{underscore name}"), time, labels.merge('value' => value) end end end end
emit_memory_metrics_stats(tag:, metrics:, labels:, time:)
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 462 def emit_memory_metrics_stats(tag:, metrics:, labels:, time:) %w[usage max_usage cache rss swap working_set failcnt].each do |metric_name| if current_memory_metric = metrics[metric_name] router.emit generate_tag("#{tag}.memory.".concat(metric_name)), time, labels.merge('value' => current_memory_metric) end end %w[container_data hierarchical_data].each do |metric_name_group| next unless current_memory_metric_group = metrics[metric_name_group] current_memory_metric_group.each do |metric_name| metric_key, metric_value = metric_name unless metric_name.nil? router.emit generate_tag("#{tag}.memory.".concat(metric_name_group).concat('.').concat(metric_key)), time, labels.merge('value' => metric_value) end end end
emit_metrics(metrics)
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 630 def emit_metrics(metrics) emit_node_metrics(metrics['node']) unless metrics['node'].nil? log.warn "Summary API received empty pods info" if (metrics['pods'].nil? or metrics['pods'].empty?) Array(metrics['pods']).each &method(:emit_pod_metrics).curry.call(metrics['node']['nodeName']) unless metrics['pods'].nil? end
emit_network_metrics(tag:, metrics:, labels:)
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 337 def emit_network_metrics(tag:, metrics:, labels:) unless metrics['time'].nil? time = parse_time metrics['time'] Array(metrics['interfaces']).each do |it| it_name = it['name'] %w[rxBytes rxErrors txBytes txErrors].each do |metric_name| if value = it[metric_name] router.emit generate_tag("#{tag}.network.#{underscore metric_name}"), time, labels.merge('value' => value, 'interface' => it_name) end end end end end
emit_network_metrics_stats(tag:, metrics:, labels:, time:)
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 478 def emit_network_metrics_stats(tag:, metrics:, labels:, time:) network_name = metrics['name'] %w[rx_bytes rx_packets rx_errors rx_dropped tx_bytes tx_packets tx_errors tx_dropped].each do |metric_name| if current_network_metric = metrics[metric_name] router.emit generate_tag("#{tag}.network.".concat(network_name).concat('.').concat(metric_name)), time, labels.merge('value' => current_network_metric) end end if network_interfaces = metrics['interfaces'] network_interfaces.each do |current_interface| name = current_interface['name'] %w[rx_bytes rx_packets rx_errors rx_dropped tx_bytes tx_packets tx_errors tx_dropped].each do |current_metric| if metric_value = current_interface[current_metric] router.emit generate_tag("#{tag}.network.".concat(name).concat('.').concat(current_metric)), time, labels.merge('value' => metric_value) end end end end %w[tcp tcp6 udp udp6].each do |metric_name_group| next unless metric_group = metrics[metric_name_group] metric_group.each do |current_metric| metric_key, metric_value = current_metric unless current_metric.nil? router.emit generate_tag("#{tag}.network.".concat(metric_name_group).concat('.').concat(metric_key)), time, labels.merge('value' => metric_value) end end end
emit_node_metrics(node)
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 527 def emit_node_metrics(node) node_name = node['nodeName'] tag = 'node' labels = { 'node' => node_name } unless node['startTime'].nil? emit_uptime tag: tag, start_time: node['startTime'], labels: labels unless node['cpu'].nil? emit_cpu_metrics tag: tag, metrics: node['cpu'], labels: labels end unless node['memory'].nil? emit_memory_metrics tag: tag, metrics: node['memory'], labels: labels end unless node['network'].nil? emit_network_metrics tag: tag, metrics: node['network'], labels: labels end unless node['fs'].nil? emit_fs_metrics tag: "#{tag}.fs", metrics: node['fs'], labels: labels end unless node['runtime']['imageFs'].nil? emit_fs_metrics tag: "#{tag}.imagefs", metrics: node['runtime']['imageFs'], labels: labels end unless node['rlimit'].nil? emit_node_rlimit_metrics node_name, node['rlimit'] end unless node['systemContainers'].nil? node['systemContainers'].each do |c| emit_system_container_metrics node_name, c unless c.nil? end end end end
emit_node_rlimit_metrics(node_name, rlimit)
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 362 def emit_node_rlimit_metrics(node_name, rlimit) unless rlimit['time'].nil? time = parse_time rlimit['time'] %w[maxpid curproc].each do |metric_name| next unless value = rlimit[metric_name] router.emit(generate_tag("node.runtime.imagefs.#{metric_name}"), time, 'value' => value, 'node' => node_name) end end end
emit_pod_metrics(node_name, pod)
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 572 def emit_pod_metrics(node_name, pod) tag = 'pod' labels = pod['podRef'].transform_keys &'pod-'.method(:+) labels['node'] = node_name unless pod['startTime'].nil? emit_uptime tag: tag, start_time: pod['startTime'], labels: labels if pod['cpu'].nil? if pod['containers'].nil? or Array(pod['containers']).empty? log.warn "Summary API response has no pod cpu metrics information" else usageNanoCores = 0 usageCoreNanoSeconds = 0 time = nil Array(pod['containers']).each do |container| time = container['time'] unless container['time'].nil? usageNanoCores += container['usageNanoCores'] usageCoreNanoSeconds += container['usageCoreNanoSeconds'] end pod['cpu'] = { 'time' => time, 'usageNanoCores' => usageNanoCores, 'usageCoreNanoSeconds' => usageCoreNanoSeconds } end end emit_cpu_metrics tag: tag, metrics: pod['cpu'], labels: labels unless pod['cpu'].nil? if pod['memory'].nil? if pod['containers'].nil? or Array(pod['containers']).empty? log.warn "Summary API response has no pod memory metrics information" else Array(pod['containers']).each do |container| time = nil memory_metrics = {} %w[availableBytes usageBytes workingSetBytes rssBytes pageFaults majorPageFaults].each do |name| time = container['time'] unless container['time'].nil? if value = metrics[name] memory_metrics[name] = 0 if memory_metrics[name].nil? memory_metrics[name] += value end end end memory_metrics['time'] = time pod['memory'] = memory_metrics end end emit_memory_metrics tag: tag, metrics: pod['memory'], labels: labels unless pod['memory'].nil? emit_network_metrics tag: tag, metrics: pod['network'], labels: labels unless pod['network'].nil? emit_fs_metrics tag: "#{tag}.ephemeral-storage", metrics: pod['ephemeral-storage'], labels: labels unless pod['ephemeral-storage'].nil? unless pod['volume'].nil? Array(pod['volume']).each do |volume| emit_fs_metrics tag: "#{tag}.volume", metrics: volume, labels: labels.merge('name' => volume['name']) unless volume.nil? end end unless pod['containers'].nil? Array(pod['containers']).each do |container| emit_container_metrics labels, container unless container.nil? end end end end
emit_stats_breakdown(stats)
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 385 def emit_stats_breakdown(stats) stats_latest = stats[-1] tag = 'node' labels = { 'node' => @node_name } unless stats_latest['timestamp'].nil? stats_timestamp = parse_time stats_latest['timestamp'] unless stats_latest['cpu'].nil? emit_cpu_metrics_stats tag: tag, metrics: stats_latest['cpu'], labels: labels, time: stats_timestamp end unless stats_latest['diskio'].nil? emit_diskio_metrics_stats tag: tag, metrics: stats_latest['diskio'], labels: labels, time: stats_timestamp end unless stats_latest['memory'].nil? emit_memory_metrics_stats tag: tag, metrics: stats_latest['memory'], labels: labels, time: stats_timestamp end unless stats_latest['network'].nil? emit_network_metrics_stats tag: tag, metrics: stats_latest['network'], labels: labels, time: stats_timestamp end unless stats_latest['filesystem'].nil? emit_filesystem_metrics_stats tag: tag, metrics: stats_latest['filesystem'], labels: labels, time: stats_timestamp end unless stats_latest['task_stats'].nil? emit_tasks_stats_metrics_stats tag: tag, metrics: stats_latest['task_stats'], labels: labels, time: stats_timestamp end end end
emit_stats_metrics(metrics)
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 636 def emit_stats_metrics(metrics) emit_stats_breakdown(metrics['stats']) unless metrics['stats'].nil? end
emit_system_container_metrics(node_name, container)
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 375 def emit_system_container_metrics(node_name, container) tag = 'sys-container' labels = { 'node' => node_name, 'name' => container['name'] } unless container['startTime'].nil? emit_uptime tag: tag, start_time: container['startTime'], labels: labels emit_cpu_metrics tag: tag, metrics: container['cpu'], labels: labels unless container['cpu'].nil? emit_memory_metrics tag: tag, metrics: container['memory'], labels: labels unless container['memory'].nil? end end
emit_tasks_stats_metrics_stats(tag:, metrics:, labels:, time:)
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 520 def emit_tasks_stats_metrics_stats(tag:, metrics:, labels:, time:) metrics.each do |task_stats| task_key, task_value = task_stats unless task_stats.nil? router.emit generate_tag("#{tag}.tasks_stats.".concat(task_key)), time, labels.merge('value' => task_value) end end
emit_uptime(tag:, start_time:, labels:)
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 307 def emit_uptime(tag:, start_time:, labels:) unless start_time.nil? uptime = @scraped_at - Time.iso8601(start_time) router.emit generate_tag("#{tag}.uptime"), Fluent::EventTime.from_time(@scraped_at), labels.merge('value' => uptime) end end
generate_tag(item_name)
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 114 def generate_tag(item_name) return @tag unless @tag_prefix [@tag_prefix, item_name, @tag_suffix].join end
grep_using_regex(metric, regex)
click to toggle source
Make sure regex has only one capturing group
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 641 def grep_using_regex(metric, regex) match = metric.match(regex) return nil if match.nil? match[1] end
handle_cadvisor_response(response)
click to toggle source
This method is used to handle responses from the cadvisor api
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 755 def handle_cadvisor_response(response) # Checking response codes only for a successful GET request viz., 2XX codes if (response.code < 300) && (response.code > 199) @scraped_at_cadvisor = Time.now emit_cadvisor_metrics response.body else log.error "Expected 2xx from cadvisor metrics API, but got #{response.code}. Response body = #{response.body}" end rescue StandardError => e log.error "Failed to scrape metrics, error=#{$ERROR_INFO}, #{e.inspect}" log.error_backtrace end
handle_response(response)
click to toggle source
This method is used to handle responses from the kubelet summary api
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 727 def handle_response(response) # Checking response codes only for a successful GET request viz., 2XX codes if (response.code < 300) && (response.code > 199) @scraped_at = Time.now emit_metrics MultiJson.load(response.body) else log.error "ExMultiJson.load(response.body) expected 2xx from summary API, but got #{response.code}. Response body = #{response.body}" end rescue StandardError => error log.error "Failed to scrape metrics, error=#{error.inspect}" log.error_backtrace end
handle_stats_response(response)
click to toggle source
This method is used to handle responses from the kubelet stats api
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 741 def handle_stats_response(response) # Checking response codes only for a successful GET request viz., 2XX codes if (response.code < 300) && (response.code > 199) @scraped_at = Time.now emit_stats_metrics MultiJson.load(response.body) else log.error "ExMultiJson.load(response.body) expected 2xx from stats API, but got #{response.code}. Response body = #{response.body}" end rescue StandardError => error log.error "Failed to scrape metrics, error=#{error.inspect}" log.error_backtrace end
init_with_kubeconfig(options = {})
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 120 def init_with_kubeconfig(options = {}) config = Kubeclient::Config.read @kubeconfig current_context = config.context @client = Kubeclient::Client.new( current_context.api_endpoint, current_context.api_version, options.merge( ssl_options: current_context.ssl_options, auth_options: current_context.auth_options ) ) end
init_without_kubeconfig(_options = {})
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 134 def init_without_kubeconfig(_options = {}) # mostly borrowed from Fluentd Kubernetes Metadata Filter Plugin if @kubernetes_url.nil? # Use Kubernetes default service account if we're in a pod. env_host = ENV['KUBERNETES_SERVICE_HOST'] env_port = ENV['KUBERNETES_SERVICE_PORT'] if env_host && env_port @kubernetes_url = "https://#{env_host}:#{env_port}/api/" end end raise Fluent::ConfigError, 'kubernetes url is not set' unless @kubernetes_url # Use SSL certificate and bearer token from Kubernetes service account. if Dir.exist?(@secret_dir) secret_ca_file = File.join(@secret_dir, 'ca.crt') secret_token_file = File.join(@secret_dir, 'token') if @ca_file.nil? && File.exist?(secret_ca_file) @ca_file = secret_ca_file end if @bearer_token_file.nil? && File.exist?(secret_token_file) @bearer_token_file = secret_token_file end end ssl_options = { client_cert: @client_cert && OpenSSL::X509::Certificate.new(File.read(@client_cert)), client_key: @client_key && OpenSSL::PKey::RSA.new(File.read(@client_key)), ca_file: @ca_file, verify_ssl: @insecure_ssl ? OpenSSL::SSL::VERIFY_NONE : OpenSSL::SSL::VERIFY_PEER } auth_options = {} auth_options[:bearer_token] = File.read(@bearer_token_file) if @bearer_token_file @client = Kubeclient::Client.new( @kubernetes_url, 'v1', ssl_options: ssl_options, auth_options: auth_options ) begin @client.api_valid? rescue KubeException => kube_error raise Fluent::ConfigError, "Invalid Kubernetes API #{@api_version} endpoint #{@kubernetes_url}: #{kube_error.message}" end end
initialize_client()
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 184 def initialize_client if @use_rest_client initialize_rest_client else options = { timeouts: { open: 10, read: nil } } if @kubeconfig.nil? init_without_kubeconfig options else init_with_kubeconfig options end end end
initialize_rest_client()
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 203 def initialize_rest_client env_host = @kubelet_address env_port = @kubelet_port if env_host && env_port if @use_rest_client_ssl @kubelet_url = "https://#{env_host}:#{env_port}/stats/summary" @kubelet_url_stats = "https://#{env_host}:#{env_port}/stats" @cadvisor_url = "https://#{env_host}:#{env_port}/metrics/cadvisor" else @kubelet_url = "http://#{env_host}:#{env_port}/stats/summary" @kubelet_url_stats = "http://#{env_host}:#{env_port}/stats" @cadvisor_url = "http://#{env_host}:#{env_port}/metrics/cadvisor" end end if Dir.exist?(@secret_dir) secret_ca_file = File.join(@secret_dir, 'ca.crt') secret_token_file = File.join(@secret_dir, 'token') if @ca_file.nil? && File.exist?(secret_ca_file) @ca_file = secret_ca_file end if @bearer_token_file.nil? && File.exist?(secret_token_file) @bearer_token_file = secret_token_file end end log.info("Use URL #{@kubelet_url} for creating client to query kubelet summary api") log.info("Use URL #{@kubelet_url_stats} for creating client to query kubelet stats api") log.info("Use URL #{@cadvisor_url} for creating client to query cadvisor metrics api") end
is_stats_endpoint_available?()
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 687 def is_stats_endpoint_available? if @use_rest_client response_stats = RestClient::Request.execute request_options_stats else @node_names.each do |node| @node_name = node response_stats = stats_proxy_api(node).get(@client.headers) end end true rescue RestClient::NotFound false end
parse_tag()
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 110 def parse_tag @tag_prefix, @tag_suffix = @tag.split('*') if @tag.include?('*') end
parse_time(metric_time)
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 299 def parse_time(metric_time) Fluent::EventTime.from_time Time.iso8601(metric_time) end
request_options()
click to toggle source
This method is used to set the options for sending a request to the kubelet api
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 248 def request_options options = { method: 'get', url: @kubelet_url } options = options.merge(set_ssl_options) options end
request_options_stats()
click to toggle source
This method is used to set the options for sending a request to the stats api
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 255 def request_options_stats options = { method: 'get', url: @kubelet_url_stats } options = options.merge(set_ssl_options) options end
scrape_cadvisor_metrics()
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 714 def scrape_cadvisor_metrics if @use_rest_client response_cadvisor = RestClient::Request.execute cadvisor_request_options handle_cadvisor_response(response_cadvisor) else @node_names.each do |node| response_cadvisor = cadvisor_proxy_api(node).get(@client.headers) handle_cadvisor_response(response_cadvisor) end end end
scrape_metrics()
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 675 def scrape_metrics if @use_rest_client response = RestClient::Request.execute request_options handle_response(response) else @node_names.each do |node| response = summary_proxy_api(node).get(@client.headers) handle_response(response) end end end
scrape_stats_metrics()
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 701 def scrape_stats_metrics if @use_rest_client response_stats = RestClient::Request.execute request_options_stats handle_stats_response(response_stats) else @node_names.each do |node| @node_name = node response_stats = stats_proxy_api(node).get(@client.headers) handle_stats_response(response_stats) end end end
set_ssl_options()
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 234 def set_ssl_options if @use_rest_client_ssl ssl_options = { ssl_ca_file: @ca_file, verify_ssl: @insecure_ssl ? OpenSSL::SSL::VERIFY_NONE : OpenSSL::SSL::VERIFY_PEER, headers: { Authorization: 'Bearer ' + File.read(@bearer_token_file) } } else ssl_options = {} end ssl_options end
stats_proxy_api(node)
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 279 def stats_proxy_api(node) @stats_api = begin @client.discover unless @client.discovered @client.rest_client["/nodes/#{node}:#{@kubelet_port}/proxy/stats"].tap do |endpoint| log.info("Use URL #{endpoint.url} for scraping stats metrics") end end end
summary_proxy_api(node)
click to toggle source
@client.proxy_url only returns the url, but we need the resource, not just the url
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 269 def summary_proxy_api(node) @summary_api = begin @client.discover unless @client.discovered @client.rest_client["/nodes/#{node}:#{@kubelet_port}/proxy/stats/summary"].tap do |endpoint| log.info("Use URL #{endpoint.url} for scraping metrics") end end end
underscore(camlcase)
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metrics.rb, line 303 def underscore(camlcase) camlcase.gsub(/[A-Z]/) { |c| "_#{c.downcase}" } end