Skip to content

Commit

Permalink
CLE controller and client changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Jefftree committed Jul 24, 2024
1 parent b5a62f1 commit c47ff1e
Show file tree
Hide file tree
Showing 17 changed files with 2,827 additions and 15 deletions.
30 changes: 29 additions & 1 deletion cmd/kube-controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ import (
"sort"
"time"

"github.com/blang/semver/v4"
"github.com/spf13/cobra"

v1 "k8s.io/api/coordination/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -78,7 +79,9 @@ import (
kubectrlmgrconfig "k8s.io/kubernetes/pkg/controller/apis/config"
garbagecollector "k8s.io/kubernetes/pkg/controller/garbagecollector"
serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/utils/clock"
)

func init() {
Expand Down Expand Up @@ -289,6 +292,30 @@ func Run(ctx context.Context, c *config.CompletedConfig) error {
return startSATokenControllerInit(ctx, controllerContext, controllerName)
}
}
ver, err := semver.ParseTolerant(version.Get().String())
if err != nil {
return err
}

if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CoordinatedLeaderElection) {
// Start component identity lease management
leaseCandidate, err := leaderelection.NewCandidate(
c.Client,
id,
"kube-system",
"kube-controller-manager",
clock.RealClock{},
ver.FinalizeVersion(),
ver.FinalizeVersion(), // TODO: Use compatibility version when it's available
[]v1.CoordinatedLeaseStrategy{"OldestEmulationVersion"},
)
if err != nil {
return err
}
healthzHandler.AddHealthChecker(healthz.NewInformerSyncHealthz(leaseCandidate.InformerFactory))

go leaseCandidate.Run(ctx)
}

// Start the main lock
go leaderElectAndRun(ctx, c, id, electionChecker,
Expand Down Expand Up @@ -886,6 +913,7 @@ func leaderElectAndRun(ctx context.Context, c *config.CompletedConfig, lockIdent
Callbacks: callbacks,
WatchDog: electionChecker,
Name: leaseName,
Coordinated: utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CoordinatedLeaderElection),
})

panic("unreachable")
Expand Down
37 changes: 36 additions & 1 deletion cmd/kube-scheduler/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ import (
"os"
goruntime "runtime"

"github.com/blang/semver/v4"
"github.com/spf13/cobra"

coordinationv1 "k8s.io/api/coordination/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/authentication/authenticator"
Expand Down Expand Up @@ -56,8 +57,11 @@ import (
"k8s.io/component-base/version"
"k8s.io/component-base/version/verflag"
"k8s.io/klog/v2"
"k8s.io/utils/clock"

schedulerserverconfig "k8s.io/kubernetes/cmd/kube-scheduler/app/config"
"k8s.io/kubernetes/cmd/kube-scheduler/app/options"
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler"
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/apis/config/latest"
Expand Down Expand Up @@ -207,6 +211,34 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *
})
readyzChecks = append(readyzChecks, handlerSyncCheck)

if cc.LeaderElection != nil && utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CoordinatedLeaderElection) {
binaryVersion, err := semver.ParseTolerant(utilversion.DefaultComponentGlobalsRegistry.EffectiveVersionFor(utilversion.DefaultKubeComponent).BinaryVersion().String())
if err != nil {
return err
}
emulationVersion, err := semver.ParseTolerant(utilversion.DefaultComponentGlobalsRegistry.EffectiveVersionFor(utilversion.DefaultKubeComponent).EmulationVersion().String())
if err != nil {
return err
}

// Start component identity lease management
leaseCandidate, err := leaderelection.NewCandidate(
cc.Client,
cc.LeaderElection.Lock.Identity(),
"kube-system",
"kube-scheduler",
clock.RealClock{},
binaryVersion.FinalizeVersion(),
emulationVersion.FinalizeVersion(),
[]coordinationv1.CoordinatedLeaseStrategy{"OldestEmulationVersion"},
)
if err != nil {
return err
}
readyzChecks = append(readyzChecks, healthz.NewInformerSyncHealthz(leaseCandidate.InformerFactory))
go leaseCandidate.Run(ctx)
}

// Start up the healthz server.
if cc.SecureServing != nil {
handler := buildHandlerChain(newHealthEndpointsAndMetricsHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader, checks, readyzChecks), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
Expand Down Expand Up @@ -245,6 +277,9 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *
}
// If leader election is enabled, runCommand via LeaderElector until done and exit.
if cc.LeaderElection != nil {
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CoordinatedLeaderElection) {
cc.LeaderElection.Coordinated = true
}
cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
close(waitingForLeader)
Expand Down
1 change: 1 addition & 0 deletions hack/local-up-cluster.sh
Original file line number Diff line number Diff line change
Expand Up @@ -1027,6 +1027,7 @@ EOF
--feature-gates="${FEATURE_GATES}" \
--authentication-kubeconfig "${CERT_DIR}"/scheduler.kubeconfig \
--authorization-kubeconfig "${CERT_DIR}"/scheduler.kubeconfig \
--leader-elect=false \
--master="https://${API_HOST}:${API_SECURE_PORT}" >"${SCHEDULER_LOG}" 2>&1 &
SCHEDULER_PID=$!
}
Expand Down
23 changes: 23 additions & 0 deletions pkg/controlplane/apiserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package apiserver

