diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 1f357a4ad90d1..f7c9599e28df0 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -693,6 +693,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan ExperimentalPodPidsLimit: s.PodPidsLimit, EnforceCPULimits: s.CPUCFSQuota, CPUCFSQuotaPeriod: s.CPUCFSQuotaPeriod.Duration, + ExperimentalTopologyManagerPolicy: s.TopologyManagerPolicy, }, s.FailSwapOn, devicePluginEnabled, diff --git a/pkg/kubelet/cm/BUILD b/pkg/kubelet/cm/BUILD index 4ac55b4b3025c..46f1c8fccd45d 100644 --- a/pkg/kubelet/cm/BUILD +++ b/pkg/kubelet/cm/BUILD @@ -28,6 +28,7 @@ go_library( "//pkg/features:go_default_library", "//pkg/kubelet/apis/podresources/v1alpha1:go_default_library", "//pkg/kubelet/cm/cpumanager:go_default_library", + "//pkg/kubelet/cm/topologymanager:go_default_library", "//pkg/kubelet/config:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/eviction/api:go_default_library", diff --git a/pkg/kubelet/cm/container_manager.go b/pkg/kubelet/cm/container_manager.go index 1d5231543c887..40ed8bd21ec6e 100644 --- a/pkg/kubelet/cm/container_manager.go +++ b/pkg/kubelet/cm/container_manager.go @@ -24,6 +24,7 @@ import ( v1 "k8s.io/api/core/v1" internalapi "k8s.io/cri-api/pkg/apis" podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1" + "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api" @@ -108,6 +109,9 @@ type ContainerManager interface { // ShouldResetExtendedResourceCapacity returns whether or not the extended resources should be zeroed, // due to node recreation. ShouldResetExtendedResourceCapacity() bool + + // GetTopologyPodAdmitHandler returns an instance of the TopologyManager for Pod Admission + GetTopologyPodAdmitHandler() topologymanager.Manager } type NodeConfig struct { @@ -127,6 +131,7 @@ type NodeConfig struct { ExperimentalPodPidsLimit int64 EnforceCPULimits bool CPUCFSQuotaPeriod time.Duration + ExperimentalTopologyManagerPolicy string } type NodeAllocatableConfig struct { diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index aa4cc8ac5ac5b..ed16b0e032ee7 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -48,6 +48,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cadvisor" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager" + "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" cmutil "k8s.io/kubernetes/pkg/kubelet/cm/util" "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" @@ -139,6 +140,8 @@ type containerManagerImpl struct { deviceManager devicemanager.Manager // Interface for CPU affinity management. cpuManager cpumanager.Manager + // Interface for Topology resource co-ordination + topologyManager topologymanager.Manager } type features struct { @@ -284,6 +287,20 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I qosContainerManager: qosContainerManager, } + if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) { + cm.topologyManager, err = topologymanager.NewManager( + nodeConfig.ExperimentalTopologyManagerPolicy, + ) + + if err != nil { + return nil, err + } + + klog.Infof("[topologymanager] Initilizing Topology Manager with %s policy", nodeConfig.ExperimentalTopologyManagerPolicy) + } else { + cm.topologyManager = topologymanager.NewFakeManager() + } + klog.Infof("Creating device plugin manager: %t", devicePluginEnabled) if devicePluginEnabled { cm.deviceManager, err = devicemanager.NewManagerImpl() @@ -332,7 +349,7 @@ func (cm *containerManagerImpl) NewPodContainerManager() PodContainerManager { } func (cm *containerManagerImpl) InternalContainerLifecycle() InternalContainerLifecycle { - return &internalContainerLifecycleImpl{cm.cpuManager} + return &internalContainerLifecycleImpl{cm.cpuManager, cm.topologyManager} } // Create a cgroup container manager. @@ -644,6 +661,10 @@ func (cm *containerManagerImpl) UpdatePluginResources(node *schedulernodeinfo.No return cm.deviceManager.Allocate(node, attrs) } +func (cm *containerManagerImpl) GetTopologyPodAdmitHandler() topologymanager.Manager { + return cm.topologyManager +} + func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList { cpuLimit := int64(0) diff --git a/pkg/kubelet/cm/container_manager_stub.go b/pkg/kubelet/cm/container_manager_stub.go index b63d58cf8be57..7d6a214f4b74b 100644 --- a/pkg/kubelet/cm/container_manager_stub.go +++ b/pkg/kubelet/cm/container_manager_stub.go @@ -24,6 +24,7 @@ import ( internalapi "k8s.io/cri-api/pkg/apis" podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" + "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/lifecycle" @@ -101,7 +102,7 @@ func (cm *containerManagerStub) UpdatePluginResources(*schedulernodeinfo.NodeInf } func (cm *containerManagerStub) InternalContainerLifecycle() InternalContainerLifecycle { - return &internalContainerLifecycleImpl{cpumanager.NewFakeManager()} + return &internalContainerLifecycleImpl{cpumanager.NewFakeManager(), topologymanager.NewFakeManager()} } func (cm *containerManagerStub) GetPodCgroupRoot() string { @@ -116,6 +117,10 @@ func (cm *containerManagerStub) ShouldResetExtendedResourceCapacity() bool { return cm.shouldResetExtendedResourceCapacity } +func (cm *containerManagerStub) GetTopologyPodAdmitHandler() topologymanager.Manager { + return nil +} + func NewStubContainerManager() ContainerManager { return &containerManagerStub{shouldResetExtendedResourceCapacity: false} } diff --git a/pkg/kubelet/cm/container_manager_windows.go b/pkg/kubelet/cm/container_manager_windows.go index 352ab1062f64a..55f172171e6dc 100644 --- a/pkg/kubelet/cm/container_manager_windows.go +++ b/pkg/kubelet/cm/container_manager_windows.go @@ -34,6 +34,7 @@ import ( podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1" "k8s.io/kubernetes/pkg/kubelet/cadvisor" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" + "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/lifecycle" @@ -161,7 +162,7 @@ func (cm *containerManagerImpl) UpdatePluginResources(*schedulernodeinfo.NodeInf } func (cm *containerManagerImpl) InternalContainerLifecycle() InternalContainerLifecycle { - return &internalContainerLifecycleImpl{cpumanager.NewFakeManager()} + return &internalContainerLifecycleImpl{cpumanager.NewFakeManager(), topologymanager.NewFakeManager()} } func (cm *containerManagerImpl) GetPodCgroupRoot() string { @@ -175,3 +176,7 @@ func (cm *containerManagerImpl) GetDevices(_, _ string) []*podresourcesapi.Conta func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool { return false } + +func (cm *containerManagerImpl) GetTopologyPodAdmitHandler() topologymanager.Manager { + return nil +} diff --git a/pkg/kubelet/cm/internal_container_lifecycle.go b/pkg/kubelet/cm/internal_container_lifecycle.go index f81cb45a80511..1176a42a8d170 100644 --- a/pkg/kubelet/cm/internal_container_lifecycle.go +++ b/pkg/kubelet/cm/internal_container_lifecycle.go @@ -22,6 +22,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" kubefeatures "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" + "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" ) type InternalContainerLifecycle interface { @@ -32,12 +33,22 @@ type InternalContainerLifecycle interface { // Implements InternalContainerLifecycle interface. type internalContainerLifecycleImpl struct { - cpuManager cpumanager.Manager + cpuManager cpumanager.Manager + topologyManager topologymanager.Manager } func (i *internalContainerLifecycleImpl) PreStartContainer(pod *v1.Pod, container *v1.Container, containerID string) error { if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) { - return i.cpuManager.AddContainer(pod, container, containerID) + err := i.cpuManager.AddContainer(pod, container, containerID) + if err != nil { + return err + } + } + if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) { + err := i.topologyManager.AddContainer(pod, containerID) + if err != nil { + return err + } } return nil } @@ -51,7 +62,16 @@ func (i *internalContainerLifecycleImpl) PreStopContainer(containerID string) er func (i *internalContainerLifecycleImpl) PostStopContainer(containerID string) error { if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) { - return i.cpuManager.RemoveContainer(containerID) + err := i.cpuManager.RemoveContainer(containerID) + if err != nil { + return err + } + } + if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) { + err := i.topologyManager.RemoveContainer(containerID) + if err != nil { + return err + } } return nil } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index ccc924efd78f0..1cf99c6e0596f 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -859,7 +859,9 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, } klet.AddPodSyncLoopHandler(activeDeadlineHandler) klet.AddPodSyncHandler(activeDeadlineHandler) - + if utilfeature.DefaultFeatureGate.Enabled(features.TopologyManager) { + klet.admitHandlers.AddPodAdmitHandler(klet.containerManager.GetTopologyPodAdmitHandler()) + } criticalPodAdmissionHandler := preemption.NewCriticalPodAdmissionHandler(klet.GetActivePods, killPodNow(klet.podWorkers, kubeDeps.Recorder), kubeDeps.Recorder) klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources)) // apply functional Option's