class Fluent::Plugin::KubernetesObjectsInput
Constants
- VERSION
Public Instance Methods
close()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_kubernetes_objects.rb, line 104
# Close hook for the plugin. It performs no work of its own and only
# delegates to the superclass implementation.
def close
  super
end
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_kubernetes_objects.rb, line 87
# Validates the configuration and prepares the plugin state.
#
# After the superclass has processed +conf+, this rejects a configuration
# that declares neither a <pull> nor a <watch> section, creates the
# 'checkpoints' storage, splits the tag pattern and builds the API client.
#
# conf - the configuration object; forwarded unchanged to the superclass.
#
# Raises Fluent::ConfigError when both @pull_objects and @watch_objects are
# empty.
def configure(conf)
  super

  if @pull_objects.empty? && @watch_objects.empty?
    raise Fluent::ConfigError, 'At least one <pull> or <watch> is required, but found none.'
  end

  @storage = storage_create(usage: 'checkpoints')
  parse_tag
  initialize_client
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_kubernetes_objects.rb, line 98
# Start hook: runs the superclass start logic first, then launches the
# pull threads followed by the watch threads.
def start
  super

  start_pullers
  start_watchers
end
Private Instance Methods
create_pull_thread(conf)
click to toggle source
# File lib/fluent/plugin/in_kubernetes_objects.rb, line 193
# Starts a thread named "pull_<resource_name>" that repeatedly lists one
# resource type and emits every returned item as an event.
#
# conf - a <pull> section; must respond to #to_h. The :resource_name and
#        :interval entries are consumed here, everything else (plus
#        as: :raw) is forwarded to the client's get_<resource_name> call.
#
# A failed pull (API error, unparsable response, malformed item) is logged
# and retried after the regular interval. Without the rescue, the exception
# would escape the while loop and the resource would never be polled again,
# unlike the watcher threads, which rescue and keep running.
def create_pull_thread(conf)
  options = conf.to_h.dup
  options[:as] = :raw
  resource_name = options.delete :resource_name
  pull_interval = options.delete :interval

  thread_create :"pull_#{resource_name}" do
    tag = generate_tag resource_name
    while thread_current_running?
      begin
        emit_pulled_objects(resource_name, options, tag)
      rescue => e
        log.error "Got exception #{e} pulling #{resource_name}. Will retry in #{pull_interval} seconds."
      end
      # Sleep after failures too, so a broken endpoint is not hammered.
      sleep(pull_interval)
    end
  end
end

# Performs a single pull of resource_name and emits the items under tag.
def emit_pulled_objects(resource_name, options, tag)
  log.debug "Going to pull #{resource_name}"
  response = @client.public_send "get_#{resource_name}", options
  now = Fluent::Engine.now
  es = Fluent::MultiEventStream.new

  # code copied from kubeclient
  # kubeclient will create one open struct object for each item in the response,
  # but this is totally unnecessary in this plugin, thus we use as: :raw.
  result = JSON.parse(response)
  resource_version = result.fetch('resourceVersion') do
    result.fetch('metadata', {})['resourceVersion']
  end

  # result['items'] might be nil due to https://github.com/kubernetes/kubernetes/issues/13096
  items = result['items'].to_a
  log.debug { "Received #{items.size} #{resource_name}" }

  items.each do |item|
    # Stamp each item with the list's resourceVersion when one was returned.
    # The key is added as a Symbol (:requestResourceVersion).
    item['metadata'].update requestResourceVersion: resource_version if resource_version
    es.add now, item
  end
  router.emit_stream(tag, es)
end
create_watcher_thread(conf)
click to toggle source
# File lib/fluent/plugin/in_kubernetes_objects.rb, line 233
# Starts a thread named "watch_<resource_name>" that keeps a watch open on
# one resource type and emits every received notice as an event.
#
# conf - a <watch> section; must respond to #to_h. Its entries (plus
#        as: :raw and the resume :resource_version) are passed to the
#        client's watch_<resource_name> call.
#
# The resume point is read from @storage under resource_name and falls back
# to 0 when nothing is stored. After each emitted notice the object's
# resourceVersion is written both to the options (used when the watch is
# re-opened) and to @storage.
def create_watcher_thread(conf)
  options = conf.to_h.dup
  options[:as] = :raw
  resource_name = options[:resource_name]
  options[:resource_version] = @storage.get(resource_name) || 0

  thread_create :"watch_#{resource_name}" do
    while thread_current_running?
      # A new watcher is requested on every pass of the loop, i.e. whenever
      # the previous one finished or failed.
      watcher = @client.public_send("watch_#{resource_name}", options)
      tag = generate_tag resource_name.to_s

      begin
        watcher.each do |notice|
          begin
            notice = JSON.parse(notice)
            router.emit tag, Fluent::Engine.now, notice

            # Read the version only after emitting, so a notice without the
            # expected structure is still emitted before the lookup fails.
            seen_version = notice['object']['metadata']['resourceVersion']
            options[:resource_version] = seen_version
            @storage.put resource_name, seen_version
          rescue => e
            log.info "Got exception #{e} parsing entity #{notice}. Resetting watcher."
          end
        end
      rescue => e
        log.info "Got exception #{e}. Resetting watcher."
      end
    end
  end
