Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement Pod smaller update events #127083

Merged
merged 1 commit into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions pkg/scheduler/framework/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ var (
PodRequestScaledDown = ClusterEvent{Resource: Pod, ActionType: UpdatePodScaleDown, Label: "PodRequestScaledDown"}
// PodLabelChange is the event when a pod's label is changed.
PodLabelChange = ClusterEvent{Resource: Pod, ActionType: UpdatePodLabel, Label: "PodLabelChange"}
// PodTolerationChange is the event when a pod's toleration is changed.
PodTolerationChange = ClusterEvent{Resource: Pod, ActionType: UpdatePodTolerations, Label: "PodTolerationChange"}
// PodSchedulingGateEliminatedChange is the event when all of a pod's scheduling gates are removed.
PodSchedulingGateEliminatedChange = ClusterEvent{Resource: Pod, ActionType: UpdatePodSchedulingGatesEliminated, Label: "PodSchedulingGateChange"}
// NodeSpecUnschedulableChange is the event when unschedulable node spec is changed.
NodeSpecUnschedulableChange = ClusterEvent{Resource: Node, ActionType: UpdateNodeTaint, Label: "NodeSpecUnschedulableChange"}
// NodeAllocatableChange is the event when node allocatable is changed.
Expand Down Expand Up @@ -109,6 +113,8 @@ func PodSchedulingPropertiesChange(newPod *v1.Pod, oldPod *v1.Pod) (events []Clu
podChangeExtracters := []podChangeExtractor{
extractPodLabelsChange,
extractPodScaleDown,
extractPodSchedulingGateEliminatedChange,
extractPodTolerationChange,
}

for _, fn := range podChangeExtracters {
Expand Down Expand Up @@ -159,6 +165,27 @@ func extractPodLabelsChange(newPod *v1.Pod, oldPod *v1.Pod) *ClusterEvent {
return nil
}

// extractPodTolerationChange compares the number of tolerations on newPod and
// oldPod. It returns a pointer to PodTolerationChange when the counts differ,
// and nil when they are equal.
func extractPodTolerationChange(newPod *v1.Pod, oldPod *v1.Pod) *ClusterEvent {
	newCount, oldCount := len(newPod.Spec.Tolerations), len(oldPod.Spec.Tolerations)
	if newCount == oldCount {
		// Same number of tolerations: nothing to report from this extractor.
		return nil
	}

	// The Pod gained a toleration.
	// API validation lets users add tolerations but not modify or remove them,
	// so a difference in length is sufficient to detect the update.
	// Any such update could make the Pod schedulable.
	return &PodTolerationChange
}

// extractPodSchedulingGateEliminatedChange returns a pointer to
// PodSchedulingGateEliminatedChange when oldPod had at least one scheduling
// gate and newPod has none. In every other case it returns nil.
func extractPodSchedulingGateEliminatedChange(newPod *v1.Pod, oldPod *v1.Pod) *ClusterEvent {
	if len(newPod.Spec.SchedulingGates) != 0 {
		// The updated Pod still carries gates, so they are not eliminated.
		return nil
	}
	if len(oldPod.Spec.SchedulingGates) == 0 {
		// There were no gates before the update either; nothing was removed.
		return nil
	}

	// The scheduling gates on the Pod have been completely removed.
	return &PodSchedulingGateEliminatedChange
}

// NodeSchedulingPropertiesChange interprets the update of a node and returns corresponding UpdateNodeXYZ event(s).
func NodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) (events []ClusterEvent) {
nodeChangeExtracters := []nodeChangeExtractor{
Expand Down
18 changes: 18 additions & 0 deletions pkg/scheduler/framework/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,24 @@ func Test_podSchedulingPropertiesChange(t *testing.T) {
oldPod: st.MakePod().Annotation("foo", "bar2").Obj(),
want: []ClusterEvent{assignedPodOtherUpdate},
},
{
name: "scheduling gate is eliminated",
newPod: st.MakePod().SchedulingGates([]string{}).Obj(),
oldPod: st.MakePod().SchedulingGates([]string{"foo"}).Obj(),
want: []ClusterEvent{PodSchedulingGateEliminatedChange},
},
{
name: "scheduling gate is removed, but not completely eliminated",
newPod: st.MakePod().SchedulingGates([]string{"foo"}).Obj(),
oldPod: st.MakePod().SchedulingGates([]string{"foo", "bar"}).Obj(),
want: []ClusterEvent{assignedPodOtherUpdate},
},
{
name: "pod's tolerations are updated",
newPod: st.MakePod().Toleration("key").Toleration("key2").Obj(),
oldPod: st.MakePod().Toleration("key").Obj(),
want: []ClusterEvent{PodTolerationChange},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (pl *SchedulingGates) EventsToRegister(_ context.Context) ([]framework.Clus
// https://github.com/kubernetes/kubernetes/pull/122234
return []framework.ClusterEventWithHint{
// Pods can be more schedulable once their gates are removed
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update}, QueueingHintFn: pl.isSchedulableAfterPodChange},
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.UpdatePodSchedulingGatesEliminated}, QueueingHintFn: pl.isSchedulableAfterUpdatePodSchedulingGatesEliminated},
}, nil
}

Expand All @@ -79,7 +79,7 @@ func New(_ context.Context, _ runtime.Object, _ framework.Handle, fts feature.Fe
}, nil
}

