Skip to content

Commit

Permalink
Merge pull request kubernetes#124654 from sttts/sttts-controlplane-ag…
Browse files Browse the repository at this point in the history
…gregator

Step 9 – aggregator: move construction to generic controlplane
  • Loading branch information
k8s-ci-robot authored May 1, 2024
2 parents 83cdd30 + 89bafb2 commit d387c0c
Show file tree
Hide file tree
Showing 6 changed files with 271 additions and 271 deletions.
263 changes: 0 additions & 263 deletions cmd/kube-apiserver/app/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,242 +20,10 @@ limitations under the License.
package app

import (
"fmt"
"net/http"
"strings"
"sync"

"k8s.io/klog/v2"

apiextensionsinformers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/admission"
genericfeatures "k8s.io/apiserver/pkg/features"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/healthz"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy"
kubeexternalinformers "k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
v1helper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
"k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1"
aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"
aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"
apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1"
informers "k8s.io/kube-aggregator/pkg/client/informers/externalversions/apiregistration/v1"
"k8s.io/kube-aggregator/pkg/controllers/autoregister"
controlplaneapiserver "k8s.io/kubernetes/pkg/controlplane/apiserver"
controlplaneapiserveroptions "k8s.io/kubernetes/pkg/controlplane/apiserver/options"
"k8s.io/kubernetes/pkg/controlplane/controller/crdregistration"
)

func createAggregatorConfig(
kubeAPIServerConfig genericapiserver.Config,
commandOptions controlplaneapiserveroptions.CompletedOptions,
externalInformers kubeexternalinformers.SharedInformerFactory,
serviceResolver aggregatorapiserver.ServiceResolver,
proxyTransport *http.Transport,
peerProxy utilpeerproxy.Interface,
pluginInitializers []admission.PluginInitializer,
) (*aggregatorapiserver.Config, error) {
// make a shallow copy to let us twiddle a few things
// most of the config actually remains the same. We only need to mess with a couple items related to the particulars of the aggregator
genericConfig := kubeAPIServerConfig
genericConfig.PostStartHooks = map[string]genericapiserver.PostStartHookConfigEntry{}
genericConfig.RESTOptionsGetter = nil
// prevent generic API server from installing the OpenAPI handler. Aggregator server
// has its own customized OpenAPI handler.
genericConfig.SkipOpenAPIInstallation = true

if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) &&
utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) {
// Add StorageVersionPrecondition handler to aggregator-apiserver.
// The handler will block write requests to built-in resources until the
// target resources' storage versions are up-to-date.
genericConfig.BuildHandlerChainFunc = genericapiserver.BuildHandlerChainWithStorageVersionPrecondition
}

if peerProxy != nil {
originalHandlerChainBuilder := genericConfig.BuildHandlerChainFunc
genericConfig.BuildHandlerChainFunc = func(apiHandler http.Handler, c *genericapiserver.Config) http.Handler {
// Add peer proxy handler to aggregator-apiserver.
// wrap the peer proxy handler first.
apiHandler = peerProxy.WrapHandler(apiHandler)
return originalHandlerChainBuilder(apiHandler, c)
}
}

// copy the etcd options so we don't mutate originals.
// we assume that the etcd options have been completed already. avoid messing with anything outside
// of changes to StorageConfig as that may lead to unexpected behavior when the options are applied.
etcdOptions := *commandOptions.Etcd
etcdOptions.StorageConfig.Codec = aggregatorscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion, v1beta1.SchemeGroupVersion)
etcdOptions.StorageConfig.EncodeVersioner = runtime.NewMultiGroupVersioner(v1.SchemeGroupVersion, schema.GroupKind{Group: v1beta1.GroupName})
etcdOptions.SkipHealthEndpoints = true // avoid double wiring of health checks
if err := etcdOptions.ApplyTo(&genericConfig); err != nil {
return nil, err
}

// override MergedResourceConfig with aggregator defaults and registry
if err := commandOptions.APIEnablement.ApplyTo(
&genericConfig,
aggregatorapiserver.DefaultAPIResourceConfigSource(),
aggregatorscheme.Scheme); err != nil {
return nil, err
}

