Skip to content

Commit

Permalink
Merge pull request kubernetes#127083 from sanposhiho/scheduler-smalle…
Browse files Browse the repository at this point in the history
…r-event

feat: implement Pod smaller update events
  • Loading branch information
k8s-ci-robot authored Sep 3, 2024
2 parents 85384fe + 03e3779 commit 4bc6a11
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 52 deletions.
27 changes: 27 additions & 0 deletions pkg/scheduler/framework/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ var (
PodRequestScaledDown = ClusterEvent{Resource: Pod, ActionType: UpdatePodScaleDown, Label: "PodRequestScaledDown"}
// PodLabelChange is the event when a pod's label is changed.
PodLabelChange = ClusterEvent{Resource: Pod, ActionType: UpdatePodLabel, Label: "PodLabelChange"}
// PodTolerationChange is the event when a pod's toleration is changed.
PodTolerationChange = ClusterEvent{Resource: Pod, ActionType: UpdatePodTolerations, Label: "PodTolerationChange"}
// PodSchedulingGateEliminatedChange is the event when a pod's scheduling gate is changed.
PodSchedulingGateEliminatedChange = ClusterEvent{Resource: Pod, ActionType: UpdatePodSchedulingGatesEliminated, Label: "PodSchedulingGateChange"}
// NodeSpecUnschedulableChange is the event when unschedulable node spec is changed.
NodeSpecUnschedulableChange = ClusterEvent{Resource: Node, ActionType: UpdateNodeTaint, Label: "NodeSpecUnschedulableChange"}
// NodeAllocatableChange is the event when node allocatable is changed.
Expand Down Expand Up @@ -109,6 +113,8 @@ func PodSchedulingPropertiesChange(newPod *v1.Pod, oldPod *v1.Pod) (events []Clu
podChangeExtracters := []podChangeExtractor{
extractPodLabelsChange,
extractPodScaleDown,
extractPodSchedulingGateEliminatedChange,
extractPodTolerationChange,
}

for _, fn := range podChangeExtracters {
Expand Down Expand Up @@ -159,6 +165,27 @@ func extractPodLabelsChange(newPod *v1.Pod, oldPod *v1.Pod) *ClusterEvent {
return nil
}

func extractPodTolerationChange(newPod *v1.Pod, oldPod *v1.Pod) *ClusterEvent {
if len(newPod.Spec.Tolerations) != len(oldPod.Spec.Tolerations) {
// A Pod got a new toleration.
// Due to API validation, the user can add, but cannot modify or remove tolerations.
// So, it's enough to just check the length of tolerations to notice the update.
// And, any updates in tolerations could make Pod schedulable.
return &PodTolerationChange
}

return nil
}

func extractPodSchedulingGateEliminatedChange(newPod *v1.Pod, oldPod *v1.Pod) *ClusterEvent {
if len(newPod.Spec.SchedulingGates) == 0 && len(oldPod.Spec.SchedulingGates) != 0 {
// A scheduling gate on the pod is completely removed.
return &PodSchedulingGateEliminatedChange
}

return nil
}

// NodeSchedulingPropertiesChange interprets the update of a node and returns corresponding UpdateNodeXYZ event(s).
func NodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) (events []ClusterEvent) {
nodeChangeExtracters := []nodeChangeExtractor{
Expand Down
18 changes: 18 additions & 0 deletions pkg/scheduler/framework/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,24 @@ func Test_podSchedulingPropertiesChange(t *testing.T) {
oldPod: st.MakePod().Annotation("foo", "bar2").Obj(),
want: []ClusterEvent{assignedPodOtherUpdate},
},
{
name: "scheduling gate is eliminated",
newPod: st.MakePod().SchedulingGates([]string{}).Obj(),
oldPod: st.MakePod().SchedulingGates([]string{"foo"}).Obj(),
want: []ClusterEvent{PodSchedulingGateEliminatedChange},
},
{
name: "scheduling gate is removed, but not completely eliminated",
newPod: st.MakePod().SchedulingGates([]string{"foo"}).Obj(),
oldPod: st.MakePod().SchedulingGates([]string{"foo", "bar"}).Obj(),
want: []ClusterEvent{assignedPodOtherUpdate},
},
{
name: "pod's tolerations are updated",
newPod: st.MakePod().Toleration("key").Toleration("key2").Obj(),
oldPod: st.MakePod().Toleration("key").Obj(),
want: []ClusterEvent{PodTolerationChange},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (pl *SchedulingGates) EventsToRegister(_ context.Context) ([]framework.Clus
// https://github.com/kubernetes/kubernetes/pull/122234
return []framework.ClusterEventWithHint{
// Pods can be more schedulable once it's gates are removed
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update}, QueueingHintFn: pl.isSchedulableAfterPodChange},
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.UpdatePodSchedulingGatesEliminated}, QueueingHintFn: pl.isSchedulableAfterUpdatePodSchedulingGatesEliminated},
}, nil
}

Expand All @@ -79,7 +79,7 @@ func New(_ context.Context, _ runtime.Object, _ framework.Handle, fts feature.Fe
}, nil
}