func (pl *SchedulingGates) isSchedulableAfterPodChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
func (pl *SchedulingGates) isSchedulableAfterUpdatePodSchedulingGatesEliminated(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
_, modifiedPod, err := util.As[*v1.Pod](oldObj, newObj)
if err != nil {
return framework.Queue, err
Expand All @@ -90,8 +90,5 @@ func (pl *SchedulingGates) isSchedulableAfterPodChange(logger klog.Logger, pod *
return framework.QueueSkip, nil
}

if len(modifiedPod.Spec.SchedulingGates) == 0 {
return framework.Queue, nil
}
return framework.QueueSkip, nil
return framework.Queue, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,7 @@ func Test_isSchedulableAfterPodChange(t *testing.T) {
newObj: st.MakePod().Name("p1").SchedulingGates([]string{"foo"}).UID("uid1").Obj(),
expectedHint: framework.QueueSkip,
},
"skip-queue-on-gates-not-empty": {
pod: st.MakePod().Name("p").SchedulingGates([]string{"foo", "bar"}).Obj(),
oldObj: st.MakePod().Name("p").SchedulingGates([]string{"foo", "bar"}).Obj(),
newObj: st.MakePod().Name("p").SchedulingGates([]string{"foo"}).Obj(),
expectedHint: framework.QueueSkip,
},
"queue-on-gates-become-empty": {
"queue-on-the-unsched-pod-updated": {
pod: st.MakePod().Name("p").SchedulingGates([]string{"foo"}).Obj(),
oldObj: st.MakePod().Name("p").SchedulingGates([]string{"foo"}).Obj(),
newObj: st.MakePod().Name("p").SchedulingGates([]string{}).Obj(),
Expand All @@ -109,7 +103,7 @@ func Test_isSchedulableAfterPodChange(t *testing.T) {
if err != nil {
t.Fatalf("Creating plugin: %v", err)
}
actualHint, err := p.(*SchedulingGates).isSchedulableAfterPodChange(logger, tc.pod, tc.oldObj, tc.newObj)
actualHint, err := p.(*SchedulingGates).isSchedulableAfterUpdatePodSchedulingGatesEliminated(logger, tc.pod, tc.oldObj, tc.newObj)
if tc.expectedErr {
require.Error(t, err)
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (pl *TaintToleration) EventsToRegister(_ context.Context) ([]framework.Clus
// to determine whether a Pod's update makes the Pod schedulable or not.
// https://github.com/kubernetes/kubernetes/pull/122234
clusterEventWithHint = append(clusterEventWithHint,
framework.ClusterEventWithHint{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update}, QueueingHintFn: pl.isSchedulableAfterPodChange})
framework.ClusterEventWithHint{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.UpdatePodTolerations}, QueueingHintFn: pl.isSchedulableAfterPodTolerationChange})
return clusterEventWithHint, nil
}

Expand Down Expand Up @@ -210,25 +210,20 @@ func New(_ context.Context, _ runtime.Object, h framework.Handle, fts feature.Fe
}, nil
}

// isSchedulableAfterPodChange is invoked whenever a pod changed. It checks whether
// that change made a previously unschedulable pod schedulable.
// When an unscheduled Pod, which was rejected by TaintToleration, is updated to have a new toleration,
// it may make the Pod schedulable.
func (pl *TaintToleration) isSchedulableAfterPodChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
originalPod, modifiedPod, err := util.As[*v1.Pod](oldObj, newObj)
// isSchedulableAfterPodTolerationChange is invoked whenever a pod's toleration changed.
func (pl *TaintToleration) isSchedulableAfterPodTolerationChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
_, modifiedPod, err := util.As[*v1.Pod](oldObj, newObj)
if err != nil {
return framework.Queue, err
}

if pod.UID == modifiedPod.UID &&
len(originalPod.Spec.Tolerations) != len(modifiedPod.Spec.Tolerations) {
// An unscheduled Pod got a new toleration.
// Due to API validation, the user can add, but cannot modify or remove tolerations.
// So, it's enough to just check the length of tolerations to notice the update.
// And, any updates in tolerations could make Pod schedulable.
logger.V(5).Info("a new toleration is added for the Pod, and it may make it schedulable", "pod", klog.KObj(modifiedPod))
if pod.UID == modifiedPod.UID {
// The updated Pod is the unschedulable Pod.
logger.V(5).Info("a new toleration is added for the unschedulable Pod, and it may make it schedulable", "pod", klog.KObj(modifiedPod))
return framework.Queue, nil
}

logger.V(5).Info("a new toleration is added for a Pod, but it's an unrelated Pod and wouldn't change the TaintToleration plugin's decision", "pod", klog.KObj(modifiedPod))

return framework.QueueSkip, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ func TestIsSchedulableAfterNodeChange(t *testing.T) {
}
}

func Test_isSchedulableAfterPodChange(t *testing.T) {
func Test_isSchedulableAfterPodTolerationChange(t *testing.T) {
testcases := map[string]struct {
pod *v1.Pod
oldObj, newObj interface{}
Expand Down Expand Up @@ -472,27 +472,6 @@ func Test_isSchedulableAfterPodChange(t *testing.T) {
expectedHint: framework.QueueSkip,
expectedErr: false,
},
"skip-updates-not-toleration": {
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-1",
Namespace: "ns-1",
}},
oldObj: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-1",
Namespace: "ns-1",
}},
newObj: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-1",
Namespace: "ns-1",
Labels: map[string]string{"foo": "bar"},
},
},
expectedHint: framework.QueueSkip,
expectedErr: false,
},
"queue-on-toleration-added": {
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -530,7 +509,7 @@ func Test_isSchedulableAfterPodChange(t *testing.T) {
if err != nil {
t.Fatalf("creating plugin: %v", err)
}
actualHint, err := p.(*TaintToleration).isSchedulableAfterPodChange(logger, tc.pod, tc.oldObj, tc.newObj)
actualHint, err := p.(*TaintToleration).isSchedulableAfterPodTolerationChange(logger, tc.pod, tc.oldObj, tc.newObj)
if tc.expectedErr {
if err == nil {
t.Errorf("unexpected success")
Expand Down
6 changes: 5 additions & 1 deletion pkg/scheduler/framework/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ const (
UpdatePodLabel
// UpdatePodScaleDown is an update for pod's scale down (i.e., any resource request is reduced).
UpdatePodScaleDown
// UpdatePodTolerations is an update for pod's tolerations.
UpdatePodTolerations
// UpdatePodSchedulingGatesEliminated is an update for pod's scheduling gates, which eliminates all scheduling gates in the Pod.
UpdatePodSchedulingGatesEliminated

// updatePodOther is an update for pod's other fields.
// It's used only for the internal event handling, and thus unexported.
Expand All @@ -76,7 +80,7 @@ const (
All ActionType = 1<<iota - 1

// Use the general Update type if you neither know nor care which specific sub-Update type to use.
Update = UpdateNodeAllocatable | UpdateNodeLabel | UpdateNodeTaint | UpdateNodeCondition | UpdateNodeAnnotation | UpdatePodLabel | UpdatePodScaleDown | updatePodOther
Update = UpdateNodeAllocatable | UpdateNodeLabel | UpdateNodeTaint | UpdateNodeCondition | UpdateNodeAnnotation | UpdatePodLabel | UpdatePodScaleDown | UpdatePodTolerations | UpdatePodSchedulingGatesEliminated | updatePodOther
)

// GVK is short for group/version/kind, which can uniquely represent a particular API resource.
Expand Down
18 changes: 18 additions & 0 deletions pkg/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,7 @@ func Test_UnionedGVKs(t *testing.T) {
plugins schedulerapi.PluginSet
want map[framework.GVK]framework.ActionType
enableInPlacePodVerticalScaling bool
enableSchedulerQueueingHints bool
}{
{
name: "filter without EnqueueExtensions plugin",
Expand Down Expand Up @@ -894,10 +895,27 @@ func Test_UnionedGVKs(t *testing.T) {
},
enableInPlacePodVerticalScaling: true,
},
{
name: "plugins with default profile (queueingHint: enabled)",
plugins: schedulerapi.PluginSet{Enabled: defaults.PluginsV1.MultiPoint.Enabled},
want: map[framework.GVK]framework.ActionType{
framework.Pod: framework.Add | framework.UpdatePodLabel | framework.UpdatePodScaleDown | framework.UpdatePodTolerations | framework.UpdatePodSchedulingGatesEliminated | framework.Delete,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This new test shows that we eliminated all Pod/Update events from in-tree plugins.

framework.Node: framework.All,
framework.CSINode: framework.All - framework.Delete,
framework.CSIDriver: framework.All - framework.Delete,
framework.CSIStorageCapacity: framework.All - framework.Delete,
framework.PersistentVolume: framework.All - framework.Delete,
framework.PersistentVolumeClaim: framework.All - framework.Delete,
framework.StorageClass: framework.All - framework.Delete,
},
enableInPlacePodVerticalScaling: true,
enableSchedulerQueueingHints: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InPlacePodVerticalScaling, tt.enableInPlacePodVerticalScaling)
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, tt.enableSchedulerQueueingHints)

_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
Expand Down