From 03e3779d40c9b5b417275395ef1f12d044582acb Mon Sep 17 00:00:00 2001 From: Kensei Nakada Date: Tue, 3 Sep 2024 16:25:28 +0900 Subject: [PATCH] feat: implement Pod smaller update events --- pkg/scheduler/framework/events.go | 27 +++++++++++++++++++ pkg/scheduler/framework/events_test.go | 18 +++++++++++++ .../schedulinggates/scheduling_gates.go | 9 +++---- .../schedulinggates/scheduling_gates_test.go | 10 ++----- .../tainttoleration/taint_toleration.go | 23 +++++++--------- .../tainttoleration/taint_toleration_test.go | 25 ++--------------- pkg/scheduler/framework/types.go | 6 ++++- pkg/scheduler/scheduler_test.go | 18 +++++++++++++ 8 files changed, 84 insertions(+), 52 deletions(-) diff --git a/pkg/scheduler/framework/events.go b/pkg/scheduler/framework/events.go index 89422a36dbd25..9284c6bb0229f 100644 --- a/pkg/scheduler/framework/events.go +++ b/pkg/scheduler/framework/events.go @@ -61,6 +61,10 @@ var ( PodRequestScaledDown = ClusterEvent{Resource: Pod, ActionType: UpdatePodScaleDown, Label: "PodRequestScaledDown"} // PodLabelChange is the event when a pod's label is changed. PodLabelChange = ClusterEvent{Resource: Pod, ActionType: UpdatePodLabel, Label: "PodLabelChange"} + // PodTolerationChange is the event when a pod's toleration is changed. + PodTolerationChange = ClusterEvent{Resource: Pod, ActionType: UpdatePodTolerations, Label: "PodTolerationChange"} + // PodSchedulingGateEliminatedChange is the event when a pod's scheduling gate is changed. + PodSchedulingGateEliminatedChange = ClusterEvent{Resource: Pod, ActionType: UpdatePodSchedulingGatesEliminated, Label: "PodSchedulingGateChange"} // NodeSpecUnschedulableChange is the event when unschedulable node spec is changed. NodeSpecUnschedulableChange = ClusterEvent{Resource: Node, ActionType: UpdateNodeTaint, Label: "NodeSpecUnschedulableChange"} // NodeAllocatableChange is the event when node allocatable is changed. @@ -109,6 +113,8 @@ func PodSchedulingPropertiesChange(newPod *v1.Pod, oldPod *v1.Pod) (events []Clu podChangeExtracters := []podChangeExtractor{ extractPodLabelsChange, extractPodScaleDown, + extractPodSchedulingGateEliminatedChange, + extractPodTolerationChange, } for _, fn := range podChangeExtracters { @@ -159,6 +165,27 @@ func extractPodLabelsChange(newPod *v1.Pod, oldPod *v1.Pod) *ClusterEvent { return nil } +func extractPodTolerationChange(newPod *v1.Pod, oldPod *v1.Pod) *ClusterEvent { + if len(newPod.Spec.Tolerations) != len(oldPod.Spec.Tolerations) { + // A Pod got a new toleration. + // Due to API validation, the user can add, but cannot modify or remove tolerations. + // So, it's enough to just check the length of tolerations to notice the update. + // And, any updates in tolerations could make Pod schedulable. + return &PodTolerationChange + } + + return nil +} + +func extractPodSchedulingGateEliminatedChange(newPod *v1.Pod, oldPod *v1.Pod) *ClusterEvent { + if len(newPod.Spec.SchedulingGates) == 0 && len(oldPod.Spec.SchedulingGates) != 0 { + // A scheduling gate on the pod is completely removed. + return &PodSchedulingGateEliminatedChange + } + + return nil +} + // NodeSchedulingPropertiesChange interprets the update of a node and returns corresponding UpdateNodeXYZ event(s). func NodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) (events []ClusterEvent) { nodeChangeExtracters := []nodeChangeExtractor{ diff --git a/pkg/scheduler/framework/events_test.go b/pkg/scheduler/framework/events_test.go index 2e0bb110663b2..35183f8685700 100644 --- a/pkg/scheduler/framework/events_test.go +++ b/pkg/scheduler/framework/events_test.go @@ -370,6 +370,24 @@ func Test_podSchedulingPropertiesChange(t *testing.T) { oldPod: st.MakePod().Annotation("foo", "bar2").Obj(), want: []ClusterEvent{assignedPodOtherUpdate}, }, + { + name: "scheduling gate is eliminated", + newPod: st.MakePod().SchedulingGates([]string{}).Obj(), + oldPod: st.MakePod().SchedulingGates([]string{"foo"}).Obj(), + want: []ClusterEvent{PodSchedulingGateEliminatedChange}, + }, + { + name: "scheduling gate is removed, but not completely eliminated", + newPod: st.MakePod().SchedulingGates([]string{"foo"}).Obj(), + oldPod: st.MakePod().SchedulingGates([]string{"foo", "bar"}).Obj(), + want: []ClusterEvent{assignedPodOtherUpdate}, + }, + { + name: "pod's tolerations are updated", + newPod: st.MakePod().Toleration("key").Toleration("key2").Obj(), + oldPod: st.MakePod().Toleration("key").Obj(), + want: []ClusterEvent{PodTolerationChange}, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates.go b/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates.go index 28c81cf402f0f..bccd4bd8b1df8 100644 --- a/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates.go +++ b/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates.go @@ -68,7 +68,7 @@ func (pl *SchedulingGates) EventsToRegister(_ context.Context) ([]framework.Clus // https://github.com/kubernetes/kubernetes/pull/122234 return []framework.ClusterEventWithHint{ // Pods can be more schedulable once it's gates are removed - {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update}, QueueingHintFn: pl.isSchedulableAfterPodChange}, + {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.UpdatePodSchedulingGatesEliminated}, QueueingHintFn: pl.isSchedulableAfterUpdatePodSchedulingGatesEliminated}, }, nil } @@ -79,7 +79,7 @@ func New(_ context.Context, _ runtime.Object, _ framework.Handle, fts feature.Fe }, nil } -func (pl *SchedulingGates) isSchedulableAfterPodChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { +func (pl *SchedulingGates) isSchedulableAfterUpdatePodSchedulingGatesEliminated(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { _, modifiedPod, err := util.As[*v1.Pod](oldObj, newObj) if err != nil { return framework.Queue, err @@ -90,8 +90,5 @@ func (pl *SchedulingGates) isSchedulableAfterPodChange(logger klog.Logger, pod * return framework.QueueSkip, nil } - if len(modifiedPod.Spec.SchedulingGates) == 0 { - return framework.Queue, nil - } - return framework.QueueSkip, nil + return framework.Queue, nil } diff --git a/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates_test.go b/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates_test.go index aa0d50d1e4747..713581e487cf9 100644 --- a/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates_test.go +++ b/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates_test.go @@ -88,13 +88,7 @@ func Test_isSchedulableAfterPodChange(t *testing.T) { newObj: st.MakePod().Name("p1").SchedulingGates([]string{"foo"}).UID("uid1").Obj(), expectedHint: framework.QueueSkip, }, - "skip-queue-on-gates-not-empty": { - pod: st.MakePod().Name("p").SchedulingGates([]string{"foo", "bar"}).Obj(), - oldObj: st.MakePod().Name("p").SchedulingGates([]string{"foo", "bar"}).Obj(), - newObj: st.MakePod().Name("p").SchedulingGates([]string{"foo"}).Obj(), - expectedHint: framework.QueueSkip, - }, - "queue-on-gates-become-empty": { + "queue-on-the-unsched-pod-updated": { pod: st.MakePod().Name("p").SchedulingGates([]string{"foo"}).Obj(), oldObj: st.MakePod().Name("p").SchedulingGates([]string{"foo"}).Obj(), newObj: st.MakePod().Name("p").SchedulingGates([]string{}).Obj(), @@ -109,7 +103,7 @@ func Test_isSchedulableAfterPodChange(t *testing.T) { if err != nil { t.Fatalf("Creating plugin: %v", err) } - actualHint, err := p.(*SchedulingGates).isSchedulableAfterPodChange(logger, tc.pod, tc.oldObj, tc.newObj) + actualHint, err := p.(*SchedulingGates).isSchedulableAfterUpdatePodSchedulingGatesEliminated(logger, tc.pod, tc.oldObj, tc.newObj) if tc.expectedErr { require.Error(t, err) return diff --git a/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go b/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go index 2e6b3f88e52ed..4773c0fdb3526 100644 --- a/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go +++ b/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go @@ -70,7 +70,7 @@ func (pl *TaintToleration) EventsToRegister(_ context.Context) ([]framework.Clus // to determine whether a Pod's update makes the Pod schedulable or not. // https://github.com/kubernetes/kubernetes/pull/122234 clusterEventWithHint = append(clusterEventWithHint, - framework.ClusterEventWithHint{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update}, QueueingHintFn: pl.isSchedulableAfterPodChange}) + framework.ClusterEventWithHint{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.UpdatePodTolerations}, QueueingHintFn: pl.isSchedulableAfterPodTolerationChange}) return clusterEventWithHint, nil } @@ -210,25 +210,20 @@ func New(_ context.Context, _ runtime.Object, h framework.Handle, fts feature.Fe }, nil } -// isSchedulableAfterPodChange is invoked whenever a pod changed. It checks whether -// that change made a previously unschedulable pod schedulable. -// When an unscheduled Pod, which was rejected by TaintToleration, is updated to have a new toleration, -// it may make the Pod schedulable. -func (pl *TaintToleration) isSchedulableAfterPodChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { - originalPod, modifiedPod, err := util.As[*v1.Pod](oldObj, newObj) +// isSchedulableAfterPodTolerationChange is invoked whenever a pod's toleration changed. +func (pl *TaintToleration) isSchedulableAfterPodTolerationChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { + _, modifiedPod, err := util.As[*v1.Pod](oldObj, newObj) if err != nil { return framework.Queue, err } - if pod.UID == modifiedPod.UID && - len(originalPod.Spec.Tolerations) != len(modifiedPod.Spec.Tolerations) { - // An unscheduled Pod got a new toleration. - // Due to API validation, the user can add, but cannot modify or remove tolerations. - // So, it's enough to just check the length of tolerations to notice the update. - // And, any updates in tolerations could make Pod schedulable. - logger.V(5).Info("a new toleration is added for the Pod, and it may make it schedulable", "pod", klog.KObj(modifiedPod)) + if pod.UID == modifiedPod.UID { + // The updated Pod is the unschedulable Pod. + logger.V(5).Info("a new toleration is added for the unschedulable Pod, and it may make it schedulable", "pod", klog.KObj(modifiedPod)) return framework.Queue, nil } + logger.V(5).Info("a new toleration is added for a Pod, but it's an unrelated Pod and wouldn't change the TaintToleration plugin's decision", "pod", klog.KObj(modifiedPod)) + return framework.QueueSkip, nil } diff --git a/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration_test.go b/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration_test.go index f88ab88554ee5..4504531603e40 100644 --- a/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration_test.go +++ b/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration_test.go @@ -421,7 +421,7 @@ func TestIsSchedulableAfterNodeChange(t *testing.T) { } } -func Test_isSchedulableAfterPodChange(t *testing.T) { +func Test_isSchedulableAfterPodTolerationChange(t *testing.T) { testcases := map[string]struct { pod *v1.Pod oldObj, newObj interface{} @@ -472,27 +472,6 @@ func Test_isSchedulableAfterPodChange(t *testing.T) { expectedHint: framework.QueueSkip, expectedErr: false, }, - "skip-updates-not-toleration": { - pod: &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod-1", - Namespace: "ns-1", - }}, - oldObj: &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod-1", - Namespace: "ns-1", - }}, - newObj: &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod-1", - Namespace: "ns-1", - Labels: map[string]string{"foo": "bar"}, - }, - }, - expectedHint: framework.QueueSkip, - expectedErr: false, - }, "queue-on-toleration-added": { pod: &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -530,7 +509,7 @@ func Test_isSchedulableAfterPodChange(t *testing.T) { if err != nil { t.Fatalf("creating plugin: %v", err) } - actualHint, err := p.(*TaintToleration).isSchedulableAfterPodChange(logger, tc.pod, tc.oldObj, tc.newObj) + actualHint, err := p.(*TaintToleration).isSchedulableAfterPodTolerationChange(logger, tc.pod, tc.oldObj, tc.newObj) if tc.expectedErr { if err == nil { t.Errorf("unexpected success") diff --git a/pkg/scheduler/framework/types.go b/pkg/scheduler/framework/types.go index fd7cb9c351558..b2c9551893e14 100644 --- a/pkg/scheduler/framework/types.go +++ b/pkg/scheduler/framework/types.go @@ -68,6 +68,10 @@ const ( UpdatePodLabel // UpdatePodScaleDown is an update for pod's scale down (i.e., any resource request is reduced). UpdatePodScaleDown + // UpdatePodTolerations is an update for pod's tolerations. + UpdatePodTolerations + // UpdatePodSchedulingGatesEliminated is an update for pod's scheduling gates, which eliminates all scheduling gates in the Pod. + UpdatePodSchedulingGatesEliminated // updatePodOther is a update for pod's other fields. // It's used only for the internal event handling, and thus unexported. @@ -76,7 +80,7 @@ const ( All ActionType = 1<