Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP Honor pod priorities when starting pods #125918

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/features/kube_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,11 @@ const (
// sandbox creation and network configuration completes successfully
PodReadyToStartContainersCondition featuregate.Feature = "PodReadyToStartContainersCondition"

// owner: @Huutomerkki
//
// Enables pods starting in the order of their priorities
PodStartingOrderByPriority featuregate.Feature = "PodStartingOrderByPriority"

// owner: @AxeZhan
// kep: http://kep.k8s.io/3960
//
Expand Down
4 changes: 4 additions & 0 deletions pkg/features/versioned_kube_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
{Version: version.MustParse("1.30"), Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // GA in 1.30; remove in 1.32
},

PodStartingOrderByPriority: {
{Version: version.MustParse("1.33"), Default: false, PreRelease: featuregate.Alpha},
},

PortForwardWebsockets: {
{Version: version.MustParse("1.30"), Default: false, PreRelease: featuregate.Alpha},
{Version: version.MustParse("1.31"), Default: true, PreRelease: featuregate.Beta},
Expand Down
6 changes: 5 additions & 1 deletion pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -2600,7 +2600,11 @@ func handleProbeSync(kl *Kubelet, update proberesults.Update, handler SyncHandle
// a config source.
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
start := kl.clock.Now()
sort.Sort(sliceutils.PodsByCreationTime(pods))
if utilfeature.DefaultFeatureGate.Enabled(features.PodStartingOrderByPriority) {
sort.Sort(sliceutils.PodsByPriority(pods))
} else {
sort.Sort(sliceutils.PodsByCreationTime(pods))
}
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
kl.podResizeMutex.Lock()
defer kl.podResizeMutex.Unlock()
Expand Down
14 changes: 11 additions & 3 deletions pkg/kubelet/kubelet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,14 +807,21 @@ func TestHandlePortConflicts(t *testing.T) {
spec := v1.PodSpec{NodeName: string(kl.nodeName), Containers: []v1.Container{{Ports: []v1.ContainerPort{{HostPort: 80}}}}}
pods := []*v1.Pod{
podWithUIDNameNsSpec("123456789", "newpod", "foo", spec),
podWithUIDNameNsSpec("555555555", "nopriopod", "foo", spec),
podWithUIDNameNsSpec("987654321", "oldpod", "foo", spec),
}
// Add priorities to 2 pods
priority := int32(10)
pods[0].Spec.Priority = &priority
pods[2].Spec.Priority = &priority
// Make sure the Pods are in the reverse order of creation time.
pods[2].CreationTimestamp = metav1.NewTime(time.Now())
pods[1].CreationTimestamp = metav1.NewTime(time.Now())
pods[0].CreationTimestamp = metav1.NewTime(time.Now().Add(1 * time.Second))
// The newer pod should be rejected.
notfittingPod := pods[0]
fittingPod := pods[1]
notfittingPod1 := pods[0]
notfittingPod2 := pods[1]
fittingPod := pods[2]
kl.podWorkers.(*fakePodWorkers).running = map[types.UID]bool{
pods[0].UID: true,
pods[1].UID: true,
Expand All @@ -823,7 +830,8 @@ func TestHandlePortConflicts(t *testing.T) {
kl.HandlePodAdditions(pods)

// Check pod status stored in the status map.
checkPodStatus(t, kl, notfittingPod, v1.PodFailed)
checkPodStatus(t, kl, notfittingPod1, v1.PodFailed)
checkPodStatus(t, kl, notfittingPod2, v1.PodFailed)
checkPodStatus(t, kl, fittingPod, v1.PodPending)
}

Expand Down
34 changes: 34 additions & 0 deletions pkg/kubelet/util/sliceutils/sliceutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,37 @@ func (a ByImageSize) Less(i, j int) bool {
}
func (a ByImageSize) Len() int { return len(a) }
func (a ByImageSize) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

huutomerkki marked this conversation as resolved.
Show resolved Hide resolved
// PodsByPriority makes an array of pods sortable by their priority
// in descending order, and then by their creation timestamps in
// ascending order
type PodsByPriority []*v1.Pod

func (s PodsByPriority) Len() int {
return len(s)
}

func (s PodsByPriority) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}

func (s PodsByPriority) Less(i, j int) bool {
iPrio := getPodPriority(s[i])
jPrio := getPodPriority(s[j])

if iPrio > jPrio {
return true
}
if iPrio == jPrio {
return s[i].CreationTimestamp.Before(&s[j].CreationTimestamp)
}
return false
}

func getPodPriority(pod *v1.Pod) int32 {
if pod == nil || pod.Spec.Priority == nil {
return 0
}

return *pod.Spec.Priority
}
116 changes: 116 additions & 0 deletions pkg/kubelet/util/sliceutils/sliceutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,3 +203,119 @@ func TestByImageSizeLess(t *testing.T) {
}
}
}

func buildPodsByPriority() PodsByPriority {
helperPriorities := []int32{
int32(200),
int32(100),
}
return []*v1.Pod{
{
Spec: v1.PodSpec{
Priority: &helperPriorities[0],
},
ObjectMeta: metav1.ObjectMeta{
Name: "critical1",
Namespace: v1.NamespaceDefault,
CreationTimestamp: metav1.Time{
Time: time.Now(),
},
},
},
{
Spec: v1.PodSpec{
Priority: &helperPriorities[0],
},
ObjectMeta: metav1.ObjectMeta{
Name: "critical2",
Namespace: v1.NamespaceDefault,
CreationTimestamp: metav1.Time{
Time: time.Now().Add(time.Hour * 1),
},
},
},
{
Spec: v1.PodSpec{
Priority: &helperPriorities[1],
},
ObjectMeta: metav1.ObjectMeta{
Name: "lowpriority",
Namespace: v1.NamespaceDefault,
CreationTimestamp: metav1.Time{
Time: time.Now(),
},
},
},
{
Spec: v1.PodSpec{
Priority: nil,
},
ObjectMeta: metav1.ObjectMeta{
Name: "nopriority",
Namespace: v1.NamespaceDefault,
CreationTimestamp: metav1.Time{
Time: time.Now(),
},
},
},
}
}

func TestPodsByPriorityLen(t *testing.T) {
fooTests := []struct {
pods PodsByPriority
el int
}{
{[]*v1.Pod{}, 0},
{buildPodsByPriority(), 4},
{[]*v1.Pod{nil}, 1},
{nil, 0},
}

for _, fooTest := range fooTests {
r := fooTest.pods.Len()
if r != fooTest.el {
t.Errorf("returned %d but expected %d for the len of ByImageSize=%v", r, fooTest.el, fooTest.pods)
}
}
}

func TestPodsByPrioritySwap(t *testing.T) {
fooTests := []struct {
pods PodsByPriority
i int
j int
}{
{buildPodsByPriority(), 0, 1},
{buildPodsByPriority(), 0, 2},
}
for _, fooTest := range fooTests {
fooi := fooTest.pods[fooTest.i]
fooj := fooTest.pods[fooTest.j]
fooTest.pods.Swap(fooTest.i, fooTest.j)
if fooi.GetName() != fooTest.pods[fooTest.j].GetName() || fooj.GetName() != fooTest.pods[fooTest.i].GetName() {
t.Errorf("failed to swap for %v", fooTest)
}
}
}

func TestPodsByPriorityLess(t *testing.T) {
fooTests := []struct {
pods PodsByPriority
i int
j int
er bool
}{
{buildPodsByPriority(), 0, 2, true},
{buildPodsByPriority(), 2, 1, false},
{buildPodsByPriority(), 0, 1, true},
{buildPodsByPriority(), 3, 2, false},
{buildPodsByPriority(), 0, 3, true},
}
for _, fooTest := range fooTests {
result := PodsByPriority.Less(fooTest.pods, fooTest.i, fooTest.j)
if result != fooTest.er {
t.Errorf("returned %t but expected %t for the foo=%v", result, fooTest.er, fooTest.pods)
}
}
}
157 changes: 157 additions & 0 deletions test/e2e_node/pod_restart_order_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package e2enode

import (
"context"
"fmt"
"os/exec"
"sort"
"time"

"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"
schedulingv1 "k8s.io/api/scheduling/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/test/e2e/framework"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
testutils "k8s.io/kubernetes/test/utils"
imageutils "k8s.io/kubernetes/test/utils/image"
admissionapi "k8s.io/pod-security-admission/api"
)

var _ = SIGDescribe(framework.WithSerial(), "Pod Restart Order [testFocusHere2]", func() {
f := framework.NewDefaultFramework("pod-restart-order-serial")
f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged

f.Context("restart and get the target node order [testFocusHere2]", func() {

const (
podAmount = int32(5)
podRetryTimeout = 5 * time.Minute
)

tempSetCurrentKubeletConfig(f, func(ctx context.Context, initialConfig *config.KubeletConfiguration) {
initialConfig.FeatureGates = map[string]bool{
string(features.PodStartingOrderByPriority): true,
}
})

var podCli *e2epod.PodClient

ginkgo.BeforeEach(func(ctx context.Context) {
ginkgo.By("Wait for the node to be ready")
waitForNodeReady(ctx)
ginkgo.By("Create priorityclasses")
for i := range podAmount {
f.ClientSet.SchedulingV1().PriorityClasses().Create(ctx, getPriorityClassDef(i), metav1.CreateOptions{})
}
podCli = e2epod.NewPodClient(f)
})

ginkgo.It("Should [testFocusHere2]", func(ctx context.Context) {
nodeName := getNodeName(ctx, f)
pods := []*v1.Pod{}
for i := range podAmount {
pods = append(pods, getPodWithPriority(fmt.Sprintf("%d", i), nodeName, fmt.Sprintf("%d", i)))
}
podCli.CreateBatch(ctx, pods)

podList, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).List(ctx, metav1.ListOptions{})
framework.ExpectNoError(err, "should be able to list pods")
gomega.Expect(podList.Items).To(gomega.HaveLen(int(podAmount)))
orderedPodList := podList.Items
sort.Slice(orderedPodList, func(a, b int) bool {
return orderedPodList[a].CreationTimestamp.Before(&orderedPodList[b].CreationTimestamp)
})
gomega.Expect(orderedPodList).To(gomega.Equal(podList.Items))

restartKubelet(ctx, true)

waitForKubeletToStart(ctx, f)

waitForNodeReady(ctx)

exec.Command(sleepCommand(120))

postRestartPods := waitForPodsCondition(ctx, f, int(podAmount), podRetryTimeout, testutils.PodRunningReadyOrSucceeded)

gomega.Expect(postRestartPods).To(gomega.HaveLen(int(podAmount)))

gomega.Expect(getRestartCount(*postRestartPods[0])).ToNot(gomega.BeZero()) //Fails, pods have not been restarted

restartedOrderedPods := postRestartPods
sort.Slice(restartedOrderedPods, func(a, b int) bool {
return restartedOrderedPods[a].CreationTimestamp.Before(&restartedOrderedPods[b].CreationTimestamp)
})
gomega.Expect(restartedOrderedPods).To(gomega.Equal(postRestartPods))
})
})
})

// getRestartCount return the restart count of given pod (total number of its containers restarts).
// Copied over from dashboard
func getRestartCount(pod v1.Pod) int32 {
var restartCount int32 = 0
for _, containerStatus := range pod.Status.ContainerStatuses {
restartCount += containerStatus.RestartCount
}
return restartCount
}

func getPriorityClassDef(priority int32) *schedulingv1.PriorityClass {
prio := &schedulingv1.PriorityClass{
TypeMeta: metav1.TypeMeta{
Kind: "PriorityClass",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%d", priority),
},
Value: priority,
GlobalDefault: false,
Description: "Test priority",
}
return prio
}

func getPodWithPriority(name string, node string, priority string) *v1.Pod {
pod := &v1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: name,
Image: imageutils.GetPauseImageName(),
},
},
PriorityClassName: priority,
NodeName: node,
RestartPolicy: "Always",
},
}
return pod
}