Merge pull request kubernetes#125923 from haircommander/cpuset-fix-restart

kubelet/cm: fix bug where kubelet restarts from missing cpuset cgroup
k8s-ci-robot authored Oct 11, 2024
2 parents c45f3ab + b94c538 commit 762a85e
Showing 8 changed files with 126 additions and 28 deletions.
3 changes: 3 additions & 0 deletions pkg/kubelet/cm/cgroup_manager_linux.go
@@ -298,6 +298,9 @@ func (m *cgroupCommon) toResources(resourceConfig *ResourceConfig) *libcontainer
if resourceConfig.PidsLimit != nil {
resources.PidsLimit = *resourceConfig.PidsLimit
}
if !resourceConfig.CPUSet.IsEmpty() {
resources.CpusetCpus = resourceConfig.CPUSet.String()
}

m.maybeSetHugetlb(resourceConfig, resources)

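Note on the toResources change above: the new branch only sets libcontainer's CpusetCpus when ResourceConfig.CPUSet is non-empty, so callers that leave the field zero-valued keep the old behavior. A minimal, self-contained sketch of that propagation (not kubelet code; the helper name and CPU numbers are made up for illustration):

package main

import (
	"fmt"

	"k8s.io/utils/cpuset"
)

// cpusetCpusValue mirrors the guard added in toResources: an empty set means
// "leave cpuset.cpus alone", a populated set is rendered in cpuset list syntax.
func cpusetCpusValue(set cpuset.CPUSet) string {
	if set.IsEmpty() {
		return ""
	}
	return set.String()
}

func main() {
	fmt.Printf("%q\n", cpusetCpusValue(cpuset.New()))           // "" -> CpusetCpus left unset
	fmt.Printf("%q\n", cpusetCpusValue(cpuset.New(0, 1, 2, 3))) // "0-3"
}
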
24 changes: 19 additions & 5 deletions pkg/kubelet/cm/cpumanager/cpu_manager.go
@@ -93,6 +93,10 @@ type Manager interface {
// GetCPUAffinity returns cpuset which includes cpus from shared pools
// as well as exclusively allocated cpus
GetCPUAffinity(podUID, containerName string) cpuset.CPUSet

// GetAllCPUs returns all the CPUs known by cpumanager, as reported by the
// hardware discovery. Maps to the CPU capacity.
GetAllCPUs() cpuset.CPUSet
}

type manager struct {
@@ -136,7 +140,11 @@ type manager struct {
// stateFileDirectory holds the directory where the state file for checkpoints is held.
stateFileDirectory string

// allocatableCPUs is the set of online CPUs as reported by the system
// allCPUs is the set of online CPUs as reported by the system
allCPUs cpuset.CPUSet

// allocatableCPUs is the set of online CPUs as reported by the system,
// and available for allocation, minus the reserved set
allocatableCPUs cpuset.CPUSet

// pendingAdmissionPod contain the pod during the admission phase
@@ -156,6 +164,11 @@ func NewManager(cpuPolicyName string, cpuPolicyOptions map[string]string, reconc
var policy Policy
var err error

topo, err = topology.Discover(machineInfo)
if err != nil {
return nil, err
}

switch policyName(cpuPolicyName) {

case PolicyNone:
@@ -165,10 +178,6 @@ func NewManager(cpuPolicyName string, cpuPolicyOptions map[string]string, reconc
}

case PolicyStatic:
topo, err = topology.Discover(machineInfo)
if err != nil {
return nil, err
}
klog.InfoS("Detected CPU topology", "topology", topo)

reservedCPUs, ok := nodeAllocatableReservation[v1.ResourceCPU]
@@ -205,6 +214,7 @@ func NewManager(cpuPolicyName string, cpuPolicyOptions map[string]string, reconc
topology: topo,
nodeAllocatableReservation: nodeAllocatableReservation,
stateFileDirectory: stateFileDirectory,
allCPUs: topo.CPUDetails.CPUs(),
}
manager.sourcesReady = &sourcesReadyStub{}
return manager, nil
@@ -339,6 +349,10 @@ func (m *manager) GetAllocatableCPUs() cpuset.CPUSet {
return m.allocatableCPUs.Clone()
}

func (m *manager) GetAllCPUs() cpuset.CPUSet {
return m.allCPUs.Clone()
}

type reconciledContainer struct {
podName string
containerName string
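The cpu manager now tracks two distinct sets: allCPUs, populated from topology discovery (topo.CPUDetails.CPUs(), i.e. the CPU capacity), and allocatableCPUs, which the static policy computes as that set minus the reserved CPUs and which stays empty under the none policy. A hedged sketch of how GetAllCPUs() and GetAllocatableCPUs() relate (CPU numbers and the reserved set are hypothetical):

package main

import (
	"fmt"

	"k8s.io/utils/cpuset"
)

func main() {
	allCPUs := cpuset.New(0, 1, 2, 3, 4, 5, 6, 7) // what topology discovery reports -> GetAllCPUs()
	reserved := cpuset.New(0, 1)                  // e.g. system/kube reserved CPUs

	// Under the static policy, allocatable is capacity minus reserved -> GetAllocatableCPUs().
	allocatable := allCPUs.Difference(reserved)

	fmt.Println(allCPUs)     // 0-7
	fmt.Println(allocatable) // 2-7
	// Under the none policy GetAllocatableCPUs() returns an empty set; the
	// node_container_manager_linux.go change below keys off exactly that.
}
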
11 changes: 2 additions & 9 deletions pkg/kubelet/cm/cpumanager/cpu_manager_test.go
@@ -693,15 +693,8 @@ func TestCPUManagerGenerate(t *testing.T) {
if rawMgr.policy.Name() != testCase.expectedPolicy {
t.Errorf("Unexpected policy name. Have: %q wants %q", rawMgr.policy.Name(), testCase.expectedPolicy)
}
if rawMgr.policy.Name() == string(PolicyNone) {
if rawMgr.topology != nil {
t.Errorf("Expected topology to be nil for 'none' policy. Have: %q", rawMgr.topology)
}
}
if rawMgr.policy.Name() != string(PolicyNone) {
if rawMgr.topology == nil {
t.Errorf("Expected topology to be non-nil for policy '%v'. Have: %q", rawMgr.policy.Name(), rawMgr.topology)
}
if rawMgr.topology == nil {
t.Errorf("Expected topology to be non-nil for policy '%v'. Have: %q", rawMgr.policy.Name(), rawMgr.topology)
}
}
})
5 changes: 5 additions & 0 deletions pkg/kubelet/cm/cpumanager/fake_cpu_manager.go
@@ -85,6 +85,11 @@ func (m *fakeManager) GetCPUAffinity(podUID, containerName string) cpuset.CPUSet
return cpuset.CPUSet{}
}

func (m *fakeManager) GetAllCPUs() cpuset.CPUSet {
klog.InfoS("GetAllCPUs")
return cpuset.CPUSet{}
}

// NewFakeManager creates empty/fake cpu manager
func NewFakeManager() Manager {
return &fakeManager{
48 changes: 35 additions & 13 deletions pkg/kubelet/cm/node_container_manager_linux.go
@@ -53,7 +53,7 @@ func (cm *containerManagerImpl) createNodeAllocatableCgroups() error {
cgroupConfig := &CgroupConfig{
Name: cm.cgroupRoot,
// The default limits for cpu shares can be very low which can lead to CPU starvation for pods.
ResourceParameters: getCgroupConfig(nodeAllocatable, false),
ResourceParameters: cm.getCgroupConfig(nodeAllocatable, false),
}
if cm.cgroupManager.Exists(cgroupConfig.Name) {
return nil
@@ -81,7 +81,7 @@ func (cm *containerManagerImpl) enforceNodeAllocatableCgroups() error {

cgroupConfig := &CgroupConfig{
Name: cm.cgroupRoot,
ResourceParameters: getCgroupConfig(nodeAllocatable, false),
ResourceParameters: cm.getCgroupConfig(nodeAllocatable, false),
}

// Using ObjectReference for events as the node maybe not cached; refer to #42701 for detail.
@@ -110,7 +110,7 @@ func (cm *containerManagerImpl) enforceNodeAllocatableCgroups() error {
// Now apply kube reserved and system reserved limits if required.
if nc.EnforceNodeAllocatable.Has(kubetypes.SystemReservedEnforcementKey) {
klog.V(2).InfoS("Enforcing system reserved on cgroup", "cgroupName", nc.SystemReservedCgroupName, "limits", nc.SystemReserved)
if err := enforceExistingCgroup(cm.cgroupManager, cm.cgroupManager.CgroupName(nc.SystemReservedCgroupName), nc.SystemReserved, false); err != nil {
if err := cm.enforceExistingCgroup(nc.SystemReservedCgroupName, nc.SystemReserved, false); err != nil {
message := fmt.Sprintf("Failed to enforce System Reserved Cgroup Limits on %q: %v", nc.SystemReservedCgroupName, err)
cm.recorder.Event(nodeRef, v1.EventTypeWarning, events.FailedNodeAllocatableEnforcement, message)
return errors.New(message)
@@ -119,7 +119,7 @@ }
}
if nc.EnforceNodeAllocatable.Has(kubetypes.KubeReservedEnforcementKey) {
klog.V(2).InfoS("Enforcing kube reserved on cgroup", "cgroupName", nc.KubeReservedCgroupName, "limits", nc.KubeReserved)
if err := enforceExistingCgroup(cm.cgroupManager, cm.cgroupManager.CgroupName(nc.KubeReservedCgroupName), nc.KubeReserved, false); err != nil {
if err := cm.enforceExistingCgroup(nc.KubeReservedCgroupName, nc.KubeReserved, false); err != nil {
message := fmt.Sprintf("Failed to enforce Kube Reserved Cgroup Limits on %q: %v", nc.KubeReservedCgroupName, err)
cm.recorder.Event(nodeRef, v1.EventTypeWarning, events.FailedNodeAllocatableEnforcement, message)
return errors.New(message)
@@ -129,7 +129,7 @@ func (cm *containerManagerImpl) enforceNodeAllocatableCgroups() error {

if nc.EnforceNodeAllocatable.Has(kubetypes.SystemReservedCompressibleEnforcementKey) {
klog.V(2).InfoS("Enforcing system reserved compressible on cgroup", "cgroupName", nc.SystemReservedCgroupName, "limits", nc.SystemReserved)
if err := enforceExistingCgroup(cm.cgroupManager, cm.cgroupManager.CgroupName(nc.SystemReservedCgroupName), nc.SystemReserved, true); err != nil {
if err := cm.enforceExistingCgroup(nc.SystemReservedCgroupName, nc.SystemReserved, true); err != nil {
message := fmt.Sprintf("Failed to enforce System Reserved Compressible Cgroup Limits on %q: %v", nc.SystemReservedCgroupName, err)
cm.recorder.Event(nodeRef, v1.EventTypeWarning, events.FailedNodeAllocatableEnforcement, message)
return errors.New(message)
@@ -139,7 +139,7 @@ func (cm *containerManagerImpl) enforceNodeAllocatableCgroups() error {

if nc.EnforceNodeAllocatable.Has(kubetypes.KubeReservedCompressibleEnforcementKey) {
klog.V(2).InfoS("Enforcing kube reserved compressible on cgroup", "cgroupName", nc.KubeReservedCgroupName, "limits", nc.KubeReserved)
if err := enforceExistingCgroup(cm.cgroupManager, cm.cgroupManager.CgroupName(nc.KubeReservedCgroupName), nc.KubeReserved, true); err != nil {
if err := cm.enforceExistingCgroup(nc.KubeReservedCgroupName, nc.KubeReserved, true); err != nil {
message := fmt.Sprintf("Failed to enforce Kube Reserved Compressible Cgroup Limits on %q: %v", nc.KubeReservedCgroupName, err)
cm.recorder.Event(nodeRef, v1.EventTypeWarning, events.FailedNodeAllocatableEnforcement, message)
return errors.New(message)
@@ -150,9 +150,9 @@ func (cm *containerManagerImpl) enforceNodeAllocatableCgroups() error {
}

// enforceExistingCgroup updates the limits `rl` on existing cgroup `cName` using `cgroupManager` interface.
func enforceExistingCgroup(cgroupManager CgroupManager, cName CgroupName, rl v1.ResourceList, compressibleResources bool) error {
rp := getCgroupConfig(rl, compressibleResources)

func (cm *containerManagerImpl) enforceExistingCgroup(cNameStr string, rl v1.ResourceList, compressibleResources bool) error {
cName := cm.cgroupManager.CgroupName(cNameStr)
rp := cm.getCgroupConfig(rl, compressibleResources)
if rp == nil {
return fmt.Errorf("%q cgroup is not configured properly", cName)
}
@@ -173,17 +173,40 @@ func enforceExistingCgroup(cgroupManager CgroupManager, cName CgroupName, rl v1.
ResourceParameters: rp,
}
klog.V(4).InfoS("Enforcing limits on cgroup", "cgroupName", cName, "cpuShares", cgroupConfig.ResourceParameters.CPUShares, "memory", cgroupConfig.ResourceParameters.Memory, "pidsLimit", cgroupConfig.ResourceParameters.PidsLimit)
if err := cgroupManager.Validate(cgroupConfig.Name); err != nil {
if err := cm.cgroupManager.Validate(cgroupConfig.Name); err != nil {
return err
}
if err := cgroupManager.Update(cgroupConfig); err != nil {
if err := cm.cgroupManager.Update(cgroupConfig); err != nil {
return err
}
return nil
}

// getCgroupConfig returns a ResourceConfig object that can be used to create or update cgroups via CgroupManager interface.
func getCgroupConfig(rl v1.ResourceList, compressibleResourcesOnly bool) *ResourceConfig {
func (cm *containerManagerImpl) getCgroupConfig(rl v1.ResourceList, compressibleResourcesOnly bool) *ResourceConfig {
rc := getCgroupConfigInternal(rl, compressibleResourcesOnly)
if rc == nil {
return nil
}

// When the None policy is used with cgroup v2 and the systemd cgroup manager, we must make sure systemd is aware of the cpuset cgroup.
// By default, systemd will not create it, as we've not chosen to delegate it and we haven't included it in the Apply() request.
// However, this causes a bug where the kubelet restarts unnecessarily (the cpuset cgroup is created in the cgroupfs, but systemd
// doesn't know about it and deletes it, and then the kubelet doesn't continue because the cgroup isn't configured as expected).
// An alternative is to delegate the `cpuset` cgroup to the kubelet, but that would require some plumbing in libcontainer,
// and this is sufficient.
// Only do so for the None policy, as the Static policy does its own updating of the cpuset.
// See the comment on the none policy's GetAllocatableCPUs.
if cm.cpuManager.GetAllocatableCPUs().IsEmpty() {
rc.CPUSet = cm.cpuManager.GetAllCPUs()
}

return rc
}

// getCgroupConfigInternal contains the pieces of getCgroupConfig that don't require the cm object.
// It is split out so it can be unit tested without needing to create a full containerManager.
func getCgroupConfigInternal(rl v1.ResourceList, compressibleResourcesOnly bool) *ResourceConfig {
// TODO(vishh): Set CPU Quota if necessary.
if rl == nil {
return nil
@@ -216,7 +239,6 @@ func getCgroupConfig(rl v1.ResourceList, compressibleResourcesOnly bool) *Resour
}
rc.HugePageLimit = HugePageLimits(rl)
}

return &rc
}

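The comment block in getCgroupConfig above is the heart of the fix: with the none policy on cgroup v2 under the systemd cgroup driver, the node allocatable cgroup is now explicitly given the full CPU set, so systemd knows about (and preserves) the cpuset cgroup instead of deleting it and forcing the kubelet to fail. A small sketch of that decision, under the stated assumption that only the none policy reports an empty allocatable set (function name and values are illustrative, not kubelet code):

package main

import (
	"fmt"

	"k8s.io/utils/cpuset"
)

// cpuSetForNodeCgroup mirrors the new branch in getCgroupConfig: when the CPU manager
// reports no allocatable CPUs (none policy), pin the node allocatable cgroup to all
// CPUs; otherwise return an empty set and let the static policy manage cpuset.cpus.
func cpuSetForNodeCgroup(allocatable, all cpuset.CPUSet) cpuset.CPUSet {
	if allocatable.IsEmpty() {
		return all
	}
	return cpuset.CPUSet{}
}

func main() {
	all := cpuset.New(0, 1, 2, 3)
	fmt.Println(cpuSetForNodeCgroup(cpuset.New(), all))     // none policy: 0-3
	fmt.Println(cpuSetForNodeCgroup(cpuset.New(2, 3), all)) // static policy: (empty, left to the policy)
}
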
2 changes: 1 addition & 1 deletion pkg/kubelet/cm/node_container_manager_linux_test.go
@@ -458,7 +458,7 @@ func TestGetCgroupConfig(t *testing.T) {

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
actual := getCgroupConfig(tc.resourceList, tc.compressibleResources)
actual := getCgroupConfigInternal(tc.resourceList, tc.compressibleResources)
tc.checks(actual, t)
})
}
3 changes: 3 additions & 0 deletions pkg/kubelet/cm/types.go
@@ -19,12 +19,15 @@ package cm
import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/cpuset"
)

// ResourceConfig holds information about all the supported cgroup resource parameters.
type ResourceConfig struct {
// Memory limit (in bytes).
Memory *int64
// CPU set (number of CPUs the cgroup has access to).
CPUSet cpuset.CPUSet
// CPU shares (relative weight vs. other containers).
CPUShares *uint64
// CPU hardcap limit (in usecs). Allowed cpu time in a given period.
58 changes: 58 additions & 0 deletions test/e2e_node/node_container_manager_test.go
@@ -76,6 +76,64 @@ var _ = SIGDescribe("Node Container Manager", framework.WithSerial(), func() {
framework.ExpectNoError(runTest(ctx, f))
})
})
f.Describe("Validate CGroup management", func() {
// Regression test for https://issues.k8s.io/125923
// That issue involves a race with systemd which seems to manifest most likely, or perhaps only
// (the data gathered so far is inconclusive), on the very first boot of the machine, so restarting the kubelet
// is not sufficient. On the other hand, the exact reproducer seems to require either a dedicated lane running only
// this test, or rebooting the machine before running this test. Both are practically unrealistic in CI.
// The closest approximation is this test in its current form, using a kubelet restart. It at least
// acts as a non-regression test, so it still brings value.
ginkgo.It("should correctly start with cpumanager none policy in use with systemd", func(ctx context.Context) {
if !IsCgroup2UnifiedMode() {
ginkgo.Skip("this test requires cgroups v2")
}

var err error
var oldCfg *kubeletconfig.KubeletConfiguration
// Get current kubelet configuration
oldCfg, err = getCurrentKubeletConfig(ctx)
framework.ExpectNoError(err)

ginkgo.DeferCleanup(func(ctx context.Context) {
if oldCfg != nil {
// Update the Kubelet configuration.
framework.ExpectNoError(e2enodekubelet.WriteKubeletConfigFile(oldCfg))

ginkgo.By("Restarting the kubelet")
restartKubelet(true)

// wait until the kubelet health check succeeds
gomega.Eventually(ctx, func(ctx context.Context) bool {
return kubeletHealthCheck(kubeletHealthCheckURL)
}).WithTimeout(2 * time.Minute).WithPolling(5 * time.Second).Should(gomega.BeTrueBecause("expected kubelet to be in healthy state"))
ginkgo.By("Started the kubelet")
}
})

newCfg := oldCfg.DeepCopy()
// Change existing kubelet configuration
newCfg.CPUManagerPolicy = "none"
newCfg.CgroupDriver = "systemd"
newCfg.FailCgroupV1 = true // extra safety. We want to avoid false negatives though, so we added the skip check earlier

// Update the Kubelet configuration.
framework.ExpectNoError(e2enodekubelet.WriteKubeletConfigFile(newCfg))

ginkgo.By("Restarting the kubelet")
restartKubelet(true)

// wait until the kubelet health check succeeds
gomega.Eventually(ctx, func(ctx context.Context) bool {
return kubeletHealthCheck(kubeletHealthCheckURL)
}).WithTimeout(2 * time.Minute).WithPolling(5 * time.Second).Should(gomega.BeTrueBecause("expected kubelet to be in healthy state"))
ginkgo.By("Started the kubelet")

gomega.Consistently(ctx, func(ctx context.Context) bool {
return getNodeReadyStatus(ctx, f) && kubeletHealthCheck(kubeletHealthCheckURL)
}).WithTimeout(2 * time.Minute).WithPolling(2 * time.Second).Should(gomega.BeTrueBecause("node keeps reporting ready status"))
})
})
})

func expectFileValToEqual(filePath string, expectedValue, delta int64) error {
