forked from kubernetes/kubernetes
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request kubernetes#1312 from eggfoobar/wrk-prt-multi-node
CNF-5901: admission hook change for workload partition on all clusters
- Loading branch information
Showing
23 changed files
with
1,336 additions
and
93 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
136 changes: 136 additions & 0 deletions
136
openshift-kube-apiserver/admission/autoscaling/managednode/admission.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
package managednode | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"io" | ||
"strings" | ||
|
||
configv1 "github.com/openshift/api/config/v1" | ||
configv1informer "github.com/openshift/client-go/config/informers/externalversions/config/v1" | ||
configv1listers "github.com/openshift/client-go/config/listers/config/v1" | ||
|
||
"k8s.io/apimachinery/pkg/api/errors" | ||
"k8s.io/apimachinery/pkg/util/validation/field" | ||
"k8s.io/apiserver/pkg/admission" | ||
"k8s.io/apiserver/pkg/admission/initializer" | ||
"k8s.io/kubernetes/pkg/kubelet/managed" | ||
|
||
corev1 "k8s.io/api/core/v1" | ||
coreapi "k8s.io/kubernetes/pkg/apis/core" | ||
|
||
"k8s.io/client-go/kubernetes" | ||
) | ||
|
||
const ( | ||
PluginName = "autoscaling.openshift.io/ManagedNode" | ||
// infraClusterName contains the name of the cluster infrastructure resource | ||
infraClusterName = "cluster" | ||
) | ||
|
||
var _ = initializer.WantsExternalKubeClientSet(&managedNodeValidate{}) | ||
var _ = admission.ValidationInterface(&managedNodeValidate{}) | ||
var _ = WantsInfraInformer(&managedNodeValidate{}) | ||
|
||
func Register(plugins *admission.Plugins) { | ||
plugins.Register(PluginName, | ||
func(_ io.Reader) (admission.Interface, error) { | ||
return &managedNodeValidate{ | ||
Handler: admission.NewHandler(admission.Create, admission.Update), | ||
}, nil | ||
}) | ||
} | ||
|
||
type managedNodeValidate struct { | ||
*admission.Handler | ||
client kubernetes.Interface | ||
infraConfigLister configv1listers.InfrastructureLister | ||
infraConfigListSynced func() bool | ||
} | ||
|
||
// SetExternalKubeClientSet implements the WantsExternalKubeClientSet interface. | ||
func (a *managedNodeValidate) SetExternalKubeClientSet(client kubernetes.Interface) { | ||
a.client = client | ||
} | ||
|
||
func (a *managedNodeValidate) SetInfraInformer(informer configv1informer.InfrastructureInformer) { | ||
a.infraConfigLister = informer.Lister() | ||
a.infraConfigListSynced = informer.Informer().HasSynced | ||
} | ||
|
||
func (a *managedNodeValidate) ValidateInitialization() error { | ||
if a.client == nil { | ||
return fmt.Errorf("%s plugin needs a kubernetes client", PluginName) | ||
} | ||
if a.infraConfigLister == nil { | ||
return fmt.Errorf("%s did not get a config infrastructure lister", PluginName) | ||
} | ||
if a.infraConfigListSynced == nil { | ||
return fmt.Errorf("%s plugin needs a config infrastructure lister synced", PluginName) | ||
} | ||
return nil | ||
} | ||
|
||
func (a *managedNodeValidate) Validate(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces) (err error) { | ||
if attr.GetResource().GroupResource() != corev1.Resource("nodes") || attr.GetSubresource() != "" { | ||
return nil | ||
} | ||
|
||
node, ok := attr.GetObject().(*coreapi.Node) | ||
if !ok { | ||
return admission.NewForbidden(attr, fmt.Errorf("unexpected object: %#v", attr.GetResource())) | ||
} | ||
|
||
// infraConfigListSynced is expected to be thread-safe since the underlying call is to the standard | ||
// informer HasSynced() function which is thread-safe. | ||
if !a.infraConfigListSynced() { | ||
return admission.NewForbidden(attr, fmt.Errorf("%s infra config cache not synchronized", PluginName)) | ||
} | ||
|
||
clusterInfra, err := a.infraConfigLister.Get(infraClusterName) | ||
if err != nil { | ||
return admission.NewForbidden(attr, err) // can happen due to informer latency | ||
} | ||
|
||
// Check if we are in CPU Partitioning mode for AllNodes | ||
allErrs := validateClusterCPUPartitioning(clusterInfra.Status, node) | ||
if len(allErrs) == 0 { | ||
return nil | ||
} | ||
return errors.NewInvalid(attr.GetKind().GroupKind(), node.Name, allErrs) | ||
} | ||
|
||
// validateClusterCPUPartitioning Make sure that we only check nodes when CPU Partitioning is turned on. | ||
// We also need to account for Single Node upgrades, during that initial upgrade, NTO will update this field during | ||
// upgrade to make it authoritative from that point on. A roll back will revert an SingleNode cluster back to it's normal cycle. | ||
// Other installations will have this field set at install time, and can not be turned off. | ||
// | ||
// If CPUPartitioning == AllNodes and is not empty value, check nodes | ||
func validateClusterCPUPartitioning(infraStatus configv1.InfrastructureStatus, node *coreapi.Node) field.ErrorList { | ||
errorMessage := "node does not contain resource information, this is required for clusters with workload partitioning enabled" | ||
var allErrs field.ErrorList | ||
|
||
if infraStatus.CPUPartitioning == configv1.CPUPartitioningAllNodes { | ||
if !containsCPUResource(node.Status.Capacity) { | ||
allErrs = append(allErrs, getNodeInvalidWorkloadResourceError("capacity", errorMessage)) | ||
} | ||
if !containsCPUResource(node.Status.Allocatable) { | ||
allErrs = append(allErrs, getNodeInvalidWorkloadResourceError("allocatable", errorMessage)) | ||
} | ||
} | ||
|
||
return allErrs | ||
} | ||
|
||
func containsCPUResource(resources coreapi.ResourceList) bool { | ||
for k := range resources { | ||
if strings.Contains(k.String(), managed.WorkloadsCapacitySuffix) { | ||
return true | ||
} | ||
} | ||
return false | ||
} | ||
|
||
func getNodeInvalidWorkloadResourceError(resourcePool, message string) *field.Error { | ||
return field.Required(field.NewPath("status", resourcePool, managed.WorkloadsCapacitySuffix), message) | ||
} |
128 changes: 128 additions & 0 deletions
128
openshift-kube-apiserver/admission/autoscaling/managednode/admission_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,128 @@ | ||
package managednode | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"testing" | ||
|
||
configv1 "github.com/openshift/api/config/v1" | ||
|
||
"k8s.io/apimachinery/pkg/api/resource" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/apimachinery/pkg/runtime/schema" | ||
"k8s.io/apiserver/pkg/admission" | ||
"k8s.io/apiserver/pkg/authentication/user" | ||
"k8s.io/client-go/kubernetes/fake" | ||
"k8s.io/client-go/tools/cache" | ||
|
||
configv1listers "github.com/openshift/client-go/config/listers/config/v1" | ||
|
||
corev1 "k8s.io/api/core/v1" | ||
kapi "k8s.io/kubernetes/pkg/apis/core" | ||
) | ||
|
||
const ( | ||
managedCapacityLabel = "management.workload.openshift.io/cores" | ||
) | ||
|
||
func TestAdmit(t *testing.T) { | ||
tests := []struct { | ||
name string | ||
node *corev1.Node | ||
infra *configv1.Infrastructure | ||
expectedError error | ||
}{ | ||
{ | ||
name: "should succeed when CPU partitioning is set to AllNodes", | ||
node: testNodeWithManagementResource(true), | ||
infra: testClusterInfra(configv1.CPUPartitioningAllNodes), | ||
}, | ||
{ | ||
name: "should succeed when CPU partitioning is set to None", | ||
node: testNodeWithManagementResource(true), | ||
infra: testClusterInfra(configv1.CPUPartitioningNone), | ||
}, | ||
{ | ||
name: "should fail when nodes don't have capacity", | ||
node: testNodeWithManagementResource(false), | ||
infra: testClusterInfra(configv1.CPUPartitioningAllNodes), | ||
expectedError: fmt.Errorf("node does not contain resource information, this is required for clusters with workload partitioning enabled"), | ||
}, | ||
} | ||
|
||
for _, test := range tests { | ||
t.Run(test.name, func(t *testing.T) { | ||
m, err := getMockNode(test.infra) | ||
if err != nil { | ||
t.Fatalf("%s: failed to get mock managementNode: %v", test.name, err) | ||
} | ||
|
||
attrs := admission.NewAttributesRecord( | ||
test.node, nil, schema.GroupVersionKind{}, | ||
test.node.Namespace, test.node.Name, kapi.Resource("nodes").WithVersion("version"), "", | ||
admission.Create, nil, false, fakeUser()) | ||
err = m.Validate(context.TODO(), attrs, nil) | ||
|
||
if err == nil && test.expectedError != nil { | ||
t.Fatalf("%s: the expected error %v, got nil", test.name, test.expectedError) | ||
} | ||
}) | ||
} | ||
} | ||
|
||
func testNodeWithManagementResource(capacity bool) *corev1.Node { | ||
q := resource.NewQuantity(16000, resource.DecimalSI) | ||
node := &corev1.Node{ | ||
ObjectMeta: metav1.ObjectMeta{ | ||
Name: "managed-node", | ||
}, | ||
} | ||
if capacity { | ||
node.Status.Capacity = corev1.ResourceList{ | ||
managedCapacityLabel: *q, | ||
} | ||
} | ||
return node | ||
} | ||
|
||
func testClusterInfra(mode configv1.CPUPartitioningMode) *configv1.Infrastructure { | ||
return &configv1.Infrastructure{ | ||
ObjectMeta: metav1.ObjectMeta{ | ||
Name: infraClusterName, | ||
}, | ||
Status: configv1.InfrastructureStatus{ | ||
APIServerURL: "test", | ||
ControlPlaneTopology: configv1.HighlyAvailableTopologyMode, | ||
InfrastructureTopology: configv1.HighlyAvailableTopologyMode, | ||
CPUPartitioning: mode, | ||
}, | ||
} | ||
} | ||
|
||
func fakeUser() user.Info { | ||
return &user.DefaultInfo{ | ||
Name: "testuser", | ||
} | ||
} | ||
|
||
func getMockNode(infra *configv1.Infrastructure) (*managedNodeValidate, error) { | ||
m := &managedNodeValidate{ | ||
Handler: admission.NewHandler(admission.Create), | ||
client: &fake.Clientset{}, | ||
infraConfigLister: fakeInfraConfigLister(infra), | ||
infraConfigListSynced: func() bool { return true }, | ||
} | ||
if err := m.ValidateInitialization(); err != nil { | ||
return nil, err | ||
} | ||
|
||
return m, nil | ||
} | ||
|
||
func fakeInfraConfigLister(infra *configv1.Infrastructure) configv1listers.InfrastructureLister { | ||
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) | ||
if infra != nil { | ||
_ = indexer.Add(infra) | ||
} | ||
return configv1listers.NewInfrastructureLister(indexer) | ||
} |
28 changes: 28 additions & 0 deletions
28
openshift-kube-apiserver/admission/autoscaling/managednode/initializers.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
package managednode | ||
|
||
import ( | ||
"k8s.io/apiserver/pkg/admission" | ||
|
||
configv1informer "github.com/openshift/client-go/config/informers/externalversions/config/v1" | ||
) | ||
|
||
func NewInitializer(infraInformer configv1informer.InfrastructureInformer) admission.PluginInitializer { | ||
return &localInitializer{infraInformer: infraInformer} | ||
} | ||
|
||
type WantsInfraInformer interface { | ||
SetInfraInformer(informer configv1informer.InfrastructureInformer) | ||
admission.InitializationValidator | ||
} | ||
|
||
type localInitializer struct { | ||
infraInformer configv1informer.InfrastructureInformer | ||
} | ||
|
||
// Initialize will check the initialization interfaces implemented by each plugin | ||
// and provide the appropriate initialization data | ||
func (i *localInitializer) Initialize(plugin admission.Interface) { | ||
if wants, ok := plugin.(WantsInfraInformer); ok { | ||
wants.SetInfraInformer(i.infraInformer) | ||
} | ||
} |
Oops, something went wrong.