aggregatorConfig := &aggregatorapiserver.Config{
GenericConfig: &genericapiserver.RecommendedConfig{
Config: genericConfig,
SharedInformerFactory: externalInformers,
},
ExtraConfig: aggregatorapiserver.ExtraConfig{
ProxyClientCertFile: commandOptions.ProxyClientCertFile,
ProxyClientKeyFile: commandOptions.ProxyClientKeyFile,
PeerAdvertiseAddress: commandOptions.PeerAdvertiseAddress,
ServiceResolver: serviceResolver,
ProxyTransport: proxyTransport,
RejectForwardingRedirects: commandOptions.AggregatorRejectForwardingRedirects,
},
}

// we need to clear the poststarthooks so we don't add them multiple times to all the servers (that fails)
aggregatorConfig.GenericConfig.PostStartHooks = map[string]genericapiserver.PostStartHookConfigEntry{}

return aggregatorConfig, nil
}

func createAggregatorServer(aggregatorConfig aggregatorapiserver.CompletedConfig, delegateAPIServer genericapiserver.DelegationTarget, apiExtensionInformers apiextensionsinformers.SharedInformerFactory, crdAPIEnabled bool) (*aggregatorapiserver.APIAggregator, error) {
aggregatorServer, err := aggregatorConfig.NewWithDelegate(delegateAPIServer)
if err != nil {
return nil, err
}

// create controllers for auto-registration
apiRegistrationClient, err := apiregistrationclient.NewForConfig(aggregatorConfig.GenericConfig.LoopbackClientConfig)
if err != nil {
return nil, err
}
autoRegistrationController := autoregister.NewAutoRegisterController(aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(), apiRegistrationClient)
apiServices := apiServicesToRegister(delegateAPIServer, autoRegistrationController)

type controller interface {
Run(workers int, stopCh <-chan struct{})
WaitForInitialSync()
}
var crdRegistrationController controller
if crdAPIEnabled {
crdRegistrationController = crdregistration.NewCRDRegistrationController(
apiExtensionInformers.Apiextensions().V1().CustomResourceDefinitions(),
autoRegistrationController)
}

// Imbue all builtin group-priorities onto the aggregated discovery
if aggregatorConfig.GenericConfig.AggregatedDiscoveryGroupManager != nil {
for gv, entry := range apiVersionPriorities {
aggregatorConfig.GenericConfig.AggregatedDiscoveryGroupManager.SetGroupVersionPriority(metav1.GroupVersion(gv), int(entry.Group), int(entry.Version))
}
}

err = aggregatorServer.GenericAPIServer.AddPostStartHook("kube-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error {
if crdAPIEnabled {
go crdRegistrationController.Run(5, context.StopCh)
}
go func() {
// let the CRD controller process the initial set of CRDs before starting the autoregistration controller.
// this prevents the autoregistration controller's initial sync from deleting APIServices for CRDs that still exist.
// we only need to do this if CRDs are enabled on this server. We can't use discovery because we are the source for discovery.
if crdAPIEnabled {
klog.Infof("waiting for initial CRD sync...")
crdRegistrationController.WaitForInitialSync()
klog.Infof("initial CRD sync complete...")
} else {
klog.Infof("CRD API not enabled, starting APIService registration without waiting for initial CRD sync")
}
autoRegistrationController.Run(5, context.StopCh)
}()
return nil
})
if err != nil {
return nil, err
}

err = aggregatorServer.GenericAPIServer.AddBootSequenceHealthChecks(
makeAPIServiceAvailableHealthCheck(
"autoregister-completion",
apiServices,
aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(),
),
)
if err != nil {
return nil, err
}

return aggregatorServer, nil
}

func makeAPIService(gv schema.GroupVersion) *v1.APIService {
apiServicePriority, ok := apiVersionPriorities[gv]
if !ok {
// if we aren't found, then we shouldn't register ourselves because it could result in a CRD group version
// being permanently stuck in the APIServices list.
klog.Infof("Skipping APIService creation for %v", gv)
return nil
}
return &v1.APIService{
ObjectMeta: metav1.ObjectMeta{Name: gv.Version + "." + gv.Group},
Spec: v1.APIServiceSpec{
Group: gv.Group,
Version: gv.Version,
GroupPriorityMinimum: apiServicePriority.Group,
VersionPriority: apiServicePriority.Version,
},
}
}

