module KubernetesMetadata::WatchNamespaces
Public Instance Methods
get_namespaces_and_start_watcher()
click to toggle source
List all namespaces, record the resourceVersion and return a watcher starting from that resourceVersion.
# File lib/fluent/plugin/kubernetes_metadata_watch_namespaces.rb, line 91
# Lists all namespaces, stores the parsed metadata of each one in
# @namespace_cache keyed by the namespace UID, and returns a watcher that
# starts from the resourceVersion reported by that list call.
def get_namespaces_and_start_watcher
  # resourceVersion '0': fetch from the API server cache instead of doing an
  # etcd quorum read.
  request_options = { resource_version: '0' }
  namespace_list = @client.get_namespaces(request_options)

  namespace_list[:items].each do |item|
    uid = item[:metadata][:uid]
    @namespace_cache[uid] = parse_namespace_metadata(item)
    @stats.bump(:namespace_cache_host_updates)
  end

  # Continue watching from the most recent resourceVersion. The same options
  # hash is reused, only its resource_version entry is replaced.
  request_options[:resource_version] = namespace_list[:metadata][:resourceVersion]

  # The retry stats are reset only after the watch has been created; the
  # watcher itself is the return value.
  @client.watch_namespaces(request_options).tap { reset_namespace_watch_retry_stats }
end
process_namespace_watcher_notices(watcher)
click to toggle source
Process each notice delivered by the watcher, raising an exception when an ERROR notice is received.
# File lib/fluent/plugin/kubernetes_metadata_watch_namespaces.rb, line 118
# Iterates over every notice yielded by +watcher+ and dispatches on its type.
#
# Raises GoneError for an ERROR notice whose object carries code 410, and a
# RuntimeError for any other ERROR notice.
def process_namespace_watcher_notices(watcher)
  watcher.each do |notice|
    case notice[:type]
    when 'MODIFIED'
      reset_namespace_watch_retry_stats
      uid = notice[:object][:metadata][:uid]
      # Only refresh namespaces that are already cached; anything else is
      # counted as a miss.
      if @namespace_cache[uid]
        @namespace_cache[uid] = parse_namespace_metadata(notice[:object])
        @stats.bump(:namespace_cache_watch_updates)
      else
        @stats.bump(:namespace_cache_watch_misses)
      end
    when 'DELETED'
      reset_namespace_watch_retry_stats
      # Ignore and let the entry age out, for cases where the namespace is
      # deleted but its logs are still being processed.
      @stats.bump(:namespace_cache_watch_deletes_ignored)
    when 'ERROR'
      # ERROR notices do not reset the retry stats.
      error_object = notice[:object]
      if error_object && error_object[:code] == 410
        @stats.bump(:namespace_watch_gone_notices)
        raise GoneError
      end

      @stats.bump(:namespace_watch_error_type_notices)
      # The message stays nil (interpolated as an empty string) when the
      # notice carries no object or no message.
      message = (error_object && error_object[:message]) || nil
      raise "Error while watching namespaces: #{message}"
    else
      reset_namespace_watch_retry_stats
      # Creations are deliberately ignored: the newly created namespace may
      # not be in use on this node.
      @stats.bump(:namespace_cache_watch_ignored)
    end
  end
end
reset_namespace_watch_retry_stats()
click to toggle source
Reset namespace watch retry count and backoff interval as there is a successful watch notice.
# File lib/fluent/plugin/kubernetes_metadata_watch_namespaces.rb, line 112
# Puts the current thread's namespace watch retry state back to its starting
# values: the backoff interval becomes @watch_retry_interval and the retry
# count becomes zero.
def reset_namespace_watch_retry_stats
  thread_state = Thread.current
  thread_state[:namespace_watch_retry_backoff_interval] = @watch_retry_interval
  thread_state[:namespace_watch_retry_count] = 0
end
set_up_namespace_thread()
click to toggle source
# File lib/fluent/plugin/kubernetes_metadata_watch_namespaces.rb, line 28
# Starts the namespace watch, then loops forever processing watch notices and
# recreating the watcher whenever processing fails.
#
# Raises whatever start_namespace_watch raises if the initial setup fails, and
# Fluent::UnrecoverableError when a failure occurs after the per-thread retry
# count has reached @watch_retry_max_times.
def set_up_namespace_thread
  # Any failures / exceptions in the initial setup should raise
  # Fluent:ConfigError, so that users can inspect potential errors in
  # the configuration.
  namespace_watcher = start_namespace_watch
  # Start with a zero retry count and the base backoff interval.
  reset_namespace_watch_retry_stats

  # Any failures / exceptions in the followup watcher notice
  # processing will be swallowed and retried. These failures /
  # exceptions could be caused by Kubernetes API being temporarily
  # down. We assume the configuration is correct at this point.
  loop do
    # A nil watcher means the previous one was discarded; list again and
    # create a fresh watcher.
    namespace_watcher ||= get_namespaces_and_start_watcher
    process_namespace_watcher_notices(namespace_watcher)
  rescue GoneError => e
    # Expected error. Quietly go back through the loop in order to
    # start watching from the latest resource versions
    @stats.bump(:namespace_watch_gone_errors)
    log.info('410 Gone encountered. Restarting namespace watch to reset resource versions.', e)
    namespace_watcher = nil
  rescue StandardError => e
    @stats.bump(:namespace_watch_failures)
    if Thread.current[:namespace_watch_retry_count] < @watch_retry_max_times
      # Instead of raising exceptions and crashing Fluentd, swallow
      # the exception and reset the watcher.
      log.info(
        'Exception encountered parsing namespace watch event. ' \
        'The connection might have been closed. Sleeping for ' \
        "#{Thread.current[:namespace_watch_retry_backoff_interval]} " \
        'seconds and resetting the namespace watcher.',
        e
      )
      sleep(Thread.current[:namespace_watch_retry_backoff_interval])
      Thread.current[:namespace_watch_retry_count] += 1
      # Exponential backoff: grow the interval for the next failure.
      Thread.current[:namespace_watch_retry_backoff_interval] *= @watch_retry_exponential_backoff_base
      namespace_watcher = nil
    else
      # Since retries failed for many times, log as errors instead
      # of info and raise exceptions and trigger Fluentd to restart.
      # The message is a single line, matching the info-level message above.
      message = 'Exception encountered parsing namespace watch event. The ' \
                'connection might have been closed. Retried ' \
                "#{@watch_retry_max_times} times yet still failing. Restarting."
      log.error(message, e)
      raise Fluent::UnrecoverableError, message
    end
  end
end
start_namespace_watch()
click to toggle source
# File lib/fluent/plugin/kubernetes_metadata_watch_namespaces.rb, line 77
# Performs the initial namespace list and watch setup, returning the watcher.
#
# Any StandardError raised during setup is logged at debug level and re-raised
# as Fluent::ConfigError. The message names the API version and endpoint, and
# includes the exception's response when the exception exposes one.
def start_namespace_watch
  begin
    get_namespaces_and_start_watcher
  rescue StandardError => e
    failure = 'start_namespace_watch: Exception encountered setting up ' \
              "namespace watch from Kubernetes API #{@apiVersion} endpoint " \
              "#{@kubernetes_url}: #{e.message}"
    # Not every exception exposes a response, so it is appended only when
    # the exception responds to it.
    failure = "#{failure} (#{e.response})" if e.respond_to?(:response)
    log.debug(failure)
    raise Fluent::ConfigError, failure
  end
end