Skip to content

Commit

Permalink
feat: revert kubernetes#103979 for it's duplicated
Browse files Browse the repository at this point in the history
Signed-off-by: likakuli <1154584512@qq.com>
  • Loading branch information
likakuli committed Oct 8, 2024
1 parent b2031b3 commit 091fcfa
Show file tree
Hide file tree
Showing 7 changed files with 11 additions and 217 deletions.
27 changes: 2 additions & 25 deletions pkg/kubelet/cm/cpumanager/cpu_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,6 @@ type manager struct {

// allocatableCPUs is the set of online CPUs as reported by the system
allocatableCPUs cpuset.CPUSet

// pendingAdmissionPod contain the pod during the admission phase
pendingAdmissionPod *v1.Pod
}

var _ Manager = &manager{}
Expand Down Expand Up @@ -244,10 +241,6 @@ func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesRe
}

func (m *manager) Allocate(p *v1.Pod, c *v1.Container) error {
// The pod is during the admission phase. We need to save the pod to avoid it
// being cleaned before the admission ended
m.setPodPendingAdmission(p)

// Garbage collect any stranded resources before allocating CPUs.
m.removeStaleState()

Expand Down Expand Up @@ -316,19 +309,13 @@ func (m *manager) State() state.Reader {
}

func (m *manager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
// The pod is during the admission phase. We need to save the pod to avoid it
// being cleaned before the admission ended
m.setPodPendingAdmission(pod)
// Garbage collect any stranded resources before providing TopologyHints
m.removeStaleState()
// Delegate to active policy
return m.policy.GetTopologyHints(m.state, pod, container)
}

func (m *manager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
// The pod is during the admission phase. We need to save the pod to avoid it
// being cleaned before the admission ended
m.setPodPendingAdmission(pod)
// Garbage collect any stranded resources before providing TopologyHints
m.removeStaleState()
// Delegate to active policy
Expand Down Expand Up @@ -361,14 +348,11 @@ func (m *manager) removeStaleState() {
defer m.Unlock()

// Get the list of active pods.
activeAndAdmittedPods := m.activePods()
if m.pendingAdmissionPod != nil {
activeAndAdmittedPods = append(activeAndAdmittedPods, m.pendingAdmissionPod)
}
activePods := m.activePods()

// Build a list of (podUID, containerName) pairs for all containers in all active Pods.
activeContainers := make(map[string]map[string]struct{})
for _, pod := range activeAndAdmittedPods {
for _, pod := range activePods {
activeContainers[string(pod.UID)] = make(map[string]struct{})
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
activeContainers[string(pod.UID)][container.Name] = struct{}{}
Expand Down Expand Up @@ -540,10 +524,3 @@ func (m *manager) GetExclusiveCPUs(podUID, containerName string) cpuset.CPUSet {
func (m *manager) GetCPUAffinity(podUID, containerName string) cpuset.CPUSet {
return m.state.GetCPUSetOrDefault(podUID, containerName)
}

func (m *manager) setPodPendingAdmission(pod *v1.Pod) {
m.Lock()
defer m.Unlock()

m.pendingAdmissionPod = pod
}
4 changes: 2 additions & 2 deletions pkg/kubelet/cm/cpumanager/cpu_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func TestCPUManagerAdd(t *testing.T) {

pod := makePod("fakePod", "fakeContainer", "2", "2")
container := &pod.Spec.Containers[0]
mgr.activePods = func() []*v1.Pod { return nil }
mgr.activePods = func() []*v1.Pod { return []*v1.Pod{pod} }

err := mgr.Allocate(pod, container)
if !reflect.DeepEqual(err, testCase.expAllocateErr) {
Expand Down Expand Up @@ -1328,7 +1328,7 @@ func TestCPUManagerAddWithResvList(t *testing.T) {

pod := makePod("fakePod", "fakeContainer", "2", "2")
container := &pod.Spec.Containers[0]
mgr.activePods = func() []*v1.Pod { return nil }
mgr.activePods = func() []*v1.Pod { return []*v1.Pod{pod} }

err := mgr.Allocate(pod, container)
if !reflect.DeepEqual(err, testCase.expAllocateErr) {
Expand Down
6 changes: 0 additions & 6 deletions pkg/kubelet/cm/cpumanager/topology_hints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,11 +245,6 @@ func TestGetTopologyHints(t *testing.T) {
if len(tc.expectedHints) == 0 && len(hints) == 0 {
continue
}

if m.pendingAdmissionPod == nil {
t.Errorf("The pendingAdmissionPod should point to the current pod after the call to GetTopologyHints()")
}

sort.SliceStable(hints, func(i, j int) bool {
return hints[i].LessThan(hints[j])
})
Expand Down Expand Up @@ -298,7 +293,6 @@ func TestGetPodTopologyHints(t *testing.T) {
if len(tc.expectedHints) == 0 && len(podHints) == 0 {
continue
}

sort.SliceStable(podHints, func(i, j int) bool {
return podHints[i].LessThan(podHints[j])
})
Expand Down
24 changes: 2 additions & 22 deletions pkg/kubelet/cm/devicemanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,6 @@ type ManagerImpl struct {
// init containers.
devicesToReuse PodReusableDevices

// pendingAdmissionPod contain the pod during the admission phase
pendingAdmissionPod *v1.Pod

// containerMap provides a mapping from (pod, container) -> containerID
// for all containers in a pod. Used to detect pods running across a restart
containerMap containermap.ContainerMap
Expand Down Expand Up @@ -364,10 +361,6 @@ func (m *ManagerImpl) Stop() error {
// Allocate is the call that you can use to allocate a set of devices
// from the registered device plugins.
func (m *ManagerImpl) Allocate(pod *v1.Pod, container *v1.Container) error {
// The pod is during the admission phase. We need to save the pod to avoid it
// being cleaned before the admission ended
m.setPodPendingAdmission(pod)

if _, ok := m.devicesToReuse[string(pod.UID)]; !ok {
m.devicesToReuse[string(pod.UID)] = make(map[string]sets.Set[string])
}
Expand Down Expand Up @@ -548,20 +541,14 @@ func (m *ManagerImpl) getCheckpoint() (checkpoint.DeviceManagerCheckpoint, error

// UpdateAllocatedDevices frees any Devices that are bound to terminated pods.
func (m *ManagerImpl) UpdateAllocatedDevices() {
activePods := m.activePods()
if !m.sourcesReady.AllReady() {
return
}

m.mutex.Lock()
defer m.mutex.Unlock()

activeAndAdmittedPods := m.activePods()
if m.pendingAdmissionPod != nil {
activeAndAdmittedPods = append(activeAndAdmittedPods, m.pendingAdmissionPod)
}

podsToBeRemoved := m.podDevices.pods()
for _, pod := range activeAndAdmittedPods {
for _, pod := range activePods {
podsToBeRemoved.Delete(string(pod.UID))
}
if len(podsToBeRemoved) <= 0 {
Expand Down Expand Up @@ -1171,13 +1158,6 @@ func (m *ManagerImpl) ShouldResetExtendedResourceCapacity() bool {
return len(checkpoints) == 0
}

func (m *ManagerImpl) setPodPendingAdmission(pod *v1.Pod) {
m.mutex.Lock()
defer m.mutex.Unlock()

m.pendingAdmissionPod = pod
}

func (m *ManagerImpl) isContainerAlreadyRunning(podUID, cntName string) bool {
cntID, err := m.containerMap.GetContainerID(podUID, cntName)
if err != nil {
Expand Down
8 changes: 0 additions & 8 deletions pkg/kubelet/cm/devicemanager/topology_hints.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ import (
// ensures the Device Manager is consulted when Topology Aware Hints for each
// container are created.
func (m *ManagerImpl) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
// The pod is during the admission phase. We need to save the pod to avoid it
// being cleaned before the admission ended
m.setPodPendingAdmission(pod)

// Garbage collect any stranded device resources before providing TopologyHints
m.UpdateAllocatedDevices()

Expand Down Expand Up @@ -87,10 +83,6 @@ func (m *ManagerImpl) GetTopologyHints(pod *v1.Pod, container *v1.Container) map
// GetPodTopologyHints implements the topologymanager.HintProvider Interface which
// ensures the Device Manager is consulted when Topology Aware Hints for Pod are created.
func (m *ManagerImpl) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
// The pod is during the admission phase. We need to save the pod to avoid it
// being cleaned before the admission ended
m.setPodPendingAdmission(pod)

// Garbage collect any stranded device resources before providing TopologyHints
m.UpdateAllocatedDevices()

Expand Down
31 changes: 3 additions & 28 deletions pkg/kubelet/cm/memorymanager/memory_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,6 @@ type manager struct {

// allocatableMemory holds the allocatable memory for each NUMA node
allocatableMemory []state.Block

// pendingAdmissionPod contain the pod during the admission phase
pendingAdmissionPod *v1.Pod
}

var _ Manager = &manager{}
Expand Down Expand Up @@ -242,10 +239,6 @@ func (m *manager) GetMemoryNUMANodes(pod *v1.Pod, container *v1.Container) sets.

// Allocate is called to pre-allocate memory resources during Pod admission.
func (m *manager) Allocate(pod *v1.Pod, container *v1.Container) error {
// The pod is during the admission phase. We need to save the pod to avoid it
// being cleaned before the admission ended
m.setPodPendingAdmission(pod)

// Garbage collect any stranded resources before allocation
m.removeStaleState()

Expand Down Expand Up @@ -284,10 +277,6 @@ func (m *manager) State() state.Reader {

// GetPodTopologyHints returns the topology hints for the topology manager
func (m *manager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
// The pod is during the admission phase. We need to save the pod to avoid it
// being cleaned before the admission ended
m.setPodPendingAdmission(pod)

// Garbage collect any stranded resources before providing TopologyHints
m.removeStaleState()
// Delegate to active policy
Expand All @@ -296,10 +285,6 @@ func (m *manager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.

// GetTopologyHints returns the topology hints for the topology manager
func (m *manager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
// The pod is during the admission phase. We need to save the pod to avoid it
// being cleaned before the admission ended
m.setPodPendingAdmission(pod)

// Garbage collect any stranded resources before providing TopologyHints
m.removeStaleState()
// Delegate to active policy
Expand All @@ -322,15 +307,12 @@ func (m *manager) removeStaleState() {
m.Lock()
defer m.Unlock()

// Get the list of admitted and active pods.
activeAndAdmittedPods := m.activePods()
if m.pendingAdmissionPod != nil {
activeAndAdmittedPods = append(activeAndAdmittedPods, m.pendingAdmissionPod)
}
// Get the list of active pods.
activePods := m.activePods()

// Build a list of (podUID, containerName) pairs for all containers in all active Pods.
activeContainers := make(map[string]map[string]struct{})
for _, pod := range activeAndAdmittedPods {
for _, pod := range activePods {
activeContainers[string(pod.UID)] = make(map[string]struct{})
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
activeContainers[string(pod.UID)][container.Name] = struct{}{}
Expand Down Expand Up @@ -464,10 +446,3 @@ func (m *manager) GetAllocatableMemory() []state.Block {
func (m *manager) GetMemory(podUID, containerName string) []state.Block {
return m.state.GetMemoryBlocks(podUID, containerName)
}

func (m *manager) setPodPendingAdmission(pod *v1.Pod) {
m.Lock()
defer m.Unlock()

m.pendingAdmissionPod = pod
}
Loading

0 comments on commit 091fcfa

Please sign in to comment.