diff --git a/pkg/kubelet/cm/container_manager_windows.go b/pkg/kubelet/cm/container_manager_windows.go
index 3c36daeffb546..352660dd653b9 100644
--- a/pkg/kubelet/cm/container_manager_windows.go
+++ b/pkg/kubelet/cm/container_manager_windows.go
@@ -68,6 +68,7 @@ type containerManagerImpl struct {
 	// Interface for Topology resource co-ordination
 	topologyManager topologymanager.Manager
 	cpuManager      cpumanager.Manager
+	memoryManager   memorymanager.Manager
 	nodeInfo        *v1.Node
 	sync.RWMutex
 }
@@ -95,12 +96,17 @@ func (cm *containerManagerImpl) Start(ctx context.Context, node *v1.Node,
 	containerMap, containerRunningSet := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService)
 
-	// Initialize CPU manager
 	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) {
 		err := cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap.Clone())
 		if err != nil {
 			return fmt.Errorf("start cpu manager error: %v", err)
 		}
+
+		// Initialize memory manager
+		err = cm.memoryManager.Start(memorymanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap.Clone())
+		if err != nil {
+			return fmt.Errorf("start memory manager error: %v", err)
+		}
 	}
 
 	// Starts device manager.
@@ -128,6 +134,10 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
 		cadvisorInterface: cadvisorInterface,
 	}
 
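+	// Default to fake managers so these fields are always initialized; the real
+	// implementations created below replace them only when the
+	// WindowsCPUAndMemoryAffinity feature gate is enabled.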
+	cm.topologyManager = topologymanager.NewFakeManager()
+	cm.cpuManager = cpumanager.NewFakeManager()
+	cm.memoryManager = memorymanager.NewFakeManager()
+
 	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) {
 		klog.InfoS("Creating topology manager")
 		cm.topologyManager, err = topologymanager.NewManager(machineInfo.Topology,
@@ -155,9 +165,21 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
 			return nil, err
 		}
 		cm.topologyManager.AddHintProvider(cm.cpuManager)
-	} else {
-		cm.topologyManager = topologymanager.NewFakeManager()
-		cm.cpuManager = cpumanager.NewFakeManager()
+
+		klog.InfoS("Creating memory manager")
+		cm.memoryManager, err = memorymanager.NewManager(
+			nodeConfig.ExperimentalMemoryManagerPolicy,
+			machineInfo,
+			cm.GetNodeAllocatableReservation(),
+			nodeConfig.ExperimentalMemoryManagerReservedMemory,
+			nodeConfig.KubeletRootDir,
+			cm.topologyManager,
+		)
+		if err != nil {
+			klog.ErrorS(err, "Failed to initialize memory manager")
+			return nil, err
+		}
+		cm.topologyManager.AddHintProvider(cm.memoryManager)
 	}
 
 	klog.InfoS("Creating device plugin manager")
@@ -273,7 +295,7 @@ func (cm *containerManagerImpl) UpdatePluginResources(node *schedulerframework.N
 }
 
 func (cm *containerManagerImpl) InternalContainerLifecycle() InternalContainerLifecycle {
-	return &internalContainerLifecycleImpl{cm.cpuManager, memorymanager.NewFakeManager(), cm.topologyManager}
+	return &internalContainerLifecycleImpl{cm.cpuManager, cm.memoryManager, cm.topologyManager}
}
 
 func (cm *containerManagerImpl) GetPodCgroupRoot() string {
diff --git a/pkg/kubelet/cm/internal_container_lifecycle_windows.go b/pkg/kubelet/cm/internal_container_lifecycle_windows.go
index 96159e99a8ed4..939658d846fee 100644
--- a/pkg/kubelet/cm/internal_container_lifecycle_windows.go
+++ b/pkg/kubelet/cm/internal_container_lifecycle_windows.go
@@ -20,30 +20,122 @@ limitations under the License.
 package cm
 
 import (
-	"k8s.io/api/core/v1"
+	"fmt"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/util/sets"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
 	"k8s.io/klog/v2"
 	kubefeatures "k8s.io/kubernetes/pkg/features"
 	"k8s.io/kubernetes/pkg/kubelet/winstats"
+	"k8s.io/utils/cpuset"
 )
 
 func (i *internalContainerLifecycleImpl) PreCreateContainer(pod *v1.Pod, container *v1.Container, containerConfig *runtimeapi.ContainerConfig) error {
-	if i.cpuManager != nil && utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) {
-		allocatedCPUs := i.cpuManager.GetCPUAffinity(string(pod.UID), container.Name)
-		if !allocatedCPUs.IsEmpty() {
-			var cpuGroupAffinities []*runtimeapi.WindowsCpuGroupAffinity
-			affinities := winstats.CpusToGroupAffinity(allocatedCPUs.List())
-			for _, affinity := range affinities {
-				klog.V(4).InfoS("Setting CPU affinity", "container", container.Name, "pod", pod.Name, "group", affinity.Group, "mask", affinity.MaskString(), "processorIds", affinity.Processors())
-				cpuGroupAffinities = append(cpuGroupAffinities, &runtimeapi.WindowsCpuGroupAffinity{
-					CpuGroup: uint32(affinity.Group),
-					CpuMask:  uint64(affinity.Mask),
-				})
-			}
+	if !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) {
+		return nil
+	}
+
+	klog.V(4).Info("PreCreateContainer for Windows")
+
+	// Retrieve CPU and NUMA affinity from the CPU manager and memory manager (if enabled).
+	var allocatedCPUs cpuset.CPUSet
+	if i.cpuManager != nil {
+		allocatedCPUs = i.cpuManager.GetCPUAffinity(string(pod.UID), container.Name)
+	}
+
+	var numaNodes sets.Set[int]
+	if i.memoryManager != nil {
+		numaNodes = i.memoryManager.GetMemoryNUMANodes(pod, container)
+	}
 
-			containerConfig.Windows.Resources.AffinityCpus = cpuGroupAffinities
+	// Gather all CPUs associated with the selected NUMA nodes.
+	var allNumaNodeCPUs []winstats.GroupAffinity
+	for _, numaNode := range sets.List(numaNodes) {
+		affinity, err := winstats.GetCPUsforNUMANode(uint16(numaNode))
+		if err != nil {
+			return fmt.Errorf("failed to get CPUs for NUMA node %d: %v", numaNode, err)
 		}
+		allNumaNodeCPUs = append(allNumaNodeCPUs, *affinity)
 	}
+
+	finalCPUSet := computeFinalCpuSet(allocatedCPUs, allNumaNodeCPUs)
+
+	klog.V(4).InfoS("Setting CPU affinity", "affinity", finalCPUSet, "container", container.Name, "pod", pod.UID)
+
+	// Set CPU group affinities in the container config.
+	if finalCPUSet != nil {
+		var cpusToGroupAffinities []*runtimeapi.WindowsCpuGroupAffinity
+		for group, mask := range groupMasks(finalCPUSet) {
+			cpusToGroupAffinities = append(cpusToGroupAffinities, &runtimeapi.WindowsCpuGroupAffinity{
+				CpuGroup: uint32(group),
+				CpuMask:  uint64(mask),
+			})
+		}
+		containerConfig.Windows.Resources.AffinityCpus = cpusToGroupAffinities
+	}
+
+	// If no CPUs were selected, no affinity was set and there is nothing more to do.
 	return nil
 }
+
+// computeFinalCpuSet determines the final set of CPUs to use based on the CPU and memory managers,
+// and is extracted into its own function so that it can be unit tested.
+func computeFinalCpuSet(allocatedCPUs cpuset.CPUSet, allNumaNodeCPUs []winstats.GroupAffinity) sets.Set[int] {
+	if !allocatedCPUs.IsEmpty() && len(allNumaNodeCPUs) > 0 {
+		// Both CPU and memory managers are enabled.
+
+		numaNodeAffinityCPUSet := computeCPUSet(allNumaNodeCPUs)
+		cpuManagerAffinityCPUSet := sets.New[int](allocatedCPUs.List()...)
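+
+		// Illustrative example (mirrors the unit test fixtures): with CPU manager
+		// CPUs {0,1} and memory manager NUMA-node CPUs {0,1,2,3}, Case 2 below
+		// applies and {0,1} wins; with NUMA-node CPUs {2,3} instead, Case 3
+		// merges the two into {0,1,2,3}.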
+
+		// Determine which set of CPUs to use, following the logic outlined in the KEP:
+		// Case 1: the CPU manager selects more CPUs than are available in the NUMA nodes selected by the memory manager.
+		// Case 2: the CPU manager selects fewer CPUs, and they all fall within the CPUs available in the NUMA nodes selected by the memory manager.
+		// Case 3: the CPU manager selects fewer CPUs, but some fall outside of the CPUs available in the NUMA nodes selected by the memory manager.
+
+		if cpuManagerAffinityCPUSet.Len() > numaNodeAffinityCPUSet.Len() {
+			// Case 1: use the CPU manager selected CPUs
+			return cpuManagerAffinityCPUSet
+		} else if numaNodeAffinityCPUSet.IsSuperset(cpuManagerAffinityCPUSet) {
+			// Case 2: use the CPU manager selected CPUs
+			return cpuManagerAffinityCPUSet
+		} else {
+			// Case 3: merge the CPU manager and memory manager selected CPUs
+			return cpuManagerAffinityCPUSet.Union(numaNodeAffinityCPUSet)
+		}
+	} else if !allocatedCPUs.IsEmpty() {
+		// Only the CPU manager is enabled, use the CPU manager selected CPUs
+		return sets.New[int](allocatedCPUs.List()...)
+	} else if len(allNumaNodeCPUs) > 0 {
+		// Only the memory manager is enabled, use the CPUs associated with the selected NUMA nodes
+		return computeCPUSet(allNumaNodeCPUs)
+	}
+	return nil
+}
+
+// computeCPUSet converts a list of GroupAffinity to a set of CPU IDs.
+func computeCPUSet(affinities []winstats.GroupAffinity) sets.Set[int] {
+	cpuSet := sets.New[int]()
+	for _, affinity := range affinities {
+		for i := 0; i < 64; i++ {
+			if (affinity.Mask>>i)&1 == 1 {
+				cpuID := int(affinity.Group)*64 + i
+				cpuSet.Insert(cpuID)
+			}
+		}
+	}
+	return cpuSet
+}
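+
+// Example (illustrative): GroupAffinity{Group: 1, Mask: 0b1001} yields CPU IDs
+// 64 (bit 0) and 67 (bit 3), since each Windows processor group holds up to 64
+// logical processors and a CPU's global ID is group*64 + bit index.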
+
+// groupMasks converts a set of CPU IDs into group and mask representations.
+func groupMasks(cpuSet sets.Set[int]) map[int]uint64 {
+	groupMasks := make(map[int]uint64)
+	for cpu := range cpuSet {
+		group := cpu / 64
+		mask := uint64(1) << (cpu % 64)
+		groupMasks[group] |= mask
+	}
+	return groupMasks
+}
diff --git a/pkg/kubelet/cm/internal_container_lifecycle_windows_test.go b/pkg/kubelet/cm/internal_container_lifecycle_windows_test.go
new file mode 100644
index 0000000000000..747250ead3df2
--- /dev/null
+++ b/pkg/kubelet/cm/internal_container_lifecycle_windows_test.go
@@ -0,0 +1,160 @@
+//go:build windows
+// +build windows
+
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cm
+
+import (
+	"testing"
+
+	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/kubernetes/pkg/kubelet/winstats"
+	"k8s.io/utils/cpuset"
+)
+
+func TestComputeCPUSet(t *testing.T) {
+	affinities := []winstats.GroupAffinity{
+		{Mask: 0b1010, Group: 0}, // CPUs 1 and 3 in Group 0
+		{Mask: 0b1001, Group: 1}, // CPUs 0 and 3 in Group 1
+	}
+
+	expected := map[int]struct{}{
+		1:  {}, // Group 0, CPU 1
+		3:  {}, // Group 0, CPU 3
+		64: {}, // Group 1, CPU 0
+		67: {}, // Group 1, CPU 3
+	}
+
+	result := computeCPUSet(affinities)
+	if len(result) != len(expected) {
+		t.Errorf("expected length %v, but got length %v", len(expected), len(result))
+	}
+	for key := range expected {
+		if _, exists := result[key]; !exists {
+			t.Errorf("expected key %v to be in result", key)
+		}
+	}
+}
+
+func TestGroupMasks(t *testing.T) {
+	tests := []struct {
+		cpuSet   sets.Set[int]
+		expected map[int]uint64
+	}{
+		{
+			cpuSet: sets.New[int](0, 1, 2, 3, 64, 65, 66, 67),
+			expected: map[int]uint64{
+				0: 0b1111,
+				1: 0b1111,
+			},
+		},
+		{
+			cpuSet: sets.New[int](0, 2, 64, 66),
+			expected: map[int]uint64{
+				0: 0b0101,
+				1: 0b0101,
+			},
+		},
+		{
+			cpuSet: sets.New[int](1, 65),
+			expected: map[int]uint64{
+				0: 0b0010,
+				1: 0b0010,
+			},
+		},
+		{
+			cpuSet:   sets.New[int](),
+			expected: map[int]uint64{},
+		},
+	}
+
+	for _, test := range tests {
+		result := groupMasks(test.cpuSet)
+		if len(result) != len(test.expected) {
+			t.Errorf("expected length %v, but got length %v", len(test.expected), len(result))
+		}
+		for group, mask := range test.expected {
+			if result[group] != mask {
+				t.Errorf("expected group %v to have mask %v, but got mask %v", group, mask, result[group])
+			}
+		}
+	}
+}
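+
+// TestComputeFinalCpuSet exercises the three KEP cases for combining CPU manager
+// and memory manager affinity, plus the single-manager and neither-manager fallbacks.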
+func TestComputeFinalCpuSet(t *testing.T) {
+	tests := []struct {
+		name            string
+		allocatedCPUs   cpuset.CPUSet
+		allNumaNodeCPUs []winstats.GroupAffinity
+		expectedCPUSet  sets.Set[int]
+	}{
+		{
+			name:          "Both managers enabled, CPU manager selects more CPUs",
+			allocatedCPUs: cpuset.New(0, 1, 2, 3),
+			allNumaNodeCPUs: []winstats.GroupAffinity{
+				{Mask: 0b0011, Group: 0}, // CPUs 0 and 1 in Group 0
+			},
+			expectedCPUSet: sets.New[int](0, 1, 2, 3),
+		},
+		{
+			name:          "Both managers enabled, CPU manager selects fewer CPUs within NUMA nodes",
+			allocatedCPUs: cpuset.New(0, 1),
+			allNumaNodeCPUs: []winstats.GroupAffinity{
+				{Mask: 0b1111, Group: 0}, // CPUs 0, 1, 2, 3 in Group 0
+			},
+			expectedCPUSet: sets.New[int](0, 1),
+		},
+		{
+			name:          "Both managers enabled, CPU manager selects fewer CPUs outside NUMA nodes",
+			allocatedCPUs: cpuset.New(0, 1),
+			allNumaNodeCPUs: []winstats.GroupAffinity{
+				{Mask: 0b1100, Group: 0}, // CPUs 2 and 3 in Group 0
+			},
+			expectedCPUSet: sets.New[int](0, 1, 2, 3),
+		},
+		{
+			name:            "Only CPU manager enabled",
+			allocatedCPUs:   cpuset.New(0, 1),
+			allNumaNodeCPUs: nil,
+			expectedCPUSet:  sets.New[int](0, 1),
+		},
+		{
+			name:          "Only memory manager enabled",
+			allocatedCPUs: cpuset.New(),
+			allNumaNodeCPUs: []winstats.GroupAffinity{
+				{Mask: 0b1100, Group: 0}, // CPUs 2 and 3 in Group 0
+			},
+			expectedCPUSet: sets.New[int](2, 3),
+		},
+		{
+			name:            "Neither manager enabled",
+			allocatedCPUs:   cpuset.New(),
+			allNumaNodeCPUs: nil,
+			expectedCPUSet:  nil,
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			result := computeFinalCpuSet(test.allocatedCPUs, test.allNumaNodeCPUs)
+			if !result.Equal(test.expectedCPUSet) {
+				t.Errorf("expected %v, but got %v", test.expectedCPUSet, result)
+			}
+		})
+	}
+}
diff --git a/pkg/kubelet/cm/memorymanager/memory_manager.go b/pkg/kubelet/cm/memorymanager/memory_manager.go
index 831afb193ea82..5b73230b38cb5 100644
--- a/pkg/kubelet/cm/memorymanager/memory_manager.go
+++ b/pkg/kubelet/cm/memorymanager/memory_manager.go
@@ -19,6 +19,7 @@ package memorymanager
 import (
 	"context"
 	"fmt"
+	"runtime"
 	"sync"
 
 	cadvisorapi "github.com/google/cadvisor/info/v1"
@@ -140,6 +141,10 @@ func NewManager(policyName string, machineInfo *cadvisorapi.MachineInfo, nodeAll
 		policy = NewPolicyNone()
 
 	case policyTypeStatic:
+		if runtime.GOOS == "windows" {
+			return nil, fmt.Errorf("policy %q is not available on Windows", policyTypeStatic)
+		}
+
 		systemReserved, err := getSystemReservedMemory(machineInfo, nodeAllocatableReservation, reservedMemory)
 		if err != nil {
 			return nil, err
@@ -150,8 +155,22 @@ func NewManager(policyName string, machineInfo *cadvisorapi.MachineInfo, nodeAll
 			return nil, err
 		}
 
+	case policyTypeBestEffort:
+		if runtime.GOOS == "windows" {
+			systemReserved, err := getSystemReservedMemory(machineInfo, nodeAllocatableReservation, reservedMemory)
+			if err != nil {
+				return nil, err
+			}
+			policy, err = NewPolicyBestEffort(machineInfo, systemReserved, affinity)
+			if err != nil {
+				return nil, err
+			}
+		} else {
+			return nil, fmt.Errorf("policy %q is not available for platform %q", policyTypeBestEffort, runtime.GOOS)
+		}
+
 	default:
-		return nil, fmt.Errorf("unknown policy: \"%s\"", policyName)
+		return nil, fmt.Errorf("unknown policy: %q", policyName)
 	}
 
 	manager := &manager{
diff --git a/pkg/kubelet/cm/memorymanager/policy_best_effort.go b/pkg/kubelet/cm/memorymanager/policy_best_effort.go
new file mode 100644
index 0000000000000..2a2eabc82634e
--- /dev/null
+++ b/pkg/kubelet/cm/memorymanager/policy_best_effort.go
@@ -0,0 +1,80 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package memorymanager
+
+import (
+	cadvisorapi "github.com/google/cadvisor/info/v1"
+
+	v1 "k8s.io/api/core/v1"
+
+	"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state"
+	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
+)
+
+// On Windows we want to use the same logic as the Static policy to compute memory
+// topology hints. Unlike on Linux, however, NUMA nodes cannot be directly assigned
+// or guaranteed via Windows APIs: the Windows scheduler runs a thread on the NUMA
+// node closest to its assigned CPU, so the NUMA node assignment is honored only as
+// a best effort. Because of this we don't want users to specify the "Static" policy
+// for the memory manager via the kubelet configuration. Instead we expose a
+// "BestEffort" policy that reuses the Static policy's logic, which also avoids
+// code duplication.
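+//
+// For illustration only, a Windows node would opt in with kubelet configuration
+// along these lines (field names from KubeletConfiguration; values are examples),
+// with the WindowsCPUAndMemoryAffinity feature gate enabled:
+//
+//	memoryManagerPolicy: BestEffort
+//	reservedMemory:
+//	- numaNode: 0
+//	  limits:
+//	    memory: 1Gi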
+const policyTypeBestEffort policyType = "BestEffort"
+
+// bestEffortPolicy is an implementation of the Policy interface for the BestEffort policy.
+type bestEffortPolicy struct {
+	static *staticPolicy
+}
+
+var _ Policy = &bestEffortPolicy{}
+
+func NewPolicyBestEffort(machineInfo *cadvisorapi.MachineInfo, reserved systemReservedMemory, affinity topologymanager.Store) (Policy, error) {
+	p, err := NewPolicyStatic(machineInfo, reserved, affinity)
+	if err != nil {
+		return nil, err
+	}
+
+	return &bestEffortPolicy{
+		static: p.(*staticPolicy),
+	}, nil
+}
+
+func (p *bestEffortPolicy) Name() string {
+	return string(policyTypeBestEffort)
+}
+
+func (p *bestEffortPolicy) Start(s state.State) error {
+	return p.static.Start(s)
+}
+
+func (p *bestEffortPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
+	return p.static.Allocate(s, pod, container)
+}
+
+func (p *bestEffortPolicy) RemoveContainer(s state.State, podUID string, containerName string) {
+	p.static.RemoveContainer(s, podUID, containerName)
+}
+
+func (p *bestEffortPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint {
+	return p.static.GetPodTopologyHints(s, pod)
+}
+
+func (p *bestEffortPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
+	return p.static.GetTopologyHints(s, pod, container)
+}
+
+func (p *bestEffortPolicy) GetAllocatableMemory(s state.State) []state.Block {
+	return p.static.GetAllocatableMemory(s)
+}
diff --git a/pkg/kubelet/winstats/cpu_topology.go b/pkg/kubelet/winstats/cpu_topology.go
index 0ea5af43ebed9..c6a7eab5c1ce9 100644
--- a/pkg/kubelet/winstats/cpu_topology.go
+++ b/pkg/kubelet/winstats/cpu_topology.go
@@ -21,15 +21,17 @@ package winstats
 import (
 	"fmt"
-	cadvisorapi "github.com/google/cadvisor/info/v1"
-	"k8s.io/klog/v2"
 	"syscall"
 	"unsafe"
+
+	cadvisorapi "github.com/google/cadvisor/info/v1"
+	"k8s.io/klog/v2"
 )
 
 var (
 	procGetLogicalProcessorInformationEx = modkernel32.NewProc("GetLogicalProcessorInformationEx")
 	getNumaAvailableMemoryNodeEx         = modkernel32.NewProc("GetNumaAvailableMemoryNodeEx")
+	procGetNumaNodeProcessorMaskEx       = modkernel32.NewProc("GetNumaNodeProcessorMaskEx")
 )
 
 type relationType int
@@ -106,6 +108,21 @@ func CpusToGroupAffinity(cpus []int) map[int]*GroupAffinity {
 	return groupAffinities
 }
 
+// GetCPUsforNUMANode queries the system for the CPUs that are part of the given
+// NUMA node, via the GetNumaNodeProcessorMaskEx Win32 API.
+func GetCPUsforNUMANode(nodeNumber uint16) (*GroupAffinity, error) {
+	var affinity GroupAffinity
+
+	r1, _, err := procGetNumaNodeProcessorMaskEx.Call(
+		uintptr(nodeNumber),
+		uintptr(unsafe.Pointer(&affinity)),
+	)
+	if r1 == 0 {
+		return nil, fmt.Errorf("error getting CPU mask for NUMA node %d: %v", nodeNumber, err)
+	}
+
+	return &affinity, nil
+}
+
 type numaNodeRelationship struct {
 	NodeNumber uint32
 	Reserved   [18]byte