Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updates to container manager and internal container lifecycle to accommodate TopologyManager #74357

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/kubelet/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan
ExperimentalPodPidsLimit: s.PodPidsLimit,
EnforceCPULimits: s.CPUCFSQuota,
CPUCFSQuotaPeriod: s.CPUCFSQuotaPeriod.Duration,
ExperimentalTopologyManagerPolicy: s.TopologyManagerPolicy,
},
s.FailSwapOn,
devicePluginEnabled,
Expand Down
1 change: 1 addition & 0 deletions pkg/kubelet/cm/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ go_library(
"//pkg/features:go_default_library",
"//pkg/kubelet/apis/podresources/v1alpha1:go_default_library",
"//pkg/kubelet/cm/cpumanager:go_default_library",
"//pkg/kubelet/cm/topologymanager:go_default_library",
"//pkg/kubelet/config:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/eviction/api:go_default_library",
Expand Down
5 changes: 5 additions & 0 deletions pkg/kubelet/cm/container_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
v1 "k8s.io/api/core/v1"
internalapi "k8s.io/cri-api/pkg/apis"
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
Expand Down Expand Up @@ -108,6 +109,9 @@ type ContainerManager interface {
// ShouldResetExtendedResourceCapacity returns whether or not the extended resources should be zeroed,
// due to node recreation.
ShouldResetExtendedResourceCapacity() bool

// GetTopologyPodAdmitHandler returns an instance of the TopologyManager for Pod Admission
GetTopologyPodAdmitHandler() topologymanager.Manager
}

type NodeConfig struct {
Expand All @@ -127,6 +131,7 @@ type NodeConfig struct {
ExperimentalPodPidsLimit int64
EnforceCPULimits bool
CPUCFSQuotaPeriod time.Duration
ExperimentalTopologyManagerPolicy string
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect we will plan to move this out of experimental in the future, is it better if we just call this TopologyManagerPolicy to avoid a future rename?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

}

type NodeAllocatableConfig struct {
Expand Down
23 changes: 22 additions & 1 deletion pkg/kubelet/cm/container_manager_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
cmutil "k8s.io/kubernetes/pkg/kubelet/cm/util"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
Expand Down Expand Up @@ -139,6 +140,8 @@ type containerManagerImpl struct {
deviceManager devicemanager.Manager
// Interface for CPU affinity management.
cpuManager cpumanager.Manager
// Interface for Topology resource co-ordination
topologyManager topologymanager.Manager
}

type features struct {
Expand Down Expand Up @@ -284,6 +287,20 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
qosContainerManager: qosContainerManager,
}

if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) {
cm.topologyManager, err = topologymanager.NewManager(
nodeConfig.ExperimentalTopologyManagerPolicy,
)

if err != nil {
return nil, err
}

klog.Infof("[topologymanager] Initilizing Topology Manager with %s policy", nodeConfig.ExperimentalTopologyManagerPolicy)
lmdaly marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: "Initializing"

} else {
cm.topologyManager = topologymanager.NewFakeManager()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the other OSes, we return a nil topologyManager. I don't think we want to use a fake manager outside of test code right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Contributor Author

@lmdaly lmdaly Jul 17, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I can see, the other OSes don't have TopologyManager(or device/cpu manager) as part of the containerManagerImpl so there is no reference, so we don't return anything?

I was following the convention used by DeviceManager which returns a ManagerStub below. There is also a reference to the Fake CPU Manager in the other OSes when returning the internalContainerLifecycleImpl.

If I return nil, the code errors as cm.TopologyManager functions are called below. So I could wrap those calls in a check for the feature gate but looks like the fake/stub managers are used outside of test code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually think the only call you would need to wrap is this one (which probably makes sense to wrap anyway):

pkg/kubelet/kubelet.go:
klet.admitHandlers.AddPodAdmitHandler(klet.containerManager.GetTopologyPodAdmitHandler())

Looking through the code, I don't see anywhere else that cm.topologyManager is referenced (other than https://github.com/kubernetes/kubernetes/pull/74357/files#diff-06aa1b61a9bc4a50aa8d8b0a69d95bb2R347), whose internal references to the topologyManager are already protected by a check on the feature gate:

https://github.com/kubernetes/kubernetes/pull/74357/files#diff-81cef540f416b0451c18470420adc42dR40

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes I see the code I was referring to is actually in the CPU and Device Manager PRs.
#73920 & #74345 have the references to topology manager to register them as Hint Providers

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It still feels more natural to me to remove the FakeManager() here and protect all of the accesses to cm.topologymanager with a check on the feature gate, but I will leave it to @sjennings for the final call.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can clean it up in a follow on. there are a number of things that can be cleaned up. i'm making a list :)

}

klog.Infof("Creating device plugin manager: %t", devicePluginEnabled)
if devicePluginEnabled {
cm.deviceManager, err = devicemanager.NewManagerImpl()
Expand Down Expand Up @@ -332,7 +349,7 @@ func (cm *containerManagerImpl) NewPodContainerManager() PodContainerManager {
}

func (cm *containerManagerImpl) InternalContainerLifecycle() InternalContainerLifecycle {
return &internalContainerLifecycleImpl{cm.cpuManager}
return &internalContainerLifecycleImpl{cm.cpuManager, cm.topologyManager}
}

// Create a cgroup container manager.
Expand Down Expand Up @@ -644,6 +661,10 @@ func (cm *containerManagerImpl) UpdatePluginResources(node *schedulernodeinfo.No
return cm.deviceManager.Allocate(node, attrs)
}

func (cm *containerManagerImpl) GetTopologyPodAdmitHandler() topologymanager.Manager {
return cm.topologyManager
}

func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList {
cpuLimit := int64(0)

Expand Down
7 changes: 6 additions & 1 deletion pkg/kubelet/cm/container_manager_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
internalapi "k8s.io/cri-api/pkg/apis"
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
Expand Down Expand Up @@ -101,7 +102,7 @@ func (cm *containerManagerStub) UpdatePluginResources(*schedulernodeinfo.NodeInf
}

func (cm *containerManagerStub) InternalContainerLifecycle() InternalContainerLifecycle {
return &internalContainerLifecycleImpl{cpumanager.NewFakeManager()}
return &internalContainerLifecycleImpl{cpumanager.NewFakeManager(), topologymanager.NewFakeManager()}
}

func (cm *containerManagerStub) GetPodCgroupRoot() string {
Expand All @@ -116,6 +117,10 @@ func (cm *containerManagerStub) ShouldResetExtendedResourceCapacity() bool {
return cm.shouldResetExtendedResourceCapacity
}

func (cm *containerManagerStub) GetTopologyPodAdmitHandler() topologymanager.Manager {
return nil
}

func NewStubContainerManager() ContainerManager {
return &containerManagerStub{shouldResetExtendedResourceCapacity: false}
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/kubelet/cm/container_manager_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
Expand Down Expand Up @@ -161,7 +162,7 @@ func (cm *containerManagerImpl) UpdatePluginResources(*schedulernodeinfo.NodeInf
}

func (cm *containerManagerImpl) InternalContainerLifecycle() InternalContainerLifecycle {
return &internalContainerLifecycleImpl{cpumanager.NewFakeManager()}
return &internalContainerLifecycleImpl{cpumanager.NewFakeManager(), topologymanager.NewFakeManager()}
}

func (cm *containerManagerImpl) GetPodCgroupRoot() string {
Expand All @@ -175,3 +176,7 @@ func (cm *containerManagerImpl) GetDevices(_, _ string) []*podresourcesapi.Conta
func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool {
return false
}

func (cm *containerManagerImpl) GetTopologyPodAdmitHandler() topologymanager.Manager {
return nil
}
26 changes: 23 additions & 3 deletions pkg/kubelet/cm/internal_container_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
)

type InternalContainerLifecycle interface {
Expand All @@ -32,12 +33,22 @@ type InternalContainerLifecycle interface {

// Implements InternalContainerLifecycle interface.
type internalContainerLifecycleImpl struct {
cpuManager cpumanager.Manager
cpuManager cpumanager.Manager
topologyManager topologymanager.Manager
}

func (i *internalContainerLifecycleImpl) PreStartContainer(pod *v1.Pod, container *v1.Container, containerID string) error {
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
return i.cpuManager.AddContainer(pod, container, containerID)
err := i.cpuManager.AddContainer(pod, container, containerID)
if err != nil {
return err
}
}
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) {
err := i.topologyManager.AddContainer(pod, containerID)
if err != nil {
return err
}
}
return nil
}
Expand All @@ -51,7 +62,16 @@ func (i *internalContainerLifecycleImpl) PreStopContainer(containerID string) er

func (i *internalContainerLifecycleImpl) PostStopContainer(containerID string) error {
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
return i.cpuManager.RemoveContainer(containerID)
err := i.cpuManager.RemoveContainer(containerID)
if err != nil {
return err
}
}
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) {
err := i.topologyManager.RemoveContainer(containerID)
if err != nil {
return err
}
}
return nil
}
4 changes: 3 additions & 1 deletion pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,9 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
}
klet.AddPodSyncLoopHandler(activeDeadlineHandler)
klet.AddPodSyncHandler(activeDeadlineHandler)

if utilfeature.DefaultFeatureGate.Enabled(features.TopologyManager) {
klet.admitHandlers.AddPodAdmitHandler(klet.containerManager.GetTopologyPodAdmitHandler())
}
criticalPodAdmissionHandler := preemption.NewCriticalPodAdmissionHandler(klet.GetActivePods, killPodNow(klet.podWorkers, kubeDeps.Recorder), kubeDeps.Recorder)
klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources))
// apply functional Option's
Expand Down