class Fluent::Plugin::KubernetesObjectsInput

Constants

VERSION

Public Instance Methods

close() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_kubernetes_objects.rb, line 104
def close
  super
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_kubernetes_objects.rb, line 87
def configure(conf)
  super

  raise Fluent::ConfigError, 'At least one <pull> or <watch> is required, but found none.' if @pull_objects.empty? && @watch_objects.empty?

  @storage = storage_create usage: 'checkpoints'

  parse_tag
  initialize_client
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_kubernetes_objects.rb, line 98
def start
  super
  start_pullers
  start_watchers
end

Private Instance Methods

create_pull_thread(conf) click to toggle source
# File lib/fluent/plugin/in_kubernetes_objects.rb, line 193
def create_pull_thread(conf)
  options = conf.to_h.dup
  options[:as] = :raw
  resource_name = options.delete :resource_name
  pull_interval = options.delete :interval

  thread_create :"pull_#{resource_name}" do
    tag = generate_tag resource_name
    while thread_current_running?
      log.debug "Going to pull #{resource_name}"
      response = @client.public_send "get_#{resource_name}", options
      now = Fluent::Engine.now
      es = Fluent::MultiEventStream.new

      # code copied from kubeclient
      # kubeclient will create one open struct object for each item in the response,
      # but this is totally unecessary in this plugin, thus we use as: :raw.
      result = JSON.parse(response)

      resource_version = result.fetch('resourceVersion') do
        result.fetch('metadata', {})['resourceVersion']
      end

      update_op = if resource_version
                    ->(item) { item['metadata'].update requestResourceVersion: resource_version }
                  else
                    ->(item) {}
                  end

      # result['items'] might be nil due to https://github.com/kubernetes/kubernetes/issues/13096
      items = result['items'].to_a
      log.debug { "Received #{items.size} #{resource_name}" }
      items.each { |item| es.add now, item.tap(&update_op) }
      router.emit_stream(tag, es)

      sleep(pull_interval)
    end
  end
end
create_watcher_thread(conf) click to toggle source
# File lib/fluent/plugin/in_kubernetes_objects.rb, line 233
def create_watcher_thread(conf)
  options = conf.to_h.dup
  options[:as] = :raw
  resource_name = options[:resource_name]
  version = @storage.get(resource_name)
  if version
    options[:resource_version] = version
  else
    options[:resource_version] = 0
  end

  thread_create :"watch_#{resource_name}" do
    while thread_current_running?
      @client.public_send("watch_#{resource_name}", options).tap do |watcher|
        tag = generate_tag "#{resource_name}"
        begin
          watcher.each do |entity|
            begin
              entity = JSON.parse(entity)
              router.emit tag, Fluent::Engine.now, entity
              options[:resource_version] = entity['object']['metadata']['resourceVersion']
              @storage.put resource_name, entity['object']['metadata']['resourceVersion']
            rescue => e
              log.info "Got exception #{e} parsing entity #{entity}. Resetting watcher."
            end
          end
        rescue => e
          log.info "Got exception #{e}. Resetting watcher."
        end
      end
    end
  end
end
generate_tag(item_name) click to toggle source
# File lib/fluent/plugin/in_kubernetes_objects.rb, line 114
def generate_tag(item_name)
  return @tag unless @tag_prefix

  [@tag_prefix, item_name, @tag_suffix].join
end
init_with_kubeconfig() click to toggle source
# File lib/fluent/plugin/in_kubernetes_objects.rb, line 120
def init_with_kubeconfig()
  options = {}
  config = Kubeclient::Config.read @kubeconfig
  current_context = config.context

  @client = Kubeclient::Client.new(
    current_context.api_endpoint,
    current_context.api_version,
    options.merge(
      ssl_options: current_context.ssl_options,
      auth_options: current_context.auth_options
    )
  )
end
initialize_client() click to toggle source
# File lib/fluent/plugin/in_kubernetes_objects.rb, line 135
def initialize_client
  # mostly borrowed from Fluentd Kubernetes Metadata Filter Plugin
  if @kubernetes_url.nil?
    # Use Kubernetes default service account if we're in a pod.
    env_host = ENV['KUBERNETES_SERVICE_HOST']
    env_port = ENV['KUBERNETES_SERVICE_PORT']
    if env_host && env_port
      @kubernetes_url = "https://#{env_host}:#{env_port}/#{@api_version == 'v1' ? 'api' : 'apis'}"
    end
  end

  raise Fluent::ConfigError, 'kubernetes url is not set' unless @kubernetes_url

  # Use SSL certificate and bearer token from Kubernetes service account.
  if Dir.exist?(@secret_dir)
    secret_ca_file = File.join(@secret_dir, 'ca.crt')
    secret_token_file = File.join(@secret_dir, 'token')

    if @ca_file.nil? && File.exist?(secret_ca_file)
      @ca_file = secret_ca_file
    end

    if @bearer_token_file.nil? && File.exist?(secret_token_file)
      @bearer_token_file = secret_token_file
    end
  end

  ssl_options = {
    client_cert: @client_cert && OpenSSL::X509::Certificate.new(File.read(@client_cert)),
    client_key: @client_key && OpenSSL::PKey::RSA.new(File.read(@client_key)),
    ca_file: @ca_file,
    verify_ssl: @insecure_ssl ? OpenSSL::SSL::VERIFY_NONE : OpenSSL::SSL::VERIFY_PEER
  }

  auth_options = {}
  auth_options[:bearer_token] = File.read(@bearer_token_file) if @bearer_token_file

  @client = Kubeclient::Client.new(
    @kubernetes_url, @api_version,
    ssl_options: ssl_options,
    auth_options: auth_options
  )

  begin
      @client.api_valid?
  rescue KubeException => kube_error
    raise Fluent::ConfigError, "Invalid Kubernetes API #{@api_version} endpoint #{@kubernetes_url}: #{kube_error.message}"
    end
end
parse_tag() click to toggle source
# File lib/fluent/plugin/in_kubernetes_objects.rb, line 110
def parse_tag
  @tag_prefix, @tag_suffix = @tag.split('*') if @tag.include?('*')
end
start_pullers() click to toggle source
# File lib/fluent/plugin/in_kubernetes_objects.rb, line 185
def start_pullers
  @pull_objects.each(&method(:create_pull_thread))
end
start_watchers() click to toggle source
# File lib/fluent/plugin/in_kubernetes_objects.rb, line 189
def start_watchers
  @watch_objects.each(&method(:create_watcher_thread))
end