// makeAPIServiceAvailableHealthCheck returns a healthz check that returns healthy
// once all of the specified services have been observed to be available at least once.
func makeAPIServiceAvailableHealthCheck(name string, apiServices []*v1.APIService, apiServiceInformer informers.APIServiceInformer) healthz.HealthChecker {
// Track the auto-registered API services that have not been observed to be available yet
pendingServiceNamesLock := &sync.RWMutex{}
pendingServiceNames := sets.NewString()
for _, service := range apiServices {
pendingServiceNames.Insert(service.Name)
}

// When an APIService in the list is seen as available, remove it from the pending list
handleAPIServiceChange := func(service *v1.APIService) {
pendingServiceNamesLock.Lock()
defer pendingServiceNamesLock.Unlock()
if !pendingServiceNames.Has(service.Name) {
return
}
if v1helper.IsAPIServiceConditionTrue(service, v1.Available) {
pendingServiceNames.Delete(service.Name)
}
}

// Watch add/update events for APIServices
apiServiceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { handleAPIServiceChange(obj.(*v1.APIService)) },
UpdateFunc: func(old, new interface{}) { handleAPIServiceChange(new.(*v1.APIService)) },
})

// Don't return healthy until the pending list is empty
return healthz.NamedCheck(name, func(r *http.Request) error {
pendingServiceNamesLock.RLock()
defer pendingServiceNamesLock.RUnlock()
if pendingServiceNames.Len() > 0 {
return fmt.Errorf("missing APIService: %v", pendingServiceNames.List())
}
return nil
})
}

// The proper way to resolve this letting the aggregator know the desired group and version-within-group order of the underlying servers
// is to refactor the genericapiserver.DelegationTarget to include a list of priorities based on which APIs were installed.
// This requires the APIGroupInfo struct to evolve and include the concept of priorities and to avoid mistakes, the core storage map there needs to be updated.
Expand Down Expand Up @@ -287,37 +55,6 @@ var apiVersionPriorities = merge(controlplaneapiserver.DefaultGenericAPIServiceP
// Version can be set to 9 (to have space around) for a new group.
})

func apiServicesToRegister(delegateAPIServer genericapiserver.DelegationTarget, registration autoregister.AutoAPIServiceRegistration) []*v1.APIService {
apiServices := []*v1.APIService{}

for _, curr := range delegateAPIServer.ListedPaths() {
if curr == "/api/v1" {
apiService := makeAPIService(schema.GroupVersion{Group: "", Version: "v1"})
registration.AddAPIServiceToSyncOnStart(apiService)
apiServices = append(apiServices, apiService)
continue
}

if !strings.HasPrefix(curr, "/apis/") {
continue
}
// this comes back in a list that looks like /apis/rbac.authorization.k8s.io/v1alpha1
tokens := strings.Split(curr, "/")
if len(tokens) != 4 {
continue
}

apiService := makeAPIService(schema.GroupVersion{Group: tokens[2], Version: tokens[3]})
if apiService == nil {
continue
}
registration.AddAPIServiceToSyncOnStart(apiService)
apiServices = append(apiServices, apiService)
}

return apiServices
}

func merge(a, b map[schema.GroupVersion]controlplaneapiserver.APIServicePriority) map[schema.GroupVersion]controlplaneapiserver.APIServicePriority {
for k, v := range b {
a[k] = v
Expand Down
2 changes: 1 addition & 1 deletion cmd/kube-apiserver/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) {
}
c.ApiExtensions = apiExtensions

aggregator, err := createAggregatorConfig(*kubeAPIs.ControlPlane.Generic, opts.CompletedOptions, kubeAPIs.ControlPlane.VersionedInformers, serviceResolver, kubeAPIs.ControlPlane.ProxyTransport, kubeAPIs.ControlPlane.Extra.PeerProxy, pluginInitializer)
aggregator, err := controlplaneapiserver.CreateAggregatorConfig(*kubeAPIs.ControlPlane.Generic, opts.CompletedOptions, kubeAPIs.ControlPlane.VersionedInformers, serviceResolver, kubeAPIs.ControlPlane.ProxyTransport, kubeAPIs.ControlPlane.Extra.PeerProxy, pluginInitializer)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/kube-apiserver/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func CreateServerChain(config CompletedConfig) (*aggregatorapiserver.APIAggregat
}

// aggregator comes last in the chain
aggregatorServer, err := createAggregatorServer(config.Aggregator, kubeAPIServer.ControlPlane.GenericAPIServer, apiExtensionsServer.Informers, crdAPIEnabled)
aggregatorServer, err := controlplaneapiserver.CreateAggregatorServer(config.Aggregator, kubeAPIServer.ControlPlane.GenericAPIServer, apiExtensionsServer.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdAPIEnabled, apiVersionPriorities)
if err != nil {
// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
return nil, err
Expand Down
Loading

0 comments on commit d387c0c

Please sign in to comment.