import (
"context"
"fmt"
"os"
"time"
Expand All @@ -41,6 +42,7 @@ import (

"k8s.io/kubernetes/pkg/controlplane/controller/apiserverleasegc"
"k8s.io/kubernetes/pkg/controlplane/controller/clusterauthenticationtrust"
"k8s.io/kubernetes/pkg/controlplane/controller/leaderelection"
"k8s.io/kubernetes/pkg/controlplane/controller/legacytokentracking"
"k8s.io/kubernetes/pkg/controlplane/controller/systemnamespaces"
"k8s.io/kubernetes/pkg/features"
Expand Down Expand Up @@ -145,6 +147,27 @@ func (c completedConfig) New(name string, delegationTarget genericapiserver.Dele
return nil, fmt.Errorf("failed to get listener address: %w", err)
}

if utilfeature.DefaultFeatureGate.Enabled(apiserverfeatures.CoordinatedLeaderElection) {
leaseInformer := s.VersionedInformers.Coordination().V1().Leases()
lcInformer := s.VersionedInformers.Coordination().V1alpha1().LeaseCandidates()
// Ensure that informers are registered before starting. Coordinated Leader Election leader-elected
// and may register informer handlers after they are started.
_ = leaseInformer.Informer()
_ = lcInformer.Informer()
s.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-coordinated-leader-election-controller", func(hookContext genericapiserver.PostStartHookContext) error {
go leaderelection.RunWithLeaderElection(hookContext, s.GenericAPIServer.LoopbackClientConfig, func() (func(ctx context.Context, workers int), error) {
controller, err := leaderelection.NewController(
leaseInformer,
lcInformer,
client.CoordinationV1(),
client.CoordinationV1alpha1(),
)
return controller.Run, err
})
return nil
})
}

if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
peeraddress := getPeerAddress(c.Extra.PeerAdvertiseAddress, c.Generic.PublicAddress, publicServicePort)
peerEndpointCtrl := peerreconcilers.New(
Expand Down
135 changes: 135 additions & 0 deletions pkg/controlplane/controller/leaderelection/election.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package leaderelection

import (
"slices"
"time"

"github.com/blang/semver/v4"
v1 "k8s.io/api/coordination/v1"
v1alpha1 "k8s.io/api/coordination/v1alpha1"
"k8s.io/klog/v2"
)

func pickBestLeaderOldestEmulationVersion(candidates []*v1alpha1.LeaseCandidate) *v1alpha1.LeaseCandidate {
var electee *v1alpha1.LeaseCandidate
for _, c := range candidates {
if !validLeaseCandidateForOldestEmulationVersion(c) {
continue
}
if electee == nil || compare(electee, c) > 0 {
electee = c
}
}
if electee == nil {
klog.Infof("pickBestLeader: none found")
} else {
klog.Infof("pickBestLeader: %s %s", electee.Namespace, electee.Name)
}
return electee
}

func shouldReelect(candidates []*v1alpha1.LeaseCandidate, currentLeader *v1alpha1.LeaseCandidate) bool {
klog.Infof("shouldReelect for candidates: %+v", candidates)
pickedLeader := pickBestLeaderOldestEmulationVersion(candidates)
if pickedLeader == nil {
return false
}
return compare(currentLeader, pickedLeader) > 0
}

func pickBestStrategy(candidates []*v1alpha1.LeaseCandidate) v1.CoordinatedLeaseStrategy {
// TODO: This doesn't account for cycles within the preference graph
// We may have to do a topological sort to verify that the preference ordering is valid
var bestStrategy *v1.CoordinatedLeaseStrategy
for _, c := range candidates {
if len(c.Spec.PreferredStrategies) > 0 {
if bestStrategy == nil {
bestStrategy = &c.Spec.PreferredStrategies[0]
continue
}
if *bestStrategy != c.Spec.PreferredStrategies[0] {
if idx := slices.Index(c.Spec.PreferredStrategies, *bestStrategy); idx > 0 {
bestStrategy = &c.Spec.PreferredStrategies[0]
} else {
klog.Infof("Error: bad strategy ordering")
}
}
}
}
return (*bestStrategy)
}

func validLeaseCandidateForOldestEmulationVersion(l *v1alpha1.LeaseCandidate) bool {
_, err := semver.ParseTolerant(l.Spec.EmulationVersion)
if err != nil {
return false
}
_, err = semver.ParseTolerant(l.Spec.BinaryVersion)
return err == nil
}

func getEmulationVersion(l *v1alpha1.LeaseCandidate) semver.Version {
value := l.Spec.EmulationVersion
v, err := semver.ParseTolerant(value)
if err != nil {
return semver.Version{}
}
return v
}

func getBinaryVersion(l *v1alpha1.LeaseCandidate) semver.Version {
value := l.Spec.BinaryVersion
v, err := semver.ParseTolerant(value)
if err != nil {
return semver.Version{}
}
return v
}

// -1: lhs better, 1: rhs better
func compare(lhs, rhs *v1alpha1.LeaseCandidate) int {
lhsVersion := getEmulationVersion(lhs)
rhsVersion := getEmulationVersion(rhs)
result := lhsVersion.Compare(rhsVersion)
if result == 0 {
lhsVersion := getBinaryVersion(lhs)
rhsVersion := getBinaryVersion(rhs)
result = lhsVersion.Compare(rhsVersion)
}
if result == 0 {
if lhs.CreationTimestamp.After(rhs.CreationTimestamp.Time) {
return 1
}
return -1
}
return result
}

func isLeaseExpired(lease *v1.Lease) bool {
currentTime := time.Now()
return lease.Spec.RenewTime == nil ||
lease.Spec.LeaseDurationSeconds == nil ||
lease.Spec.RenewTime.Add(time.Duration(*lease.Spec.LeaseDurationSeconds)*time.Second).Before(currentTime)
}

func isLeaseCandidateExpired(lease *v1alpha1.LeaseCandidate) bool {
currentTime := time.Now()
return lease.Spec.RenewTime == nil ||
lease.Spec.RenewTime.Add(leaseCandidateValidDuration).Before(currentTime)
}
Loading

0 comments on commit c47ff1e

Please sign in to comment.