diff --git a/cmd/kube-apiserver/app/aggregator.go b/cmd/kube-apiserver/app/aggregator.go index 97a4050d366d8..6da672bb29f03 100644 --- a/cmd/kube-apiserver/app/aggregator.go +++ b/cmd/kube-apiserver/app/aggregator.go @@ -20,242 +20,10 @@ limitations under the License. package app import ( - "fmt" - "net/http" - "strings" - "sync" - - "k8s.io/klog/v2" - - apiextensionsinformers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apiserver/pkg/admission" - genericfeatures "k8s.io/apiserver/pkg/features" - genericapiserver "k8s.io/apiserver/pkg/server" - "k8s.io/apiserver/pkg/server/healthz" - utilfeature "k8s.io/apiserver/pkg/util/feature" - utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy" - kubeexternalinformers "k8s.io/client-go/informers" - "k8s.io/client-go/tools/cache" - v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" - v1helper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper" - "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1" - aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver" - aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme" - apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1" - informers "k8s.io/kube-aggregator/pkg/client/informers/externalversions/apiregistration/v1" - "k8s.io/kube-aggregator/pkg/controllers/autoregister" controlplaneapiserver "k8s.io/kubernetes/pkg/controlplane/apiserver" - controlplaneapiserveroptions "k8s.io/kubernetes/pkg/controlplane/apiserver/options" - "k8s.io/kubernetes/pkg/controlplane/controller/crdregistration" ) -func createAggregatorConfig( - kubeAPIServerConfig genericapiserver.Config, - commandOptions controlplaneapiserveroptions.CompletedOptions, - externalInformers kubeexternalinformers.SharedInformerFactory, - serviceResolver aggregatorapiserver.ServiceResolver, - proxyTransport *http.Transport, - peerProxy utilpeerproxy.Interface, - pluginInitializers []admission.PluginInitializer, -) (*aggregatorapiserver.Config, error) { - // make a shallow copy to let us twiddle a few things - // most of the config actually remains the same. We only need to mess with a couple items related to the particulars of the aggregator - genericConfig := kubeAPIServerConfig - genericConfig.PostStartHooks = map[string]genericapiserver.PostStartHookConfigEntry{} - genericConfig.RESTOptionsGetter = nil - // prevent generic API server from installing the OpenAPI handler. Aggregator server - // has its own customized OpenAPI handler. - genericConfig.SkipOpenAPIInstallation = true - - if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) && - utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) { - // Add StorageVersionPrecondition handler to aggregator-apiserver. - // The handler will block write requests to built-in resources until the - // target resources' storage versions are up-to-date. - genericConfig.BuildHandlerChainFunc = genericapiserver.BuildHandlerChainWithStorageVersionPrecondition - } - - if peerProxy != nil { - originalHandlerChainBuilder := genericConfig.BuildHandlerChainFunc - genericConfig.BuildHandlerChainFunc = func(apiHandler http.Handler, c *genericapiserver.Config) http.Handler { - // Add peer proxy handler to aggregator-apiserver. - // wrap the peer proxy handler first. - apiHandler = peerProxy.WrapHandler(apiHandler) - return originalHandlerChainBuilder(apiHandler, c) - } - } - - // copy the etcd options so we don't mutate originals. - // we assume that the etcd options have been completed already. avoid messing with anything outside - // of changes to StorageConfig as that may lead to unexpected behavior when the options are applied. - etcdOptions := *commandOptions.Etcd - etcdOptions.StorageConfig.Codec = aggregatorscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion, v1beta1.SchemeGroupVersion) - etcdOptions.StorageConfig.EncodeVersioner = runtime.NewMultiGroupVersioner(v1.SchemeGroupVersion, schema.GroupKind{Group: v1beta1.GroupName}) - etcdOptions.SkipHealthEndpoints = true // avoid double wiring of health checks - if err := etcdOptions.ApplyTo(&genericConfig); err != nil { - return nil, err - } - - // override MergedResourceConfig with aggregator defaults and registry - if err := commandOptions.APIEnablement.ApplyTo( - &genericConfig, - aggregatorapiserver.DefaultAPIResourceConfigSource(), - aggregatorscheme.Scheme); err != nil { - return nil, err - } - - aggregatorConfig := &aggregatorapiserver.Config{ - GenericConfig: &genericapiserver.RecommendedConfig{ - Config: genericConfig, - SharedInformerFactory: externalInformers, - }, - ExtraConfig: aggregatorapiserver.ExtraConfig{ - ProxyClientCertFile: commandOptions.ProxyClientCertFile, - ProxyClientKeyFile: commandOptions.ProxyClientKeyFile, - PeerAdvertiseAddress: commandOptions.PeerAdvertiseAddress, - ServiceResolver: serviceResolver, - ProxyTransport: proxyTransport, - RejectForwardingRedirects: commandOptions.AggregatorRejectForwardingRedirects, - }, - } - - // we need to clear the poststarthooks so we don't add them multiple times to all the servers (that fails) - aggregatorConfig.GenericConfig.PostStartHooks = map[string]genericapiserver.PostStartHookConfigEntry{} - - return aggregatorConfig, nil -} - -func createAggregatorServer(aggregatorConfig aggregatorapiserver.CompletedConfig, delegateAPIServer genericapiserver.DelegationTarget, apiExtensionInformers apiextensionsinformers.SharedInformerFactory, crdAPIEnabled bool) (*aggregatorapiserver.APIAggregator, error) { - aggregatorServer, err := aggregatorConfig.NewWithDelegate(delegateAPIServer) - if err != nil { - return nil, err - } - - // create controllers for auto-registration - apiRegistrationClient, err := apiregistrationclient.NewForConfig(aggregatorConfig.GenericConfig.LoopbackClientConfig) - if err != nil { - return nil, err - } - autoRegistrationController := autoregister.NewAutoRegisterController(aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(), apiRegistrationClient) - apiServices := apiServicesToRegister(delegateAPIServer, autoRegistrationController) - - type controller interface { - Run(workers int, stopCh <-chan struct{}) - WaitForInitialSync() - } - var crdRegistrationController controller - if crdAPIEnabled { - crdRegistrationController = crdregistration.NewCRDRegistrationController( - apiExtensionInformers.Apiextensions().V1().CustomResourceDefinitions(), - autoRegistrationController) - } - - // Imbue all builtin group-priorities onto the aggregated discovery - if aggregatorConfig.GenericConfig.AggregatedDiscoveryGroupManager != nil { - for gv, entry := range apiVersionPriorities { - aggregatorConfig.GenericConfig.AggregatedDiscoveryGroupManager.SetGroupVersionPriority(metav1.GroupVersion(gv), int(entry.Group), int(entry.Version)) - } - } - - err = aggregatorServer.GenericAPIServer.AddPostStartHook("kube-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error { - if crdAPIEnabled { - go crdRegistrationController.Run(5, context.StopCh) - } - go func() { - // let the CRD controller process the initial set of CRDs before starting the autoregistration controller. - // this prevents the autoregistration controller's initial sync from deleting APIServices for CRDs that still exist. - // we only need to do this if CRDs are enabled on this server. We can't use discovery because we are the source for discovery. - if crdAPIEnabled { - klog.Infof("waiting for initial CRD sync...") - crdRegistrationController.WaitForInitialSync() - klog.Infof("initial CRD sync complete...") - } else { - klog.Infof("CRD API not enabled, starting APIService registration without waiting for initial CRD sync") - } - autoRegistrationController.Run(5, context.StopCh) - }() - return nil - }) - if err != nil { - return nil, err - } - - err = aggregatorServer.GenericAPIServer.AddBootSequenceHealthChecks( - makeAPIServiceAvailableHealthCheck( - "autoregister-completion", - apiServices, - aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(), - ), - ) - if err != nil { - return nil, err - } - - return aggregatorServer, nil -} - -func makeAPIService(gv schema.GroupVersion) *v1.APIService { - apiServicePriority, ok := apiVersionPriorities[gv] - if !ok { - // if we aren't found, then we shouldn't register ourselves because it could result in a CRD group version - // being permanently stuck in the APIServices list. - klog.Infof("Skipping APIService creation for %v", gv) - return nil - } - return &v1.APIService{ - ObjectMeta: metav1.ObjectMeta{Name: gv.Version + "." + gv.Group}, - Spec: v1.APIServiceSpec{ - Group: gv.Group, - Version: gv.Version, - GroupPriorityMinimum: apiServicePriority.Group, - VersionPriority: apiServicePriority.Version, - }, - } -} - -// makeAPIServiceAvailableHealthCheck returns a healthz check that returns healthy -// once all of the specified services have been observed to be available at least once. -func makeAPIServiceAvailableHealthCheck(name string, apiServices []*v1.APIService, apiServiceInformer informers.APIServiceInformer) healthz.HealthChecker { - // Track the auto-registered API services that have not been observed to be available yet - pendingServiceNamesLock := &sync.RWMutex{} - pendingServiceNames := sets.NewString() - for _, service := range apiServices { - pendingServiceNames.Insert(service.Name) - } - - // When an APIService in the list is seen as available, remove it from the pending list - handleAPIServiceChange := func(service *v1.APIService) { - pendingServiceNamesLock.Lock() - defer pendingServiceNamesLock.Unlock() - if !pendingServiceNames.Has(service.Name) { - return - } - if v1helper.IsAPIServiceConditionTrue(service, v1.Available) { - pendingServiceNames.Delete(service.Name) - } - } - - // Watch add/update events for APIServices - apiServiceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { handleAPIServiceChange(obj.(*v1.APIService)) }, - UpdateFunc: func(old, new interface{}) { handleAPIServiceChange(new.(*v1.APIService)) }, - }) - - // Don't return healthy until the pending list is empty - return healthz.NamedCheck(name, func(r *http.Request) error { - pendingServiceNamesLock.RLock() - defer pendingServiceNamesLock.RUnlock() - if pendingServiceNames.Len() > 0 { - return fmt.Errorf("missing APIService: %v", pendingServiceNames.List()) - } - return nil - }) -} - // The proper way to resolve this letting the aggregator know the desired group and version-within-group order of the underlying servers // is to refactor the genericapiserver.DelegationTarget to include a list of priorities based on which APIs were installed. // This requires the APIGroupInfo struct to evolve and include the concept of priorities and to avoid mistakes, the core storage map there needs to be updated. @@ -287,37 +55,6 @@ var apiVersionPriorities = merge(controlplaneapiserver.DefaultGenericAPIServiceP // Version can be set to 9 (to have space around) for a new group. }) -func apiServicesToRegister(delegateAPIServer genericapiserver.DelegationTarget, registration autoregister.AutoAPIServiceRegistration) []*v1.APIService { - apiServices := []*v1.APIService{} - - for _, curr := range delegateAPIServer.ListedPaths() { - if curr == "/api/v1" { - apiService := makeAPIService(schema.GroupVersion{Group: "", Version: "v1"}) - registration.AddAPIServiceToSyncOnStart(apiService) - apiServices = append(apiServices, apiService) - continue - } - - if !strings.HasPrefix(curr, "/apis/") { - continue - } - // this comes back in a list that looks like /apis/rbac.authorization.k8s.io/v1alpha1 - tokens := strings.Split(curr, "/") - if len(tokens) != 4 { - continue - } - - apiService := makeAPIService(schema.GroupVersion{Group: tokens[2], Version: tokens[3]}) - if apiService == nil { - continue - } - registration.AddAPIServiceToSyncOnStart(apiService) - apiServices = append(apiServices, apiService) - } - - return apiServices -} - func merge(a, b map[schema.GroupVersion]controlplaneapiserver.APIServicePriority) map[schema.GroupVersion]controlplaneapiserver.APIServicePriority { for k, v := range b { a[k] = v diff --git a/cmd/kube-apiserver/app/config.go b/cmd/kube-apiserver/app/config.go index 9f6874f067b89..7f03d42f9d8e1 100644 --- a/cmd/kube-apiserver/app/config.go +++ b/cmd/kube-apiserver/app/config.go @@ -99,7 +99,7 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) { } c.ApiExtensions = apiExtensions - aggregator, err := createAggregatorConfig(*kubeAPIs.ControlPlane.Generic, opts.CompletedOptions, kubeAPIs.ControlPlane.VersionedInformers, serviceResolver, kubeAPIs.ControlPlane.ProxyTransport, kubeAPIs.ControlPlane.Extra.PeerProxy, pluginInitializer) + aggregator, err := controlplaneapiserver.CreateAggregatorConfig(*kubeAPIs.ControlPlane.Generic, opts.CompletedOptions, kubeAPIs.ControlPlane.VersionedInformers, serviceResolver, kubeAPIs.ControlPlane.ProxyTransport, kubeAPIs.ControlPlane.Extra.PeerProxy, pluginInitializer) if err != nil { return nil, err } diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index ceb4401c6202d..81f826ac876ee 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -173,7 +173,7 @@ func CreateServerChain(config CompletedConfig) (*aggregatorapiserver.APIAggregat } // aggregator comes last in the chain - aggregatorServer, err := createAggregatorServer(config.Aggregator, kubeAPIServer.ControlPlane.GenericAPIServer, apiExtensionsServer.Informers, crdAPIEnabled) + aggregatorServer, err := controlplaneapiserver.CreateAggregatorServer(config.Aggregator, kubeAPIServer.ControlPlane.GenericAPIServer, apiExtensionsServer.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdAPIEnabled, apiVersionPriorities) if err != nil { // we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines return nil, err diff --git a/pkg/controlplane/apiserver/aggregator.go b/pkg/controlplane/apiserver/aggregator.go index 2100cbd4adbf5..61acedb4b734d 100644 --- a/pkg/controlplane/apiserver/aggregator.go +++ b/pkg/controlplane/apiserver/aggregator.go @@ -17,9 +17,241 @@ limitations under the License. package apiserver import ( + "fmt" + "net/http" + "strings" + "sync" + + apiextensionsinformers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions/apiextensions/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apiserver/pkg/admission" + genericfeatures "k8s.io/apiserver/pkg/features" + genericapiserver "k8s.io/apiserver/pkg/server" + "k8s.io/apiserver/pkg/server/healthz" + utilfeature "k8s.io/apiserver/pkg/util/feature" + utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy" + kubeexternalinformers "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" + v1helper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper" + "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1" + aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver" + aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme" + apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1" + informers "k8s.io/kube-aggregator/pkg/client/informers/externalversions/apiregistration/v1" + "k8s.io/kube-aggregator/pkg/controllers/autoregister" + + "k8s.io/kubernetes/pkg/controlplane/apiserver/options" + "k8s.io/kubernetes/pkg/controlplane/controller/crdregistration" ) +func CreateAggregatorConfig( + kubeAPIServerConfig genericapiserver.Config, + commandOptions options.CompletedOptions, + externalInformers kubeexternalinformers.SharedInformerFactory, + serviceResolver aggregatorapiserver.ServiceResolver, + proxyTransport *http.Transport, + peerProxy utilpeerproxy.Interface, + pluginInitializers []admission.PluginInitializer, +) (*aggregatorapiserver.Config, error) { + // make a shallow copy to let us twiddle a few things + // most of the config actually remains the same. We only need to mess with a couple items related to the particulars of the aggregator + genericConfig := kubeAPIServerConfig + genericConfig.PostStartHooks = map[string]genericapiserver.PostStartHookConfigEntry{} + genericConfig.RESTOptionsGetter = nil + // prevent generic API server from installing the OpenAPI handler. Aggregator server + // has its own customized OpenAPI handler. + genericConfig.SkipOpenAPIInstallation = true + + if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) && + utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) { + // Add StorageVersionPrecondition handler to aggregator-apiserver. + // The handler will block write requests to built-in resources until the + // target resources' storage versions are up-to-date. + genericConfig.BuildHandlerChainFunc = genericapiserver.BuildHandlerChainWithStorageVersionPrecondition + } + + if peerProxy != nil { + originalHandlerChainBuilder := genericConfig.BuildHandlerChainFunc + genericConfig.BuildHandlerChainFunc = func(apiHandler http.Handler, c *genericapiserver.Config) http.Handler { + // Add peer proxy handler to aggregator-apiserver. + // wrap the peer proxy handler first. + apiHandler = peerProxy.WrapHandler(apiHandler) + return originalHandlerChainBuilder(apiHandler, c) + } + } + + // copy the etcd options so we don't mutate originals. + // we assume that the etcd options have been completed already. avoid messing with anything outside + // of changes to StorageConfig as that may lead to unexpected behavior when the options are applied. + etcdOptions := *commandOptions.Etcd + etcdOptions.StorageConfig.Codec = aggregatorscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion, v1beta1.SchemeGroupVersion) + etcdOptions.StorageConfig.EncodeVersioner = runtime.NewMultiGroupVersioner(v1.SchemeGroupVersion, schema.GroupKind{Group: v1beta1.GroupName}) + etcdOptions.SkipHealthEndpoints = true // avoid double wiring of health checks + if err := etcdOptions.ApplyTo(&genericConfig); err != nil { + return nil, err + } + + // override MergedResourceConfig with aggregator defaults and registry + if err := commandOptions.APIEnablement.ApplyTo( + &genericConfig, + aggregatorapiserver.DefaultAPIResourceConfigSource(), + aggregatorscheme.Scheme); err != nil { + return nil, err + } + + aggregatorConfig := &aggregatorapiserver.Config{ + GenericConfig: &genericapiserver.RecommendedConfig{ + Config: genericConfig, + SharedInformerFactory: externalInformers, + }, + ExtraConfig: aggregatorapiserver.ExtraConfig{ + ProxyClientCertFile: commandOptions.ProxyClientCertFile, + ProxyClientKeyFile: commandOptions.ProxyClientKeyFile, + PeerAdvertiseAddress: commandOptions.PeerAdvertiseAddress, + ServiceResolver: serviceResolver, + ProxyTransport: proxyTransport, + RejectForwardingRedirects: commandOptions.AggregatorRejectForwardingRedirects, + }, + } + + // we need to clear the poststarthooks so we don't add them multiple times to all the servers (that fails) + aggregatorConfig.GenericConfig.PostStartHooks = map[string]genericapiserver.PostStartHookConfigEntry{} + + return aggregatorConfig, nil +} + +func CreateAggregatorServer(aggregatorConfig aggregatorapiserver.CompletedConfig, delegateAPIServer genericapiserver.DelegationTarget, crds apiextensionsinformers.CustomResourceDefinitionInformer, crdAPIEnabled bool, apiVersionPriorities map[schema.GroupVersion]APIServicePriority) (*aggregatorapiserver.APIAggregator, error) { + aggregatorServer, err := aggregatorConfig.NewWithDelegate(delegateAPIServer) + if err != nil { + return nil, err + } + + // create controllers for auto-registration + apiRegistrationClient, err := apiregistrationclient.NewForConfig(aggregatorConfig.GenericConfig.LoopbackClientConfig) + if err != nil { + return nil, err + } + autoRegistrationController := autoregister.NewAutoRegisterController(aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(), apiRegistrationClient) + apiServices := apiServicesToRegister(delegateAPIServer, autoRegistrationController, apiVersionPriorities) + + type controller interface { + Run(workers int, stopCh <-chan struct{}) + WaitForInitialSync() + } + var crdRegistrationController controller + if crdAPIEnabled { + crdRegistrationController = crdregistration.NewCRDRegistrationController( + crds, + autoRegistrationController) + } + + // Imbue all builtin group-priorities onto the aggregated discovery + if aggregatorConfig.GenericConfig.AggregatedDiscoveryGroupManager != nil { + for gv, entry := range apiVersionPriorities { + aggregatorConfig.GenericConfig.AggregatedDiscoveryGroupManager.SetGroupVersionPriority(metav1.GroupVersion(gv), int(entry.Group), int(entry.Version)) + } + } + + err = aggregatorServer.GenericAPIServer.AddPostStartHook("kube-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error { + if crdAPIEnabled { + go crdRegistrationController.Run(5, context.Done()) + } + go func() { + // let the CRD controller process the initial set of CRDs before starting the autoregistration controller. + // this prevents the autoregistration controller's initial sync from deleting APIServices for CRDs that still exist. + // we only need to do this if CRDs are enabled on this server. We can't use discovery because we are the source for discovery. + if crdAPIEnabled { + klog.Infof("waiting for initial CRD sync...") + crdRegistrationController.WaitForInitialSync() + klog.Infof("initial CRD sync complete...") + } else { + klog.Infof("CRD API not enabled, starting APIService registration without waiting for initial CRD sync") + } + autoRegistrationController.Run(5, context.Done()) + }() + return nil + }) + if err != nil { + return nil, err + } + + err = aggregatorServer.GenericAPIServer.AddBootSequenceHealthChecks( + makeAPIServiceAvailableHealthCheck( + "autoregister-completion", + apiServices, + aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(), + ), + ) + if err != nil { + return nil, err + } + + return aggregatorServer, nil +} + +func makeAPIService(gv schema.GroupVersion, apiVersionPriorities map[schema.GroupVersion]APIServicePriority) *v1.APIService { + apiServicePriority, ok := apiVersionPriorities[gv] + if !ok { + // if we aren't found, then we shouldn't register ourselves because it could result in a CRD group version + // being permanently stuck in the APIServices list. + klog.Infof("Skipping APIService creation for %v", gv) + return nil + } + return &v1.APIService{ + ObjectMeta: metav1.ObjectMeta{Name: gv.Version + "." + gv.Group}, + Spec: v1.APIServiceSpec{ + Group: gv.Group, + Version: gv.Version, + GroupPriorityMinimum: apiServicePriority.Group, + VersionPriority: apiServicePriority.Version, + }, + } +} + +// makeAPIServiceAvailableHealthCheck returns a healthz check that returns healthy +// once all of the specified services have been observed to be available at least once. +func makeAPIServiceAvailableHealthCheck(name string, apiServices []*v1.APIService, apiServiceInformer informers.APIServiceInformer) healthz.HealthChecker { + // Track the auto-registered API services that have not been observed to be available yet + pendingServiceNamesLock := &sync.RWMutex{} + pendingServiceNames := sets.NewString() + for _, service := range apiServices { + pendingServiceNames.Insert(service.Name) + } + + // When an APIService in the list is seen as available, remove it from the pending list + handleAPIServiceChange := func(service *v1.APIService) { + pendingServiceNamesLock.Lock() + defer pendingServiceNamesLock.Unlock() + if !pendingServiceNames.Has(service.Name) { + return + } + if v1helper.IsAPIServiceConditionTrue(service, v1.Available) { + pendingServiceNames.Delete(service.Name) + } + } + + // Watch add/update events for APIServices + apiServiceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ //nolint:errcheck // no way to return error + AddFunc: func(obj interface{}) { handleAPIServiceChange(obj.(*v1.APIService)) }, + UpdateFunc: func(old, new interface{}) { handleAPIServiceChange(new.(*v1.APIService)) }, + }) + + // Don't return healthy until the pending list is empty + return healthz.NamedCheck(name, func(r *http.Request) error { + pendingServiceNamesLock.RLock() + defer pendingServiceNamesLock.RUnlock() + if pendingServiceNames.Len() > 0 { + return fmt.Errorf("missing APIService: %v", pendingServiceNames.List()) + } + return nil + }) +} + // APIServicePriority defines group priority that is used in discovery. This controls // group position in the kubectl output. type APIServicePriority struct { @@ -67,3 +299,34 @@ func DefaultGenericAPIServicePriorities() map[schema.GroupVersion]APIServicePrio // Version can be set to 9 (to have space around) for a new group. } } + +func apiServicesToRegister(delegateAPIServer genericapiserver.DelegationTarget, registration autoregister.AutoAPIServiceRegistration, apiVersionPriorities map[schema.GroupVersion]APIServicePriority) []*v1.APIService { + apiServices := []*v1.APIService{} + + for _, curr := range delegateAPIServer.ListedPaths() { + if curr == "/api/v1" { + apiService := makeAPIService(schema.GroupVersion{Group: "", Version: "v1"}, apiVersionPriorities) + registration.AddAPIServiceToSyncOnStart(apiService) + apiServices = append(apiServices, apiService) + continue + } + + if !strings.HasPrefix(curr, "/apis/") { + continue + } + // this comes back in a list that looks like /apis/rbac.authorization.k8s.io/v1alpha1 + tokens := strings.Split(curr, "/") + if len(tokens) != 4 { + continue + } + + apiService := makeAPIService(schema.GroupVersion{Group: tokens[2], Version: tokens[3]}, apiVersionPriorities) + if apiService == nil { + continue + } + registration.AddAPIServiceToSyncOnStart(apiService) + apiServices = append(apiServices, apiService) + } + + return apiServices +} diff --git a/pkg/controlplane/apiserver/apiextensions.go b/pkg/controlplane/apiserver/apiextensions.go index 2b4fb0ade0b27..d203d2757e33d 100644 --- a/pkg/controlplane/apiserver/apiextensions.go +++ b/pkg/controlplane/apiserver/apiextensions.go @@ -28,14 +28,14 @@ import ( "k8s.io/apiserver/pkg/util/webhook" "k8s.io/client-go/informers" - controlplaneapiserver "k8s.io/kubernetes/pkg/controlplane/apiserver/options" + "k8s.io/kubernetes/pkg/controlplane/apiserver/options" ) func CreateAPIExtensionsConfig( kubeAPIServerConfig server.Config, kubeInformers informers.SharedInformerFactory, pluginInitializers []admission.PluginInitializer, - commandOptions controlplaneapiserver.CompletedOptions, + commandOptions options.CompletedOptions, masterCount int, serviceResolver webhook.ServiceResolver, authResolverWrapper webhook.AuthenticationInfoResolverWrapper, diff --git a/pkg/controlplane/apiserver/config.go b/pkg/controlplane/apiserver/config.go index 924a3f4685d10..127edc55e06bd 100644 --- a/pkg/controlplane/apiserver/config.go +++ b/pkg/controlplane/apiserver/config.go @@ -52,7 +52,7 @@ import ( "k8s.io/kubernetes/pkg/api/legacyscheme" controlplaneadmission "k8s.io/kubernetes/pkg/controlplane/apiserver/admission" - controlplaneapiserver "k8s.io/kubernetes/pkg/controlplane/apiserver/options" + "k8s.io/kubernetes/pkg/controlplane/apiserver/options" "k8s.io/kubernetes/pkg/controlplane/controller/clusterauthenticationtrust" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubeapiserver" @@ -107,7 +107,7 @@ type Extra struct { // the genericapiserver.Config associated with it. The genericapiserver.Config is // often shared between multiple delegated apiservers. func BuildGenericConfig( - s controlplaneapiserver.CompletedOptions, + s options.CompletedOptions, schemes []*runtime.Scheme, resourceConfig *serverstorage.ResourceConfig, getOpenAPIDefinitions func(ref openapicommon.ReferenceCallback) map[string]openapicommon.OpenAPIDefinition, @@ -233,7 +233,7 @@ func BuildGenericConfig( } // BuildAuthorizer constructs the authorizer. If authorization is not set in s, it returns nil, nil, false, nil -func BuildAuthorizer(ctx context.Context, s controlplaneapiserver.CompletedOptions, egressSelector *egressselector.EgressSelector, apiserverID string, versionedInformers clientgoinformers.SharedInformerFactory) (authorizer.Authorizer, authorizer.RuleResolver, bool, error) { +func BuildAuthorizer(ctx context.Context, s options.CompletedOptions, egressSelector *egressselector.EgressSelector, apiserverID string, versionedInformers clientgoinformers.SharedInformerFactory) (authorizer.Authorizer, authorizer.RuleResolver, bool, error) { authorizationConfig, err := s.Authorization.ToAuthorizationConfig(versionedInformers) if err != nil { return nil, nil, false, err @@ -266,7 +266,7 @@ func BuildAuthorizer(ctx context.Context, s controlplaneapiserver.CompletedOptio // CreateConfig takes the generic controlplane apiserver options and // creates a config for the generic Kube APIs out of it. func CreateConfig( - opts controlplaneapiserver.CompletedOptions, + opts options.CompletedOptions, genericConfig *genericapiserver.Config, versionedInformers clientgoinformers.SharedInformerFactory, storageFactory *serverstorage.DefaultStorageFactory,