end
generate_tag(item_name)
click to toggle source
# File lib/fluent/plugin/in_kubernetes_objects.rb, line 114
# Builds the event tag for one resource.
#
# item_name - the value placed between @tag_prefix and @tag_suffix.
#
# Returns @tag unchanged when no @tag_prefix is set; otherwise the
# concatenation of @tag_prefix, item_name and @tag_suffix.
def generate_tag(item_name)
  if @tag_prefix
    [@tag_prefix, item_name, @tag_suffix].join
  else
    @tag
  end
end
init_with_kubeconfig()
click to toggle source
# File lib/fluent/plugin/in_kubernetes_objects.rb, line 120
# Builds @client from the current context of the kubeconfig file at
# @kubeconfig.
#
# The SSL and auth options are passed as keyword arguments, the same way
# initialize_client passes them. The previous form handed over a merged
# Hash as a third positional argument; Ruby 3 no longer converts such a
# Hash into keywords, so a constructor that declares keyword parameters
# rejects it with ArgumentError. On Ruby 2 both call forms are equivalent.
def init_with_kubeconfig
  config = Kubeclient::Config.read @kubeconfig
  current_context = config.context

  @client = Kubeclient::Client.new(
    current_context.api_endpoint,
    current_context.api_version,
    ssl_options: current_context.ssl_options,
    auth_options: current_context.auth_options
  )
end
initialize_client()
click to toggle source
# File lib/fluent/plugin/in_kubernetes_objects.rb, line 135
# Builds @client from the plugin settings and checks the API endpoint.
#
# Steps:
# 1. When @kubernetes_url is nil, derive it from KUBERNETES_SERVICE_HOST and
#    KUBERNETES_SERVICE_PORT ("/api" for API version 'v1', "/apis" otherwise).
# 2. When @secret_dir exists, use its ca.crt / token files for any of
#    @ca_file / @bearer_token_file that is still nil.
# 3. Create the client and call api_valid? on it.
#
# Raises Fluent::ConfigError when no URL could be determined, or when
# api_valid? raises KubeException.
def initialize_client
  # mostly borrowed from Fluentd Kubernetes Metadata Filter Plugin
  if @kubernetes_url.nil?
    # Use Kubernetes default service account if we're in a pod.
    env_host = ENV['KUBERNETES_SERVICE_HOST']
    env_port = ENV['KUBERNETES_SERVICE_PORT']
    if env_host && env_port
      @kubernetes_url = "https://#{env_host}:#{env_port}/#{@api_version == 'v1' ? 'api' : 'apis'}"
    end
  end

  raise Fluent::ConfigError, 'kubernetes url is not set' unless @kubernetes_url

  # Use SSL certificate and bearer token from Kubernetes service account.
  # A nil @secret_dir is skipped: Dir.exist?(nil) would raise TypeError.
  if @secret_dir && Dir.exist?(@secret_dir)
    secret_ca_file = File.join(@secret_dir, 'ca.crt')
    secret_token_file = File.join(@secret_dir, 'token')

    if @ca_file.nil? && File.exist?(secret_ca_file)
      @ca_file = secret_ca_file
    end

    if @bearer_token_file.nil? && File.exist?(secret_token_file)
      @bearer_token_file = secret_token_file
    end
  end

  ssl_options = {
    client_cert: @client_cert && OpenSSL::X509::Certificate.new(File.read(@client_cert)),
    # PKey.read detects the key type, so EC (and other non-RSA) client keys
    # load as well; RSA keys still yield an OpenSSL::PKey::RSA instance.
    client_key: @client_key && OpenSSL::PKey.read(File.read(@client_key)),
    ca_file: @ca_file,
    verify_ssl: @insecure_ssl ? OpenSSL::SSL::VERIFY_NONE : OpenSSL::SSL::VERIFY_PEER
  }

  auth_options = {}
  auth_options[:bearer_token] = File.read(@bearer_token_file) if @bearer_token_file

  @client = Kubeclient::Client.new(
    @kubernetes_url, @api_version,
    ssl_options: ssl_options,
    auth_options: auth_options
  )

  begin
    # NOTE(review): only an exception is treated as failure here; the value
    # returned by api_valid? is not inspected — confirm that is intended.
    @client.api_valid?
  rescue KubeException => kube_error
    raise Fluent::ConfigError, "Invalid Kubernetes API #{@api_version} endpoint #{@kubernetes_url}: #{kube_error.message}"
  end
end
parse_tag()
click to toggle source
# File lib/fluent/plugin/in_kubernetes_objects.rb, line 110
# Splits a wildcard tag pattern into @tag_prefix and @tag_suffix (the text
# before and after the first '*'). A tag without '*' leaves both untouched.
#
# split is called with a negative limit so that trailing empty fields are
# kept. With the default limit, '*'.split('*') returns [] and a bare '*'
# tag would leave @tag_prefix nil, i.e. it would be treated as if it had no
# wildcard at all. For a pattern ending in '*', @tag_suffix is now '' rather
# than nil, which joins to the same tag.
def parse_tag
  return unless @tag.include?('*')

  @tag_prefix, @tag_suffix = @tag.split('*', -1)
end
start_pullers()
click to toggle source
# File lib/fluent/plugin/in_kubernetes_objects.rb, line 185
# Creates one pull thread for every entry in @pull_objects.
def start_pullers
  @pull_objects.each { |pull_conf| create_pull_thread(pull_conf) }
end
start_watchers()
click to toggle source
# File lib/fluent/plugin/in_kubernetes_objects.rb, line 189
# Creates one watcher thread for every entry in @watch_objects.
def start_watchers
  @watch_objects.each { |watch_conf| create_watcher_thread(watch_conf) }
end