func (pl *SchedulingGates) isSchedulableAfterPodChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
func (pl *SchedulingGates) isSchedulableAfterUpdatePodSchedulingGatesEliminated(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
_, modifiedPod, err := util.As[*v1.Pod](oldObj, newObj)
if err != nil {
return framework.Queue, err
Expand All @@ -90,8 +90,5 @@ func (pl *SchedulingGates) isSchedulableAfterPodChange(logger klog.Logger, pod *
return framework.QueueSkip, nil
}

if len(modifiedPod.Spec.SchedulingGates) == 0 {
return framework.Queue, nil
}
return framework.QueueSkip, nil
return framework.Queue, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,7 @@ func Test_isSchedulableAfterPodChange(t *testing.T) {
newObj: st.MakePod().Name("p1").SchedulingGates([]string{"foo"}).UID("uid1").Obj(),
expectedHint: framework.QueueSkip,
},
"skip-queue-on-gates-not-empty": {
pod: st.MakePod().Name("p").SchedulingGates([]string{"foo", "bar"}).Obj(),
oldObj: st.MakePod().Name("p").SchedulingGates([]string{"foo", "bar"}).Obj(),
newObj: st.MakePod().Name("p").SchedulingGates([]string{"foo"}).Obj(),
expectedHint: framework.QueueSkip,
},
"queue-on-gates-become-empty": {
"queue-on-the-unsched-pod-updated": {
pod: st.MakePod().Name("p").SchedulingGates([]string{"foo"}).Obj(),
oldObj: st.MakePod().Name("p").SchedulingGates([]string{"foo"}).Obj(),
newObj: st.MakePod().Name("p").SchedulingGates([]string{}).Obj(),
Expand All @@ -109,7 +103,7 @@ func Test_isSchedulableAfterPodChange(t *testing.T) {
if err != nil {
t.Fatalf("Creating plugin: %v", err)
}
actualHint, err := p.(*SchedulingGates).isSchedulableAfterPodChange(logger, tc.pod, tc.oldObj, tc.newObj)
actualHint, err := p.(*SchedulingGates).isSchedulableAfterUpdatePodSchedulingGatesEliminated(logger, tc.pod, tc.oldObj, tc.newObj)
if tc.expectedErr {
require.Error(t, err)
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (pl *TaintToleration) EventsToRegister(_ context.Context) ([]framework.Clus
// to determine whether a Pod's update makes the Pod schedulable or not.
// https://github.com/kubernetes/kubernetes/pull/122234
clusterEventWithHint = append(clusterEventWithHint,
framework.ClusterEventWithHint{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update}, QueueingHintFn: pl.isSchedulableAfterPodChange})
framework.ClusterEventWithHint{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.UpdatePodTolerations}, QueueingHintFn: pl.isSchedulableAfterPodTolerationChange})
return clusterEventWithHint, nil
}

Expand Down Expand Up @@ -210,25 +210,20 @@ func New(_ context.Context, _ runtime.Object, h framework.Handle, fts feature.Fe
}, nil
}

// isSchedulableAfterPodChange is invoked whenever a pod changed. It checks whether
// that change made a previously unschedulable pod schedulable.
// When an unscheduled Pod, which was rejected by TaintToleration, is updated to have a new toleration,
// it may make the Pod schedulable.
func (pl *TaintToleration) isSchedulableAfterPodChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
originalPod, modifiedPod, err := util.As[*v1.Pod](oldObj, newObj)
// isSchedulableAfterPodTolerationChange is invoked whenever a pod's toleration changed.
func (pl *TaintToleration) isSchedulableAfterPodTolerationChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
_, modifiedPod, err := util.As[*v1.Pod](oldObj, newObj)
if err != nil {
return framework.Queue, err
}

if pod.UID == modifiedPod.UID &&
len(originalPod.Spec.Tolerations) != len(modifiedPod.Spec.Tolerations) {
// An unscheduled Pod got a new toleration.
// Due to API validation, the user can add, but cannot modify or remove tolerations.
// So, it's enough to just check the length of tolerations to notice the update.
// And, any updates in tolerations could make Pod schedulable.
logger.V(5).Info("a new toleration is added for the Pod, and it may make it schedulable", "pod", klog.KObj(modifiedPod))
if pod.UID == modifiedPod.UID {
// The updated Pod is the unschedulable Pod.
logger.V(5).Info("a new toleration is added for the unschedulable Pod, and it may make it schedulable", "pod", klog.KObj(modifiedPod))
return framework.Queue, nil
}

logger.V(5).Info("a new toleration is added for a Pod, but it's an unrelated Pod and wouldn't change the TaintToleration plugin's decision", "pod", klog.KObj(modifiedPod))

return framework.QueueSkip, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ func TestIsSchedulableAfterNodeChange(t *testing.T) {
}
}

func Test_isSchedulableAfterPodChange(t *testing.T) {
func Test_isSchedulableAfterPodTolerationChange(t *testing.T) {
testcases := map[string]struct {
pod *v1.Pod
oldObj, newObj interface{}
Expand Down Expand Up @@ -472,27 +472,6 @@ func Test_isSchedulableAfterPodChange(t *testing.T) {
expectedHint: framework.QueueSkip,
expectedErr: false,
},
"skip-updates-not-toleration": {
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-1",
Namespace: "ns-1",
}},
oldObj: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-1",
Namespace: "ns-1",
}},
newObj: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-1",
Namespace: "ns-1",
Labels: map[string]string{"foo": "bar"},
},
},
expectedHint: framework.QueueSkip,
expectedErr: false,
},
"queue-on-toleration-added": {
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -530,7 +509,7 @@ func Test_isSchedulableAfterPodChange(t *testing.T) {
if err != nil {
t.Fatalf("creating plugin: %v", err)
}
actualHint, err := p.(*TaintToleration).isSchedulableAfterPodChange(logger, tc.pod, tc.oldObj, tc.newObj)
actualHint, err := p.(*TaintToleration).isSchedulableAfterPodTolerationChange(logger, tc.pod, tc.oldObj, tc.newObj)
if tc.expectedErr {
if err == nil {
t.Errorf("unexpected success")
Expand Down
6 changes: 5 additions & 1 deletion pkg/scheduler/framework/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ const (
UpdatePodLabel
// UpdatePodScaleDown is an update for pod's scale down (i.e., any resource request is reduced).
UpdatePodScaleDown
// UpdatePodTolerations is an update for pod's tolerations.
UpdatePodTolerations
// UpdatePodSchedulingGatesEliminated is an update for pod's scheduling gates, which eliminates all scheduling gates in the Pod.
UpdatePodSchedulingGatesEliminated

// updatePodOther is a update for pod's other fields.
// It's used only for the internal event handling, and thus unexported.
Expand All @@ -76,7 +80,7 @@ const (
All ActionType = 1<<iota - 1

// Use the general Update type if you don't either know or care the specific sub-Update type to use.
Update = UpdateNodeAllocatable | UpdateNodeLabel | UpdateNodeTaint | UpdateNodeCondition | UpdateNodeAnnotation | UpdatePodLabel | UpdatePodScaleDown | updatePodOther
Update = UpdateNodeAllocatable | UpdateNodeLabel | UpdateNodeTaint | UpdateNodeCondition | UpdateNodeAnnotation | UpdatePodLabel | UpdatePodScaleDown | UpdatePodTolerations | UpdatePodSchedulingGatesEliminated | updatePodOther
)

// GVK is short for group/version/kind, which can uniquely represent a particular API resource.
Expand Down
18 changes: 18 additions & 0 deletions pkg/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,7 @@ func Test_UnionedGVKs(t *testing.T) {
plugins schedulerapi.PluginSet
want map[framework.GVK]framework.ActionType
enableInPlacePodVerticalScaling bool
enableSchedulerQueueingHints bool
}{
{
name: "filter without EnqueueExtensions plugin",
Expand Down Expand Up @@ -894,10 +895,27 @@ func Test_UnionedGVKs(t *testing.T) {
},
enableInPlacePodVerticalScaling: true,
},
{
name: "plugins with default profile (queueingHint: enabled)",
plugins: schedulerapi.PluginSet{Enabled: defaults.PluginsV1.MultiPoint.Enabled},
want: map[framework.GVK]framework.ActionType{
framework.Pod: framework.Add | framework.UpdatePodLabel | framework.UpdatePodScaleDown | framework.UpdatePodTolerations | framework.UpdatePodSchedulingGatesEliminated | framework.Delete,
framework.Node: framework.All,
framework.CSINode: framework.All - framework.Delete,
framework.CSIDriver: framework.All - framework.Delete,
framework.CSIStorageCapacity: framework.All - framework.Delete,
framework.PersistentVolume: framework.All - framework.Delete,
framework.PersistentVolumeClaim: framework.All - framework.Delete,
framework.StorageClass: framework.All - framework.Delete,
},
enableInPlacePodVerticalScaling: true,
enableSchedulerQueueingHints: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InPlacePodVerticalScaling, tt.enableInPlacePodVerticalScaling)
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, tt.enableSchedulerQueueingHints)

_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
Expand Down

0 comments on commit 4bc6a11

Please sign in to comment.