class KubernetesLeaderElection

Constants

ALREADY_EXISTS_CODE
FAILED_KUBERNETES_REQUEST
VERSION

Public Class Methods

new(name, kubeclient, logger:, statsd: nil, interval: 30) click to toggle source
# File lib/kubernetes_leader_election.rb, line 12
# @param name [String] name of the Lease object all candidate pods compete for
# @param kubeclient [Kubeclient::Client] client for the coordination.k8s.io API
# @param logger [#info,#warn] structured logger
# @param statsd [#increment, nil] optional metrics client
# @param interval [Integer] seconds between lease checks/renewals
def initialize(name, kubeclient, logger:, statsd: nil, interval: 30)
  @name = name
  @kubeclient = kubeclient
  @logger = logger
  @statsd = statsd
  @interval = interval
end

Public Instance Methods

become_leader_for_life() { || ... } click to toggle source

not using `call` since we never want to be restarted

# File lib/kubernetes_leader_election.rb, line 21
# Blocks until this pod wins the leadership lease, then yields exactly once
# and keeps renewing the lease forever (raising if renewal fails, so the
# process crashes instead of silently losing leadership).
# Deliberately not named `call` since it is never meant to be restarted.
def become_leader_for_life
  @logger.info message: "trying to become leader ... if both pods show this, delete the #{@name} lease"

  sleep @interval until become_leader

  yield # signal we are leader, but keep reporting

  loop do
    @statsd&.increment('leader_running') # monitored to ensure this is always exactly 1
    sleep @interval
    signal_alive
  end
end

Private Instance Methods

alive?(lease) click to toggle source
# File lib/kubernetes_leader_election.rb, line 55
# A lease counts as alive when it was renewed within the last two intervals
# (matching the leaseDurationSeconds we set when creating it).
# @param lease [Hash] lease resource with spec.renewTime as a parseable time string
# @return [Boolean]
def alive?(lease)
  renewed_at = Time.parse(lease.dig(:spec, :renewTime))
  stale_cutoff = Time.now - (2 * @interval)
  renewed_at > stale_cutoff
end
become_leader() click to toggle source

Everyone tries to create the same lease; whoever succeeds is the owner. The lease is auto-deleted by GC when its owner is deleted. The same logic lives in kube-service-watcher & kube-stats.

# File lib/kubernetes_leader_election.rb, line 62
# Try to acquire the leadership lease once.
# Returns true when this pod owns the lease (either freshly created, or
# pre-existing from before a pod restart), false otherwise — callers loop
# and call again after @interval.
# Everyone races to create the same lease; whoever succeeds is the owner.
def become_leader
  namespace = ENV.fetch("POD_NAMESPACE")
  # retry request on regular api errors
  # ... but surface "already exists" immediately so the rescue below can
  # inspect the current holder instead of retrying a doomed create.
  reraise = ->(e) { e.is_a?(Kubeclient::HttpError) && e.error_code == ALREADY_EXISTS_CODE }

  with_retries(*FAILED_KUBERNETES_REQUEST, reraise: reraise, times: 3) do
    @kubeclient.create_entity(
      "Lease",
      "leases",
      metadata: {
        name: @name,
        namespace: namespace,
        # owning Pod reference: kubernetes GC auto-deletes the lease when the
        # owner pod is deleted, which is how dead leaders release leadership
        ownerReferences: [{
          apiVersion: "v1",
          kind: "Pod",
          name: ENV.fetch("POD_NAME"),
          uid: ENV.fetch("POD_UID")
        }]
      },
      spec: {
        acquireTime: microtime,
        holderIdentity: ENV.fetch("POD_NAME"), # shown in `kubectl get lease`
        leaseDurationSeconds: @interval * 2,
        leaseTransitions: 0, # will never change since we delete the lease
        renewTime: microtime
      }
    )
  end
  @logger.info message: "became leader"
  true # I'm the leader now
rescue Kubeclient::HttpError => e
  raise e unless e.error_code == ALREADY_EXISTS_CODE # lease already exists

  # Someone holds the lease — fetch it to decide what to do next.
  # It can vanish between the failed create and this get (GC race), hence nil.
  lease = with_retries(*FAILED_KUBERNETES_REQUEST, times: 3) do
    @kubeclient.get_entity("leases", @name, namespace)
  rescue Kubeclient::ResourceNotFoundError
    nil
  end

  if !lease
    @logger.info message: "stale lease was deleted"
    false
  elsif lease.dig(:metadata, :ownerReferences, 0, :name) == ENV.fetch("POD_NAME")
    @logger.info message: "still leader"
    true # I restarted and am still the leader
  elsif !alive?(lease)
    # this is still a race-condition since we could be deleting the newly succeeded leader
    # see https://github.com/kubernetes/kubernetes/issues/20572
    @logger.info message: "deleting stale lease"
    with_retries(*FAILED_KUBERNETES_REQUEST, times: 3) do
      @kubeclient.delete_entity("leases", @name, namespace)
    end
    false # leader is dead, do not assume leadership here to avoid race condition
  else
    false # leader is still alive ... not logging to avoid repetitive noise
  end
end
microtime() click to toggle source

kubernetes needs exactly this format or it blows up

# File lib/kubernetes_leader_election.rb, line 51
# Kubernetes MicroTime needs exactly this format or the API rejects it.
# BUG FIX: convert to UTC before formatting — the hard-coded trailing "Z"
# claims UTC, so formatting local time here skewed renewTime by the host's
# TZ offset on non-UTC machines, breaking the alive? staleness comparison.
# @return [String] e.g. "2024-01-31T12:34:56.000000Z"
def microtime
  Time.now.utc.strftime('%FT%T.000000Z')
end
signal_alive() click to toggle source

show that we are alive or crash because we cannot reach the api (split-brain az)

# File lib/kubernetes_leader_election.rb, line 38
# Renew our lease to show that we are alive, or crash because we cannot
# reach the api (split-brain az). Also crashes if the lease owner changed,
# so a deposed leader never keeps acting as one.
def signal_alive
  with_retries(*FAILED_KUBERNETES_REQUEST, times: 3) do
    reply = @kubeclient.patch_entity(
      "leases", @name, { spec: { renewTime: microtime } }, 'strategic-merge-patch', ENV.fetch("POD_NAMESPACE")
    )
    owner = reply.dig(:metadata, :ownerReferences, 0, :name)
    raise "Lost leadership to #{owner}" if owner != ENV.fetch("POD_NAME")
  end
end
with_retries(*errors, times:, reraise: nil, backoff: [0.1, 0.5, 1]) { || ... } click to toggle source
# File lib/kubernetes_leader_election.rb, line 120
# Run the block, retrying up to `times` additional attempts when one of
# `errors` is raised. `reraise` may veto retries for specific exceptions
# (returns truthy => raise immediately). Sleeps per `backoff` between
# attempts, reusing the last entry once the list is exhausted.
def with_retries(*errors, times:, reraise: nil, backoff: [0.1, 0.5, 1])
  attempt = 0
  begin
    yield
  rescue *errors => e
    raise if attempt >= times || reraise&.call(e)
    @logger.warn message: "Retryable error", type: e.class.to_s, retries: times - attempt
    sleep backoff[attempt] || backoff.last
    attempt += 1
    retry
  end
end