Skip to content

Commit

Permalink
Memory manager support for Windows nodes (#128560)
Browse files Browse the repository at this point in the history
  • Loading branch information
marosset authored Nov 7, 2024
1 parent 8504758 commit 3c9380c
Show file tree
Hide file tree
Showing 6 changed files with 412 additions and 22 deletions.
32 changes: 27 additions & 5 deletions pkg/kubelet/cm/container_manager_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type containerManagerImpl struct {
// Interface for Topology resource co-ordination
topologyManager topologymanager.Manager
cpuManager cpumanager.Manager
memoryManager memorymanager.Manager
nodeInfo *v1.Node
sync.RWMutex
}
Expand Down Expand Up @@ -95,12 +96,17 @@ func (cm *containerManagerImpl) Start(ctx context.Context, node *v1.Node,

containerMap, containerRunningSet := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService)

// Initialize CPU manager
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) {
err := cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap.Clone())
if err != nil {
return fmt.Errorf("start cpu manager error: %v", err)
}

// Initialize memory manager
err = cm.memoryManager.Start(memorymanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap.Clone())
if err != nil {
return fmt.Errorf("start memory manager error: %v", err)
}
}

// Starts device manager.
Expand Down Expand Up @@ -128,6 +134,10 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
cadvisorInterface: cadvisorInterface,
}

cm.topologyManager = topologymanager.NewFakeManager()
cm.cpuManager = cpumanager.NewFakeManager()
cm.memoryManager = memorymanager.NewFakeManager()

if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) {
klog.InfoS("Creating topology manager")
cm.topologyManager, err = topologymanager.NewManager(machineInfo.Topology,
Expand Down Expand Up @@ -155,9 +165,21 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
return nil, err
}
cm.topologyManager.AddHintProvider(cm.cpuManager)
} else {
cm.topologyManager = topologymanager.NewFakeManager()
cm.cpuManager = cpumanager.NewFakeManager()

klog.InfoS("Creating memory manager")
cm.memoryManager, err = memorymanager.NewManager(
nodeConfig.ExperimentalMemoryManagerPolicy,
machineInfo,
cm.GetNodeAllocatableReservation(),
nodeConfig.ExperimentalMemoryManagerReservedMemory,
nodeConfig.KubeletRootDir,
cm.topologyManager,
)
if err != nil {
klog.ErrorS(err, "Failed to initialize memory manager")
return nil, err
}
cm.topologyManager.AddHintProvider(cm.memoryManager)
}

klog.InfoS("Creating device plugin manager")
Expand Down Expand Up @@ -273,7 +295,7 @@ func (cm *containerManagerImpl) UpdatePluginResources(node *schedulerframework.N
}

func (cm *containerManagerImpl) InternalContainerLifecycle() InternalContainerLifecycle {
return &internalContainerLifecycleImpl{cm.cpuManager, memorymanager.NewFakeManager(), cm.topologyManager}
return &internalContainerLifecycleImpl{cm.cpuManager, cm.memoryManager, cm.topologyManager}
}

