class K8sInternalLb::Client

Constants

TIMESTAMP_ANNOTATION

Attributes

api_version[RW]
auth_options[RW]
kubeclient_options[RW]
namespace[RW]
server[RW]
services[R]
sleep_duration[RW]
ssl_options[RW]

Public Class Methods

instance() click to toggle source
# File lib/k8s_internal_lb/client.rb, line 13
def self.instance
  @instance ||= Client.new
end
new() click to toggle source
# File lib/k8s_internal_lb/client.rb, line 75
def initialize
  @sleep_duration = 5

  @kubeclient_options = {}
  @auth_options = {}
  @ssl_options = {}

  @namespace = nil
  @server = nil
  @api_version = 'v1'

  @services = {}

  return unless in_cluster?

  @server = 'https://kubernetes.default.svc'
  @namespace ||= File.read('/var/run/secrets/kubernetes.io/serviceaccount/namespace')
  if @auth_options.empty?
    @auth_options = {
      bearer_token_file: '/var/run/secrets/kubernetes.io/serviceaccount/token'
    }
  end

  return unless File.exist?('/var/run/secrets/kubernetes.io/serviceaccount/ca.crt')

  @ssl_options[:ca_file] = '/var/run/secrets/kubernetes.io/serviceaccount/ca.crt'
end

Public Instance Methods

add_service(name, **data) click to toggle source
# File lib/k8s_internal_lb/client.rb, line 22
def add_service(name, **data)
  service = nil

  if name.is_a? Service
    service = name
    name = service.name
  else
    data[:name] ||= name
    service = Service.create(**data)
  end

  k8s_service = get_endpoint(service)
  raise 'Unable to find service' if k8s_service.nil?

  if k8s_service.metadata&.annotations&.to_hash&.key? TIMESTAMP_ANNOTATION
    ts = k8s_service.annotations[TIMESTAMP_ANNOTATION]
    if ts =~ /\A\d+\z/
      service.last_update = Time.at(ts.to_i)
    else
      service.last_update = Time.parse(ts)
    end
  end

  @services[name] = service
end
in_cluster?() click to toggle source
# File lib/k8s_internal_lb/client.rb, line 17
def in_cluster?
  # FIXME: Better detection, actually look for the necessary cluster components
  Dir.exist? '/var/run/secrets/kubernetes.io'
end
remove_service(name) click to toggle source
# File lib/k8s_internal_lb/client.rb, line 48
def remove_service(name)
  @services.delete name
end
run() click to toggle source
# File lib/k8s_internal_lb/client.rb, line 52
def run
  loop do
    sleep_duration = @sleep_duration

    @services.each do |name, service|
      logger.debug "Checking #{name} for interval"

      diff = (Time.now - service.last_update)
      until_next = service.interval - diff
      sleep_duration = until_next if until_next.positive? && until_next < sleep_duration

      next unless diff >= service.interval

      logger.debug "Interval reached on #{name}, running update"
      update(service)
    end

    sleep sleep_duration
  end
end

Private Instance Methods

get_endpoint(service) click to toggle source
# File lib/k8s_internal_lb/client.rb, line 141
def get_endpoint(service)
  kubeclient.get_endpoint(service.name, service.namespace || namespace)
rescue Kubeclient::ResourceNotFoundError
  nil
end
get_service(service) click to toggle source
# File lib/k8s_internal_lb/client.rb, line 135
def get_service(service)
  kubeclient.get_service(service.name, service.namespace || namespace)
rescue Kubeclient::ResourceNotFoundError
  nil
end
kubeclient() click to toggle source
# File lib/k8s_internal_lb/client.rb, line 147
def kubeclient
  @kubeclient ||= Kubeclient::Client.new(
    server,
    api_version,
    auth_options: auth_options,
    ssl_options: ssl_options,
    **kubeclient_options
  )
end
logger() click to toggle source
# File lib/k8s_internal_lb/client.rb, line 103
def logger
  @logger ||= Logging::Logger[self]
end
update(service, force: false) click to toggle source
# File lib/k8s_internal_lb/client.rb, line 107
def update(service, force: false)
  service = @services[service] unless service.is_a? Service

  old_endpoints = service.endpoints.dup
  service.last_update = Time.now
  service.update
  endpoints = service.endpoints

  return true if old_endpoints == endpoints && !force

  logger.info "Active endpoints have changed for #{service.name}, updating cluster data to #{service.to_subsets.to_json}"

  kubeclient.patch_endpoint(
    service.name,
    {
      metadata: {
        annotations: {
          TIMESTAMP_ANNOTATION => Time.now.to_s
        }
      },
      subsets: service.to_subsets
    },
    service.namespace || namespace
  )
rescue StandardError => e
  raise e
end