Merge pull request kubernetes#1312 from eggfoobar/wrk-prt-multi-node
CNF-5901: admission hook change for workload partition on all clusters
openshift-merge-robot authored Feb 23, 2023
2 parents 518f3ed + 04ff509 commit 8923264
Showing 23 changed files with 1,336 additions and 93 deletions.
2 changes: 1 addition & 1 deletion go.mod
@@ -55,7 +55,7 @@ require (
github.com/onsi/gomega v1.23.0
github.com/opencontainers/runc v1.1.4
github.com/opencontainers/selinux v1.10.0
- github.com/openshift/api v0.0.0-20230201213816-61d971884921
+ github.com/openshift/api v0.0.0-20230208193339-068b2ae5534f
github.com/openshift/apiserver-library-go v0.0.0-20230120221150-cefee9e0162b
github.com/openshift/client-go v0.0.0-20230120202327-72f107311084
github.com/openshift/library-go v0.0.0-20230127195720-edf819b079cf
4 changes: 2 additions & 2 deletions go.sum
@@ -580,8 +580,8 @@ github.com/opencontainers/runtime-spec v1.0.3-0.20210326190908-1c3f411f0417 h1:3
github.com/opencontainers/runtime-spec v1.0.3-0.20210326190908-1c3f411f0417/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0=
github.com/opencontainers/selinux v1.10.0 h1:rAiKF8hTcgLI3w0DHm6i0ylVVcOrlgR1kK99DRLDhyU=
github.com/opencontainers/selinux v1.10.0/go.mod h1:2i0OySw99QjzBBQByd1Gr9gSjvuho1lHsJxIJ3gGbJI=
- github.com/openshift/api v0.0.0-20230201213816-61d971884921 h1:19XfhG/rG4oxNOQ1PNtHIjW23z2+QNHC6lH1VZWhSWY=
- github.com/openshift/api v0.0.0-20230201213816-61d971884921/go.mod h1:ctXNyWanKEjGj8sss1KjjHQ3ENKFm33FFnS5BKaIPh4=
+ github.com/openshift/api v0.0.0-20230208193339-068b2ae5534f h1:+GaTEfR8gYzh64fdlRKLYZLwt5p4wQd2mdnvkhFDa8k=
+ github.com/openshift/api v0.0.0-20230208193339-068b2ae5534f/go.mod h1:ctXNyWanKEjGj8sss1KjjHQ3ENKFm33FFnS5BKaIPh4=
github.com/openshift/apiserver-library-go v0.0.0-20230120221150-cefee9e0162b h1:1AeKPWFTSSSqSl0VYmwnaOuxw2kExQgJ6pjuC4XV33A=
github.com/openshift/apiserver-library-go v0.0.0-20230120221150-cefee9e0162b/go.mod h1:FmOGJTf5L1X9LiqnsNDwKJyt5ycUNxnNqpxs0rgylTc=
github.com/openshift/client-go v0.0.0-20230120202327-72f107311084 h1:66uaqNwA+qYyQDwsMWUfjjau8ezmg1dzCqub13KZOcE=
@@ -12,6 +12,7 @@ import (
"github.com/openshift/apiserver-library-go/pkg/securitycontextconstraints/sccadmission"
authorizationrestrictusers "k8s.io/kubernetes/openshift-kube-apiserver/admission/authorization/restrictusers"
quotaclusterresourceoverride "k8s.io/kubernetes/openshift-kube-apiserver/admission/autoscaling/clusterresourceoverride"
"k8s.io/kubernetes/openshift-kube-apiserver/admission/autoscaling/managednode"
"k8s.io/kubernetes/openshift-kube-apiserver/admission/autoscaling/managementcpusoverride"
quotarunonceduration "k8s.io/kubernetes/openshift-kube-apiserver/admission/autoscaling/runonceduration"
"k8s.io/kubernetes/openshift-kube-apiserver/admission/customresourcevalidation/customresourcevalidationregistration"
@@ -30,6 +31,7 @@ func RegisterOpenshiftKubeAdmissionPlugins(plugins *admission.Plugins) {
imagepolicy.Register(plugins)
ingressadmission.Register(plugins)
managementcpusoverride.Register(plugins)
+ managednode.Register(plugins)
projectnodeenv.Register(plugins)
quotaclusterresourceoverride.Register(plugins)
quotaclusterresourcequota.Register(plugins)
@@ -71,6 +73,7 @@ var (
"route.openshift.io/IngressAdmission",
hostassignment.PluginName, // "route.openshift.io/RouteHostAssignment"
csiinlinevolumesecurity.PluginName, // "storage.openshift.io/CSIInlineVolumeSecurity"
+ managednode.PluginName, // "autoscaling.openshift.io/ManagedNode"
}

// openshiftAdmissionPluginsForKubeAfterResourceQuota are the plugins to add after ResourceQuota plugin
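
For context, a minimal sketch (not part of this commit) of what the registration above does: Register keys the plugin factory under managednode.PluginName in an admission.Plugins registry, and the ordered plugin list then refers to it by that name. The standalone main package here is only for illustration.

package main

import (
	"fmt"

	"k8s.io/apiserver/pkg/admission"
	"k8s.io/kubernetes/openshift-kube-apiserver/admission/autoscaling/managednode"
)

func main() {
	// Register stores the plugin factory under managednode.PluginName
	// ("autoscaling.openshift.io/ManagedNode"); the ordered plugin list
	// in the file above refers to the plugin by that name.
	plugins := admission.NewPlugins()
	managednode.Register(plugins)

	// Registered() returns the sorted names of all registered plugins.
	fmt.Println(plugins.Registered())
}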
@@ -0,0 +1,136 @@
package managednode

import (
"context"
"fmt"
"io"
"strings"

configv1 "github.com/openshift/api/config/v1"
configv1informer "github.com/openshift/client-go/config/informers/externalversions/config/v1"
configv1listers "github.com/openshift/client-go/config/listers/config/v1"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/admission/initializer"
"k8s.io/kubernetes/pkg/kubelet/managed"

corev1 "k8s.io/api/core/v1"
coreapi "k8s.io/kubernetes/pkg/apis/core"

"k8s.io/client-go/kubernetes"
)

const (
PluginName = "autoscaling.openshift.io/ManagedNode"
// infraClusterName contains the name of the cluster infrastructure resource
infraClusterName = "cluster"
)

var _ = initializer.WantsExternalKubeClientSet(&managedNodeValidate{})
var _ = admission.ValidationInterface(&managedNodeValidate{})
var _ = WantsInfraInformer(&managedNodeValidate{})

func Register(plugins *admission.Plugins) {
plugins.Register(PluginName,
func(_ io.Reader) (admission.Interface, error) {
return &managedNodeValidate{
Handler: admission.NewHandler(admission.Create, admission.Update),
}, nil
})
}

type managedNodeValidate struct {
*admission.Handler
client kubernetes.Interface
infraConfigLister configv1listers.InfrastructureLister
infraConfigListSynced func() bool
}

// SetExternalKubeClientSet implements the WantsExternalKubeClientSet interface.
func (a *managedNodeValidate) SetExternalKubeClientSet(client kubernetes.Interface) {
a.client = client
}

func (a *managedNodeValidate) SetInfraInformer(informer configv1informer.InfrastructureInformer) {
a.infraConfigLister = informer.Lister()
a.infraConfigListSynced = informer.Informer().HasSynced
}

func (a *managedNodeValidate) ValidateInitialization() error {
if a.client == nil {
return fmt.Errorf("%s plugin needs a kubernetes client", PluginName)
}
if a.infraConfigLister == nil {
return fmt.Errorf("%s did not get a config infrastructure lister", PluginName)
}
if a.infraConfigListSynced == nil {
return fmt.Errorf("%s plugin needs a config infrastructure lister synced", PluginName)
}
return nil
}

func (a *managedNodeValidate) Validate(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces) (err error) {
if attr.GetResource().GroupResource() != corev1.Resource("nodes") || attr.GetSubresource() != "" {
return nil
}

node, ok := attr.GetObject().(*coreapi.Node)
if !ok {
return admission.NewForbidden(attr, fmt.Errorf("unexpected object: %#v", attr.GetResource()))
}

// infraConfigListSynced is expected to be thread-safe since the underlying call is to the standard
// informer HasSynced() function which is thread-safe.
if !a.infraConfigListSynced() {
return admission.NewForbidden(attr, fmt.Errorf("%s infra config cache not synchronized", PluginName))
}

clusterInfra, err := a.infraConfigLister.Get(infraClusterName)
if err != nil {
return admission.NewForbidden(attr, err) // can happen due to informer latency
}

// Check if we are in CPU Partitioning mode for AllNodes
allErrs := validateClusterCPUPartitioning(clusterInfra.Status, node)
if len(allErrs) == 0 {
return nil
}
return errors.NewInvalid(attr.GetKind().GroupKind(), node.Name, allErrs)
}

// validateClusterCPUPartitioning makes sure that we only check nodes when CPU Partitioning is turned on.
// We also need to account for Single Node upgrades: during the initial upgrade, NTO updates this field to make it
// authoritative from that point on. A rollback will revert a Single Node cluster back to its normal cycle.
// Other installations have this field set at install time, and it can not be turned off.
//
// If CPUPartitioning == AllNodes (i.e. not the empty value), check the node's resources.
func validateClusterCPUPartitioning(infraStatus configv1.InfrastructureStatus, node *coreapi.Node) field.ErrorList {
errorMessage := "node does not contain resource information, this is required for clusters with workload partitioning enabled"
var allErrs field.ErrorList

if infraStatus.CPUPartitioning == configv1.CPUPartitioningAllNodes {
if !containsCPUResource(node.Status.Capacity) {
allErrs = append(allErrs, getNodeInvalidWorkloadResourceError("capacity", errorMessage))
}
if !containsCPUResource(node.Status.Allocatable) {
allErrs = append(allErrs, getNodeInvalidWorkloadResourceError("allocatable", errorMessage))
}
}

return allErrs
}

func containsCPUResource(resources coreapi.ResourceList) bool {
for k := range resources {
if strings.Contains(k.String(), managed.WorkloadsCapacitySuffix) {
return true
}
}
return false
}

func getNodeInvalidWorkloadResourceError(resourcePool, message string) *field.Error {
return field.Required(field.NewPath("status", resourcePool, managed.WorkloadsCapacitySuffix), message)
}
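
To make the comment above concrete, here is a minimal sketch (not part of this commit) of a Node that passes the check when CPUPartitioning is AllNodes. The resource name is the one used in the tests below; the validator itself only requires that some resource key in both capacity and allocatable contain managed.WorkloadsCapacitySuffix. The package and function names are hypothetical.

package managednodeexample

import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// workloadPartitionedNode returns a Node the ManagedNode plugin admits:
// both status.capacity and status.allocatable advertise the workload resource.
func workloadPartitionedNode() *corev1.Node {
	cores := resource.MustParse("16")
	return &corev1.Node{
		ObjectMeta: metav1.ObjectMeta{Name: "worker-0"},
		Status: corev1.NodeStatus{
			Capacity: corev1.ResourceList{
				"management.workload.openshift.io/cores": cores,
			},
			Allocatable: corev1.ResourceList{
				"management.workload.openshift.io/cores": cores,
			},
		},
	}
}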
@@ -0,0 +1,128 @@
package managednode

import (
"context"
"fmt"
"testing"

configv1 "github.com/openshift/api/config/v1"

"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/cache"

configv1listers "github.com/openshift/client-go/config/listers/config/v1"

corev1 "k8s.io/api/core/v1"
kapi "k8s.io/kubernetes/pkg/apis/core"
)

const (
managedCapacityLabel = "management.workload.openshift.io/cores"
)

func TestAdmit(t *testing.T) {
tests := []struct {
name string
node *corev1.Node
infra *configv1.Infrastructure
expectedError error
}{
{
name: "should succeed when CPU partitioning is set to AllNodes",
node: testNodeWithManagementResource(true),
infra: testClusterInfra(configv1.CPUPartitioningAllNodes),
},
{
name: "should succeed when CPU partitioning is set to None",
node: testNodeWithManagementResource(true),
infra: testClusterInfra(configv1.CPUPartitioningNone),
},
{
name: "should fail when nodes don't have capacity",
node: testNodeWithManagementResource(false),
infra: testClusterInfra(configv1.CPUPartitioningAllNodes),
expectedError: fmt.Errorf("node does not contain resource information, this is required for clusters with workload partitioning enabled"),
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
m, err := getMockNode(test.infra)
if err != nil {
t.Fatalf("%s: failed to get mock managementNode: %v", test.name, err)
}

attrs := admission.NewAttributesRecord(
test.node, nil, schema.GroupVersionKind{},
test.node.Namespace, test.node.Name, kapi.Resource("nodes").WithVersion("version"), "",
admission.Create, nil, false, fakeUser())
err = m.Validate(context.TODO(), attrs, nil)

if err == nil && test.expectedError != nil {
t.Fatalf("%s: the expected error %v, got nil", test.name, test.expectedError)
}
})
}
}

func testNodeWithManagementResource(capacity bool) *corev1.Node {
q := resource.NewQuantity(16000, resource.DecimalSI)
node := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "managed-node",
},
}
if capacity {
node.Status.Capacity = corev1.ResourceList{
managedCapacityLabel: *q,
}
}
return node
}

func testClusterInfra(mode configv1.CPUPartitioningMode) *configv1.Infrastructure {
return &configv1.Infrastructure{
ObjectMeta: metav1.ObjectMeta{
Name: infraClusterName,
},
Status: configv1.InfrastructureStatus{
APIServerURL: "test",
ControlPlaneTopology: configv1.HighlyAvailableTopologyMode,
InfrastructureTopology: configv1.HighlyAvailableTopologyMode,
CPUPartitioning: mode,
},
}
}

func fakeUser() user.Info {
return &user.DefaultInfo{
Name: "testuser",
}
}

func getMockNode(infra *configv1.Infrastructure) (*managedNodeValidate, error) {
m := &managedNodeValidate{
Handler: admission.NewHandler(admission.Create),
client: &fake.Clientset{},
infraConfigLister: fakeInfraConfigLister(infra),
infraConfigListSynced: func() bool { return true },
}
if err := m.ValidateInitialization(); err != nil {
return nil, err
}

return m, nil
}

func fakeInfraConfigLister(infra *configv1.Infrastructure) configv1listers.InfrastructureLister {
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
if infra != nil {
_ = indexer.Add(infra)
}
return configv1listers.NewInfrastructureLister(indexer)
}
@@ -0,0 +1,28 @@
package managednode

import (
"k8s.io/apiserver/pkg/admission"

configv1informer "github.com/openshift/client-go/config/informers/externalversions/config/v1"
)

func NewInitializer(infraInformer configv1informer.InfrastructureInformer) admission.PluginInitializer {
return &localInitializer{infraInformer: infraInformer}
}

type WantsInfraInformer interface {
SetInfraInformer(informer configv1informer.InfrastructureInformer)
admission.InitializationValidator
}

type localInitializer struct {
infraInformer configv1informer.InfrastructureInformer
}

// Initialize will check the initialization interfaces implemented by each plugin
// and provide the appropriate initialization data
func (i *localInitializer) Initialize(plugin admission.Interface) {
if wants, ok := plugin.(WantsInfraInformer); ok {
wants.SetInfraInformer(i.infraInformer)
}
}
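
For illustration, a minimal sketch (not part of this commit) of how a host process could build this initializer from the standard openshift/client-go config informer factory. The helper name is hypothetical, and the actual wiring inside openshift-kube-apiserver is not shown here; the caller would still need to start the informer factory and add the returned initializer to the admission chain.

package managednodeexample

import (
	"time"

	configclient "github.com/openshift/client-go/config/clientset/versioned"
	configinformers "github.com/openshift/client-go/config/informers/externalversions"

	"k8s.io/apiserver/pkg/admission"
	"k8s.io/client-go/rest"
	"k8s.io/kubernetes/openshift-kube-apiserver/admission/autoscaling/managednode"
)

// buildManagedNodeInitializer (hypothetical helper) hands the Infrastructure
// informer to any plugin implementing WantsInfraInformer, such as
// managedNodeValidate above.
func buildManagedNodeInitializer(cfg *rest.Config) (admission.PluginInitializer, configinformers.SharedInformerFactory, error) {
	client, err := configclient.NewForConfig(cfg)
	if err != nil {
		return nil, nil, err
	}
	factory := configinformers.NewSharedInformerFactory(client, 10*time.Minute)
	return managednode.NewInitializer(factory.Config().V1().Infrastructures()), factory, nil
}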