Skip to content

Commit

Permalink
kubelet/cm: move CPU reading from cm to cm/cpumanager
Browse files Browse the repository at this point in the history
Authored-by: Francesco Romani <fromani@redhat.com>
Signed-off-by: Peter Hunt <pehunt@redhat.com>
  • Loading branch information
haircommander committed Oct 11, 2024
1 parent c51195d commit 77d03e4
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 52 deletions.
5 changes: 0 additions & 5 deletions pkg/kubelet/cm/container_manager_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/opencontainers/runc/libcontainer/configs"
"k8s.io/klog/v2"
"k8s.io/mount-utils"
"k8s.io/utils/cpuset"
utilpath "k8s.io/utils/path"

v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -133,10 +132,6 @@ type containerManagerImpl struct {
topologyManager topologymanager.Manager
// Interface for Dynamic Resource Allocation management.
draManager dra.Manager
// The full set of CPUs on the node. This field is set lazily, and is used to make sure
// the `cpuset` cgroup hierarchy is created on cgroup v2 when cpumanager is using a
// None policy.
allCPUs cpuset.CPUSet
}

type features struct {
Expand Down
24 changes: 19 additions & 5 deletions pkg/kubelet/cm/cpumanager/cpu_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ type Manager interface {
// GetCPUAffinity returns cpuset which includes cpus from shared pools
// as well as exclusively allocated cpus
GetCPUAffinity(podUID, containerName string) cpuset.CPUSet

// GetAllCPUs returns all the CPUs known by cpumanager, as reported by the
// hardware discovery. Maps to the CPU capacity.
GetAllCPUs() cpuset.CPUSet
}

type manager struct {
Expand Down Expand Up @@ -136,7 +140,11 @@ type manager struct {
// stateFileDirectory holds the directory where the state file for checkpoints is held.
stateFileDirectory string

// allocatableCPUs is the set of online CPUs as reported by the system
// allCPUs is the set of online CPUs as reported by the system
allCPUs cpuset.CPUSet

// allocatableCPUs is the set of online CPUs as reported by the system,
// and available for allocation, minus the reserved set
allocatableCPUs cpuset.CPUSet

// pendingAdmissionPod contain the pod during the admission phase
Expand All @@ -156,6 +164,11 @@ func NewManager(cpuPolicyName string, cpuPolicyOptions map[string]string, reconc
var policy Policy
var err error

topo, err = topology.Discover(machineInfo)
if err != nil {
return nil, err
}

switch policyName(cpuPolicyName) {

case PolicyNone:
Expand All @@ -165,10 +178,6 @@ func NewManager(cpuPolicyName string, cpuPolicyOptions map[string]string, reconc
}

case PolicyStatic:
topo, err = topology.Discover(machineInfo)
if err != nil {
return nil, err
}
klog.InfoS("Detected CPU topology", "topology", topo)

reservedCPUs, ok := nodeAllocatableReservation[v1.ResourceCPU]
Expand Down Expand Up @@ -205,6 +214,7 @@ func NewManager(cpuPolicyName string, cpuPolicyOptions map[string]string, reconc
topology: topo,
nodeAllocatableReservation: nodeAllocatableReservation,
stateFileDirectory: stateFileDirectory,
allCPUs: topo.CPUDetails.CPUs(),
}
manager.sourcesReady = &sourcesReadyStub{}
return manager, nil
Expand Down Expand Up @@ -339,6 +349,10 @@ func (m *manager) GetAllocatableCPUs() cpuset.CPUSet {
return m.allocatableCPUs.Clone()
}

// GetAllCPUs returns a defensive copy of the full set of CPUs discovered on
// this node, as recorded at manager construction time.
func (m *manager) GetAllCPUs() cpuset.CPUSet {
	all := m.allCPUs
	return all.Clone()
}

type reconciledContainer struct {
podName string
containerName string
Expand Down
11 changes: 2 additions & 9 deletions pkg/kubelet/cm/cpumanager/cpu_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,15 +693,8 @@ func TestCPUManagerGenerate(t *testing.T) {
if rawMgr.policy.Name() != testCase.expectedPolicy {
t.Errorf("Unexpected policy name. Have: %q wants %q", rawMgr.policy.Name(), testCase.expectedPolicy)
}
if rawMgr.policy.Name() == string(PolicyNone) {
if rawMgr.topology != nil {
t.Errorf("Expected topology to be nil for 'none' policy. Have: %q", rawMgr.topology)
}
}
if rawMgr.policy.Name() != string(PolicyNone) {
if rawMgr.topology == nil {
t.Errorf("Expected topology to be non-nil for policy '%v'. Have: %q", rawMgr.policy.Name(), rawMgr.topology)
}
if rawMgr.topology == nil {
t.Errorf("Expected topology to be non-nil for policy '%v'. Have: %q", rawMgr.policy.Name(), rawMgr.topology)
}
}
})
Expand Down
5 changes: 5 additions & 0 deletions pkg/kubelet/cm/cpumanager/fake_cpu_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ func (m *fakeManager) GetCPUAffinity(podUID, containerName string) cpuset.CPUSet
return cpuset.CPUSet{}
}

// GetAllCPUs on the fake manager only logs the call and reports an empty set.
func (m *fakeManager) GetAllCPUs() cpuset.CPUSet {
	klog.InfoS("GetAllCPUs")
	var none cpuset.CPUSet
	return none
}

// NewFakeManager creates empty/fake cpu manager
func NewFakeManager() Manager {
return &fakeManager{
Expand Down
55 changes: 23 additions & 32 deletions pkg/kubelet/cm/node_container_manager_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,9 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/stats/pidlimit"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/utils/cpuset"
)

const (
Expand Down Expand Up @@ -187,6 +184,29 @@ func (cm *containerManagerImpl) enforceExistingCgroup(cNameStr string, rl v1.Res

// getCgroupConfig returns a ResourceConfig object that can be used to create or update cgroups via CgroupManager interface.
func (cm *containerManagerImpl) getCgroupConfig(rl v1.ResourceList, compressibleResourcesOnly bool) *ResourceConfig {
	config := getCgroupConfigInternal(rl, compressibleResourcesOnly)
	if config == nil {
		return nil
	}

	// Under cgroup v2 with the systemd cgroup manager and the None cpumanager
	// policy, systemd has to be made aware of the cpuset cgroup: we have not
	// delegated it and it is absent from the Apply() request, so by default
	// systemd would delete the cpuset hierarchy created via cgroupfs. That in
	// turn makes the kubelet restart unnecessarily because the cgroup is never
	// configured as expected. Delegating `cpuset` to the kubelet would require
	// plumbing in libcontainer; populating CPUSet here is sufficient. The
	// Static policy updates the cpuset on its own, so it must be skipped — an
	// empty allocatable set identifies the None policy (see the comment on
	// policy none's GetAllocatableCPUs).
	if cm.cpuManager.GetAllocatableCPUs().IsEmpty() {
		config.CPUSet = cm.cpuManager.GetAllCPUs()
	}

	return config
}

// getCgroupConfigInternal are the pieces of getCgroupConfig that don't require the cm object.
// This is added to unit test without needing to create a full containerManager
func getCgroupConfigInternal(rl v1.ResourceList, compressibleResourcesOnly bool) *ResourceConfig {
// TODO(vishh): Set CPU Quota if necessary.
if rl == nil {
return nil
Expand Down Expand Up @@ -219,38 +239,9 @@ func (cm *containerManagerImpl) getCgroupConfig(rl v1.ResourceList, compressible
}
rc.HugePageLimit = HugePageLimits(rl)
}

// In the case of a None policy, cgroupv2 and systemd cgroup manager, we must make sure systemd is aware of the cpuset cgroup.
// By default, systemd will not create it, as we've not chosen to delegate it, and we haven't included it in the Apply() request.
// However, this causes a bug where kubelet restarts unnecessarily (cpuset cgroup is created in the cgroupfs, but systemd
// doesn't know about it and deletes it, and then kubelet doesn't continue because the cgroup isn't configured as expected).
// An alternative is to delegate the `cpuset` cgroup to the kubelet, but that would require some plumbing in libcontainer,
// and this is sufficient.
// Only do so on None policy, as Static policy will do its own updating of the cpuset.
if cm.NodeConfig.CPUManagerPolicy == string(cpumanager.PolicyNone) {
if cm.allCPUs.IsEmpty() {
cm.allCPUs = cm.getAllCPUs()
}
rc.CPUSet = cm.allCPUs
}

return &rc
}

func (cm *containerManagerImpl) getAllCPUs() cpuset.CPUSet {
machineInfo, err := cm.cadvisorInterface.MachineInfo()
if err != nil {
klog.V(4).InfoS("Failed to get machine info to get default cpuset", "error", err)
return cpuset.CPUSet{}
}
topo, err := topology.Discover(machineInfo)
if err != nil {
klog.V(4).InfoS("Failed to get topology info to get default cpuset", "error", err)
return cpuset.CPUSet{}
}
return topo.CPUDetails.CPUs()
}

// GetNodeAllocatableAbsolute returns the absolute value of Node Allocatable which is primarily useful for enforcement.
// Note that not all resources that are available on the node are included in the returned list of resources.
// Returns a ResourceList.
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/cm/node_container_manager_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ func TestGetCgroupConfig(t *testing.T) {

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
actual := getCgroupConfig(tc.resourceList, tc.compressibleResources)
actual := getCgroupConfigInternal(tc.resourceList, tc.compressibleResources)
tc.checks(actual, t)
})
}
Expand Down

0 comments on commit 77d03e4

Please sign in to comment.