Do not clear state of pods pending admission for CPU/Memory/Device manager #103979

Merged · 3 commits · Aug 9, 2021
27 changes: 25 additions & 2 deletions pkg/kubelet/cm/cpumanager/cpu_manager.go
@@ -133,6 +133,9 @@ type manager struct {

// allocatableCPUs is the set of online CPUs as reported by the system
allocatableCPUs cpuset.CPUSet

// pendingAdmissionPod contains the pod that is currently in the admission phase
pendingAdmissionPod *v1.Pod
}

var _ Manager = &manager{}
@@ -236,6 +239,10 @@ func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesRe
}

func (m *manager) Allocate(p *v1.Pod, c *v1.Container) error {
// The pod is in the admission phase. We need to save the pod to avoid its
// state being cleaned before admission ends
m.setPodPendingAdmission(p)

// Garbage collect any stranded resources before allocating CPUs.
m.removeStaleState()

@@ -304,13 +311,19 @@ func (m *manager) State() state.Reader {
}

func (m *manager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
// The pod is in the admission phase. We need to save the pod to avoid its
// state being cleaned before admission ends
m.setPodPendingAdmission(pod)
// Garbage collect any stranded resources before providing TopologyHints
m.removeStaleState()
// Delegate to active policy
return m.policy.GetTopologyHints(m.state, pod, container)
}

func (m *manager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
// The pod is in the admission phase. We need to save the pod to avoid its
// state being cleaned before admission ends
m.setPodPendingAdmission(pod)
// Garbage collect any stranded resources before providing TopologyHints
m.removeStaleState()
// Delegate to active policy
@@ -343,11 +356,14 @@ func (m *manager) removeStaleState() {
defer m.Unlock()

// Get the list of active pods.
activePods := m.activePods()
activeAndAdmittedPods := m.activePods()
if m.pendingAdmissionPod != nil {
activeAndAdmittedPods = append(activeAndAdmittedPods, m.pendingAdmissionPod)
}

Contributor:

I think this is only partially correct. You need to prune the admitted pods map to only include those that are still in either config or the pod worker but not in both (a pod that is in the pod worker is by definition admitted). You also want to exclude pods that are no longer running due to termination (which means we will never start the container again, so we can reuse those CPUs). You then need to take that pruned list of admitted pods and add it to active pods to get the list of "pods that should be considered to be allocated guaranteed resources".

I think we'll need to add a method on kubelet to make this easier - looking at what that would be right now.
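
A rough sketch of what such a kubelet helper could look like. This is purely illustrative: GetActivePods is the existing kubelet method, but admittedPods, isAdmittedPodTerminated, and podsForResourceManagers are hypothetical names that do not exist in the kubelet today.

```go
// Hypothetical helper: merge active pods with admitted-but-not-yet-running
// pods, dropping anything already terminated (its resources can be reused).
func (kl *Kubelet) podsForResourceManagers() []*v1.Pod {
	pods := kl.GetActivePods()
	for _, p := range kl.admittedPods() { // hypothetical
		// Terminated pods will never start containers again, so their
		// exclusively assigned resources can be reused.
		if kl.isAdmittedPodTerminated(p) { // hypothetical
			continue
		}
		pods = append(pods, p)
	}
	return pods
}
```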

Contributor:

Looking at this, I think we have a bug: these components need to know about terminating pods that have been force deleted (which activePods() does not include). @Random-Liu I think that components like the cpu_manager, memory_manager, qos_container_manager, and device_manager actually need to know about pods that have been force deleted but are still running. To do that, I'm going to need to enable the pod worker to share that info safely (and consistently, I think).

@klueska (Contributor), Jul 29, 2021:

I'm not sure what you suggest here is necessary. As you mentioned in your comment here, all we should really need to care about is whether the pod is:

(1) in the active pods list; or
(2) some pod currently waiting to be admitted

Assuming logic exists to atomically add an admitted pod to the active pod list before starting the next iteration of the admission loop, I believe this should be sufficient. If it doesn't make it into the active list, then we shouldn't be tracking it anyway, and if it's ever removed from the list, we should be done tracking it.

We just then need to track a single variable to hold the "currently being admitted" pod that gets overwritten each time a new iteration of the admit loop starts.

The check in removeStaleState() would then remove any state not currently associated with a pod in the active list + this new variable.

Of course this is all contingent on what I said before:

> Assuming logic exists to atomically add an admitted pod to the active pod list before starting the next iteration of the admission loop, I believe this should be sufficient.

Maybe there's good reason not to do this?
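
As a rough Go sketch of that approach, condensed from this PR's cpu_manager.go changes (it assumes the existing state.GetCPUAssignments and policyRemoveContainerByRef helpers; the real change also tracks container names rather than pruning whole pods in one go):

```go
// Sketch only: the admit loop overwrites a single pendingAdmissionPod field,
// and removeStaleState() treats that pod as live alongside activePods().
func (m *manager) setPodPendingAdmission(pod *v1.Pod) {
	m.Lock()
	defer m.Unlock()
	m.pendingAdmissionPod = pod
}

func (m *manager) removeStaleState() {
	m.Lock()
	defer m.Unlock()

	// Pods whose state must survive: active pods plus the pod currently
	// being admitted, if any.
	live := m.activePods()
	if m.pendingAdmissionPod != nil {
		live = append(live, m.pendingAdmissionPod)
	}

	keep := make(map[string]struct{}, len(live))
	for _, pod := range live {
		keep[string(pod.UID)] = struct{}{}
	}

	// Release assignments that belong to no live pod.
	for podUID, containers := range m.state.GetCPUAssignments() {
		if _, ok := keep[podUID]; ok {
			continue
		}
		for containerName := range containers {
			m.policyRemoveContainerByRef(podUID, containerName)
		}
	}
}
```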

Contributor:

> (1) in the active pods list; or

The bug is that the active pods list does not include force-deleted pods, which may take tens of minutes to complete and may still have running containers pinned to certain CPUs.

Contributor:

Is that a problem though? I would think we can safely remove the CPUs assigned to them then, and the next time around the reconcile loop, their containers will be moved onto non-exclusive CPUs.

@ffromani (Contributor), Aug 2, 2021:

> Although, I guess if activePods() used to include terminated (but still running) pods before, then we never would have removed stale state about them here, making us "think" their CPUs were free (even though containers are still running on them).

AFAIU/CT this is the most pressing question now. I fully agree with the general idea of restoring the old behaviour (which, we learned, does not cover some important cases) and iterating later to actually cover those cases. The former is important and urgent, the latter important as well but less urgent.

Contributor:

ActivePods() never returned pods that were force deleted. So you have always been broken on that.

Author:

In this case, can we restore at least the old behavior in the current PR, and once we have the infrastructure to get force-deleted pods, improve the logic in the resource managers?

@cynepco3hahue (Author), Aug 3, 2021:

I opened an issue to track the force-deleted pods problem: #104099

Contributor:

I agree this does not block this change, and the fix for #104099 will probably make the code changes here simpler (but we don't have to do it now).


// Build a list of (podUID, containerName) pairs for all containers in all active Pods.
activeContainers := make(map[string]map[string]struct{})
for _, pod := range activePods {
for _, pod := range activeAndAdmittedPods {
activeContainers[string(pod.UID)] = make(map[string]struct{})
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
activeContainers[string(pod.UID)][container.Name] = struct{}{}
@@ -493,3 +509,10 @@ func (m *manager) updateContainerCPUSet(containerID string, cpus cpuset.CPUSet)
func (m *manager) GetCPUs(podUID, containerName string) cpuset.CPUSet {
return m.state.GetCPUSetOrDefault(podUID, containerName)
}

func (m *manager) setPodPendingAdmission(pod *v1.Pod) {
m.Lock()
defer m.Unlock()

m.pendingAdmissionPod = pod
}
4 changes: 2 additions & 2 deletions pkg/kubelet/cm/cpumanager/cpu_manager_test.go
@@ -277,7 +277,7 @@ func TestCPUManagerAdd(t *testing.T) {

pod := makePod("fakePod", "fakeContainer", "2", "2")
container := &pod.Spec.Containers[0]
mgr.activePods = func() []*v1.Pod { return []*v1.Pod{pod} }
mgr.activePods = func() []*v1.Pod { return nil }

err := mgr.Allocate(pod, container)
if !reflect.DeepEqual(err, testCase.expAllocateErr) {
@@ -1043,7 +1043,7 @@ func TestCPUManagerAddWithResvList(t *testing.T) {

pod := makePod("fakePod", "fakeContainer", "2", "2")
container := &pod.Spec.Containers[0]
mgr.activePods = func() []*v1.Pod { return []*v1.Pod{pod} }
mgr.activePods = func() []*v1.Pod { return nil }

err := mgr.Allocate(pod, container)
if !reflect.DeepEqual(err, testCase.expAllocateErr) {
6 changes: 6 additions & 0 deletions pkg/kubelet/cm/cpumanager/topology_hints_test.go
@@ -188,6 +188,11 @@ func TestGetTopologyHints(t *testing.T) {
if len(tc.expectedHints) == 0 && len(hints) == 0 {
continue
}

if m.pendingAdmissionPod == nil {
t.Errorf("The pendingAdmissionPod should point to the current pod after the call to GetTopologyHints()")
}

sort.SliceStable(hints, func(i, j int) bool {
return hints[i].LessThan(hints[j])
})
@@ -236,6 +241,7 @@ func TestGetPodTopologyHints(t *testing.T) {
if len(tc.expectedHints) == 0 && len(podHints) == 0 {
continue
}

sort.SliceStable(podHints, func(i, j int) bool {
return podHints[i].LessThan(podHints[j])
})
24 changes: 22 additions & 2 deletions pkg/kubelet/cm/devicemanager/manager.go
@@ -108,6 +108,9 @@ type ManagerImpl struct {
// devicesToReuse contains devices that can be reused as they have been allocated to
// init containers.
devicesToReuse PodReusableDevices

// pendingAdmissionPod contains the pod that is currently in the admission phase
pendingAdmissionPod *v1.Pod
}

type endpointInfo struct {
@@ -367,6 +370,10 @@ func (m *ManagerImpl) isVersionCompatibleWithPlugin(versions []string) bool {
// Allocate is the call that you can use to allocate a set of devices
// from the registered device plugins.
func (m *ManagerImpl) Allocate(pod *v1.Pod, container *v1.Container) error {
// The pod is in the admission phase. We need to save the pod to avoid its
// state being cleaned before admission ends
m.setPodPendingAdmission(pod)

if _, ok := m.devicesToReuse[string(pod.UID)]; !ok {
m.devicesToReuse[string(pod.UID)] = make(map[string]sets.String)
}
@@ -619,14 +626,20 @@ func (m *ManagerImpl) readCheckpoint() error {

// UpdateAllocatedDevices frees any Devices that are bound to terminated pods.
func (m *ManagerImpl) UpdateAllocatedDevices() {
activePods := m.activePods()
if !m.sourcesReady.AllReady() {
return
}

m.mutex.Lock()
defer m.mutex.Unlock()

activeAndAdmittedPods := m.activePods()
if m.pendingAdmissionPod != nil {
activeAndAdmittedPods = append(activeAndAdmittedPods, m.pendingAdmissionPod)
}

podsToBeRemoved := m.podDevices.pods()
for _, pod := range activePods {
for _, pod := range activeAndAdmittedPods {
podsToBeRemoved.Delete(string(pod.UID))
}
if len(podsToBeRemoved) <= 0 {
@@ -1117,3 +1130,10 @@ func (m *ManagerImpl) ShouldResetExtendedResourceCapacity() bool {
}
return false
}

func (m *ManagerImpl) setPodPendingAdmission(pod *v1.Pod) {
m.mutex.Lock()
defer m.mutex.Unlock()

m.pendingAdmissionPod = pod
}
8 changes: 8 additions & 0 deletions pkg/kubelet/cm/devicemanager/topology_hints.go
@@ -29,6 +29,10 @@ import (
// ensures the Device Manager is consulted when Topology Aware Hints for each
// container are created.
func (m *ManagerImpl) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
// The pod is in the admission phase. We need to save the pod to avoid its
// state being cleaned before admission ends
m.setPodPendingAdmission(pod)

// Garbage collect any stranded device resources before providing TopologyHints
m.UpdateAllocatedDevices()

@@ -83,6 +87,10 @@ func (m *ManagerImpl) GetTopologyHints(pod *v1.Pod, container *v1.Container) map
// GetPodTopologyHints implements the topologymanager.HintProvider Interface which
// ensures the Device Manager is consulted when Topology Aware Hints for Pod are created.
func (m *ManagerImpl) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
// The pod is in the admission phase. We need to save the pod to avoid its
// state being cleaned before admission ends
m.setPodPendingAdmission(pod)

// Garbage collect any stranded device resources before providing TopologyHints
m.UpdateAllocatedDevices()

31 changes: 28 additions & 3 deletions pkg/kubelet/cm/memorymanager/memory_manager.go
@@ -124,6 +124,9 @@ type manager struct {

// allocatableMemory holds the allocatable memory for each NUMA node
allocatableMemory []state.Block

// pendingAdmissionPod contains the pod that is currently in the admission phase
pendingAdmissionPod *v1.Pod
}

var _ Manager = &manager{}
@@ -230,6 +233,10 @@ func (m *manager) GetMemoryNUMANodes(pod *v1.Pod, container *v1.Container) sets.

// Allocate is called to pre-allocate memory resources during Pod admission.
func (m *manager) Allocate(pod *v1.Pod, container *v1.Container) error {
// The pod is in the admission phase. We need to save the pod to avoid its
// state being cleaned before admission ends
m.setPodPendingAdmission(pod)

// Garbage collect any stranded resources before allocation
m.removeStaleState()

@@ -268,6 +275,10 @@ func (m *manager) State() state.Reader {

// GetPodTopologyHints returns the topology hints for the topology manager
func (m *manager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
// The pod is in the admission phase. We need to save the pod to avoid its
// state being cleaned before admission ends
m.setPodPendingAdmission(pod)

// Garbage collect any stranded resources before providing TopologyHints
m.removeStaleState()
// Delegate to active policy
@@ -276,6 +287,10 @@ func (m *manager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.

// GetTopologyHints returns the topology hints for the topology manager
func (m *manager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
// The pod is in the admission phase. We need to save the pod to avoid its
// state being cleaned before admission ends
m.setPodPendingAdmission(pod)

// Garbage collect any stranded resources before providing TopologyHints
m.removeStaleState()
// Delegate to active policy
@@ -298,12 +313,15 @@ func (m *manager) removeStaleState() {
m.Lock()
defer m.Unlock()

// Get the list of active pods.
activePods := m.activePods()
// Get the list of admitted and active pods.
activeAndAdmittedPods := m.activePods()
if m.pendingAdmissionPod != nil {
activeAndAdmittedPods = append(activeAndAdmittedPods, m.pendingAdmissionPod)
}

// Build a list of (podUID, containerName) pairs for all containers in all active Pods.
activeContainers := make(map[string]map[string]struct{})
for _, pod := range activePods {
for _, pod := range activeAndAdmittedPods {
activeContainers[string(pod.UID)] = make(map[string]struct{})
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
activeContainers[string(pod.UID)][container.Name] = struct{}{}
@@ -430,3 +448,10 @@ func (m *manager) GetAllocatableMemory() []state.Block {
func (m *manager) GetMemory(podUID, containerName string) []state.Block {
return m.state.GetMemoryBlocks(podUID, containerName)
}

func (m *manager) setPodPendingAdmission(pod *v1.Pod) {
m.Lock()
defer m.Unlock()

m.pendingAdmissionPod = pod
}