func (cm *containerManagerImpl) GetPodCgroupRoot() string {
Expand Down
120 changes: 106 additions & 14 deletions pkg/kubelet/cm/internal_container_lifecycle_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,122 @@ limitations under the License.
package cm

import (
"k8s.io/api/core/v1"
"fmt"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/klog/v2"
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/winstats"
"k8s.io/utils/cpuset"
)

func (i *internalContainerLifecycleImpl) PreCreateContainer(pod *v1.Pod, container *v1.Container, containerConfig *runtimeapi.ContainerConfig) error {
if i.cpuManager != nil && utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) {
allocatedCPUs := i.cpuManager.GetCPUAffinity(string(pod.UID), container.Name)
if !allocatedCPUs.IsEmpty() {
var cpuGroupAffinities []*runtimeapi.WindowsCpuGroupAffinity
affinities := winstats.CpusToGroupAffinity(allocatedCPUs.List())
for _, affinity := range affinities {
klog.V(4).InfoS("Setting CPU affinity", "container", container.Name, "pod", pod.Name, "group", affinity.Group, "mask", affinity.MaskString(), "processorIds", affinity.Processors())
cpuGroupAffinities = append(cpuGroupAffinities, &runtimeapi.WindowsCpuGroupAffinity{
CpuGroup: uint32(affinity.Group),
CpuMask: uint64(affinity.Mask),
})
}
if !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) {
return nil
}

klog.V(4).Info("PreCreateContainer for Windows")

// retrieve CPU and NUMA affinity from CPU Manager and Memory Manager (if enabled)
var allocatedCPUs cpuset.CPUSet
if i.cpuManager != nil {
allocatedCPUs = i.cpuManager.GetCPUAffinity(string(pod.UID), container.Name)
}

var numaNodes sets.Set[int]
if i.memoryManager != nil {
numaNodes = i.memoryManager.GetMemoryNUMANodes(pod, container)
}

containerConfig.Windows.Resources.AffinityCpus = cpuGroupAffinities
// Gather all CPUs associated with the selected NUMA nodes
var allNumaNodeCPUs []winstats.GroupAffinity
for _, numaNode := range sets.List(numaNodes) {
affinity, err := winstats.GetCPUsforNUMANode(uint16(numaNode))
if err != nil {
return fmt.Errorf("failed to get CPUs for NUMA node %d: %v", numaNode, err)
}
allNumaNodeCPUs = append(allNumaNodeCPUs, *affinity)
}

var finalCPUSet = computeFinalCpuSet(allocatedCPUs, allNumaNodeCPUs)

klog.V(4).InfoS("Setting CPU affinity", "affinity", finalCPUSet, "container", container.Name, "pod", pod.UID)

// Set CPU group affinities in the container config
if finalCPUSet != nil {
var cpusToGroupAffinities []*runtimeapi.WindowsCpuGroupAffinity
for group, mask := range groupMasks(finalCPUSet) {

cpusToGroupAffinities = append(cpusToGroupAffinities, &runtimeapi.WindowsCpuGroupAffinity{
CpuGroup: uint32(group),
CpuMask: uint64(mask),
})
}
containerConfig.Windows.Resources.AffinityCpus = cpusToGroupAffinities
}

// return nil if no CPUs were selected
return nil
}

// computeFinalCpuSet chooses the CPU IDs to use for a container from the CPUs picked
// by the CPU manager (allocatedCPUs) and the group affinities of the NUMA nodes picked
// by the memory manager (allNumaNodeCPUs). It lives outside PreCreateContainer so it
// can be unit tested.
//
// When both inputs are present, the rules outlined in the KEP apply:
//   - Case 1: the CPU manager selected more CPUs than the selected NUMA nodes contain;
//     the CPU manager selection is used.
//   - Case 2: the CPU manager selection lies entirely within the selected NUMA nodes;
//     the CPU manager selection is used.
//   - Case 3: some CPU manager CPUs fall outside the selected NUMA nodes; the union of
//     both selections is used.
//
// When only one input is present, that input alone determines the result. nil is
// returned when allocatedCPUs is empty and allNumaNodeCPUs has no entries.
func computeFinalCpuSet(allocatedCPUs cpuset.CPUSet, allNumaNodeCPUs []winstats.GroupAffinity) sets.Set[int] {
	hasCPUManagerCPUs := !allocatedCPUs.IsEmpty()
	hasNUMANodeCPUs := len(allNumaNodeCPUs) > 0

	switch {
	case hasCPUManagerCPUs && hasNUMANodeCPUs:
		// Both CPU and memory managers contributed a selection.
		fromNUMANodes := computeCPUSet(allNumaNodeCPUs)
		fromCPUManager := sets.New[int](allocatedCPUs.List()...)

		// Cases 1 and 2 both resolve to the CPU manager selection.
		if fromCPUManager.Len() > fromNUMANodes.Len() || fromNUMANodes.IsSuperset(fromCPUManager) {
			return fromCPUManager
		}
		// Case 3: merge the CPU manager and memory manager selections.
		return fromCPUManager.Union(fromNUMANodes)

	case hasCPUManagerCPUs:
		// Only the CPU manager contributed CPUs.
		return sets.New[int](allocatedCPUs.List()...)

	case hasNUMANodeCPUs:
		// Only the memory manager contributed NUMA nodes; use their CPUs.
		return computeCPUSet(allNumaNodeCPUs)

	default:
		return nil
	}
}

// computeCPUSet flattens a list of GroupAffinity values into a set of CPU IDs.
// Each set bit b (0-63) in an affinity's Mask contributes the ID Group*64 + b.
// An empty, non-nil set is returned when no bits are set.
func computeCPUSet(affinities []winstats.GroupAffinity) sets.Set[int] {
	const cpusPerGroup = 64

	ids := sets.New[int]()
	for idx := range affinities {
		groupBase := int(affinities[idx].Group) * cpusPerGroup
		mask := affinities[idx].Mask
		for bit := 0; bit < cpusPerGroup; bit++ {
			if (mask>>bit)&1 != 0 {
				ids.Insert(groupBase + bit)
			}
		}
	}
	return ids
}

// groupMasks turns a set of CPU IDs into a map keyed by group number (ID / 64),
// where each value is a bitmask with bit (ID % 64) set for every CPU in that group.
// An empty, non-nil map is returned for an empty or nil set.
func groupMasks(cpuSet sets.Set[int]) map[int]uint64 {
	const cpusPerGroup = 64

	masksByGroup := make(map[int]uint64)
	for id := range cpuSet {
		masksByGroup[id/cpusPerGroup] |= uint64(1) << (id % cpusPerGroup)
	}
	return masksByGroup
}
160 changes: 160 additions & 0 deletions pkg/kubelet/cm/internal_container_lifecycle_windows_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
//go:build windows
// +build windows

/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cm

import (
"testing"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/kubelet/winstats"
"k8s.io/utils/cpuset"
)

// TestComputeCPUSet checks that computeCPUSet maps the set bits of each group mask
// to CPU IDs of the form Group*64 + bit.
func TestComputeCPUSet(t *testing.T) {
	input := []winstats.GroupAffinity{
		{Mask: 0b1010, Group: 0}, // bits 1 and 3 of group 0
		{Mask: 0b1001, Group: 1}, // bits 0 and 3 of group 1
	}

	// Group 0 -> CPUs 1 and 3; group 1 -> CPUs 64 and 67.
	wantIDs := []int{1, 3, 64, 67}

	got := computeCPUSet(input)
	if got.Len() != len(wantIDs) {
		t.Errorf("expected length %v, but got length %v", len(wantIDs), got.Len())
	}
	for _, id := range wantIDs {
		if !got.Has(id) {
			t.Errorf("expected key %v to be in result", id)
		}
	}
}

// TestGroupMasks checks that groupMasks buckets CPU IDs by group (ID / 64) and sets
// bit (ID % 64) in each group's mask, including the empty-set case.
func TestGroupMasks(t *testing.T) {
	type testCase struct {
		cpuSet   sets.Set[int]
		expected map[int]uint64
	}

	cases := []testCase{
		{
			// First four CPUs of groups 0 and 1.
			cpuSet:   sets.New[int](0, 1, 2, 3, 64, 65, 66, 67),
			expected: map[int]uint64{0: 0b1111, 1: 0b1111},
		},
		{
			// Even-numbered CPUs only.
			cpuSet:   sets.New[int](0, 2, 64, 66),
			expected: map[int]uint64{0: 0b0101, 1: 0b0101},
		},
		{
			// A single CPU per group.
			cpuSet:   sets.New[int](1, 65),
			expected: map[int]uint64{0: 0b0010, 1: 0b0010},
		},
		{
			// No CPUs yields no groups.
			cpuSet:   sets.New[int](),
			expected: map[int]uint64{},
		},
	}

	for _, tc := range cases {
		got := groupMasks(tc.cpuSet)
		if len(got) != len(tc.expected) {
			t.Errorf("expected length %v, but got length %v", len(tc.expected), len(got))
		}
		for wantGroup, wantMask := range tc.expected {
			if gotMask := got[wantGroup]; gotMask != wantMask {
				t.Errorf("expected group %v to have mask %v, but got mask %v", wantGroup, wantMask, gotMask)
			}
		}
	}
}

// TestComputeFinalCpuSet exercises computeFinalCpuSet for each combination of CPU
// manager and memory manager input: both present (three sub-cases), only one
// present, and neither present.
func TestComputeFinalCpuSet(t *testing.T) {
	type testCase struct {
		name            string
		allocatedCPUs   cpuset.CPUSet
		allNumaNodeCPUs []winstats.GroupAffinity
		expectedCPUSet  sets.Set[int]
	}

	// Reusable NUMA node affinities, all in group 0.
	cpus0And1 := []winstats.GroupAffinity{{Mask: 0b0011, Group: 0}}
	cpus0To3 := []winstats.GroupAffinity{{Mask: 0b1111, Group: 0}}
	cpus2And3 := []winstats.GroupAffinity{{Mask: 0b1100, Group: 0}}

	cases := []testCase{
		{
			name:            "Both managers enabled, CPU manager selects more CPUs",
			allocatedCPUs:   cpuset.New(0, 1, 2, 3),
			allNumaNodeCPUs: cpus0And1,
			expectedCPUSet:  sets.New[int](0, 1, 2, 3),
		},
		{
			name:            "Both managers enabled, CPU manager selects fewer CPUs within NUMA nodes",
			allocatedCPUs:   cpuset.New(0, 1),
			allNumaNodeCPUs: cpus0To3,
			expectedCPUSet:  sets.New[int](0, 1),
		},
		{
			name:            "Both managers enabled, CPU manager selects fewer CPUs outside NUMA nodes",
			allocatedCPUs:   cpuset.New(0, 1),
			allNumaNodeCPUs: cpus2And3,
			expectedCPUSet:  sets.New[int](0, 1, 2, 3),
		},
		{
			name:            "Only CPU manager enabled",
			allocatedCPUs:   cpuset.New(0, 1),
			allNumaNodeCPUs: nil,
			expectedCPUSet:  sets.New[int](0, 1),
		},
		{
			name:            "Only memory manager enabled",
			allocatedCPUs:   cpuset.New(),
			allNumaNodeCPUs: cpus2And3,
			expectedCPUSet:  sets.New[int](2, 3),
		},
		{
			name:            "Neither manager enabled",
			allocatedCPUs:   cpuset.New(),
			allNumaNodeCPUs: nil,
			expectedCPUSet:  nil,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := computeFinalCpuSet(tc.allocatedCPUs, tc.allNumaNodeCPUs)
			if !got.Equal(tc.expectedCPUSet) {
				t.Errorf("expected %v, but got %v", tc.expectedCPUSet, got)
			}
		})
	}
}
Loading

0 comments on commit 3c9380c

Please sign in to comment.