Skip to content

Commit

Permalink
Merge pull request kubernetes#127715 from gnufied/automated-cherry-pi…
Browse files Browse the repository at this point in the history
…ck-of-#122022-upstream-release-1.29

Automated cherry pick of kubernetes#122022: fix: requeue pods rejected by Extenders properly
  • Loading branch information
k8s-ci-robot authored Oct 11, 2024
2 parents 1402f1d + c75e373 commit 9ad41b6
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 3 deletions.
8 changes: 8 additions & 0 deletions pkg/scheduler/framework/runtime/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,14 @@ func NewFramework(ctx context.Context, r Registry, profile *config.KubeScheduler
logger: logger,
}

if len(f.extenders) > 0 {
// Extender doesn't support any kind of requeueing feature like EnqueueExtensions in the scheduling framework.
// We register a defaultEnqueueExtension to framework.ExtenderName here.
// And, in the scheduling cycle, when Extenders reject some Nodes and the pod ends up being unschedulable,
// we put framework.ExtenderName to pInfo.UnschedulablePlugins.
f.enqueueExtensions = []framework.EnqueueExtensions{&defaultEnqueueExtension{pluginName: framework.ExtenderName}}
}

if profile == nil {
return f, nil
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/scheduler/framework/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,9 @@ type WeightedAffinityTerm struct {
Weight int32
}

// ExtenderName is a fake plugin name put in UnschedulablePlugins when Extender rejected some Nodes.
const ExtenderName = "Extender"

// Diagnosis records the details to diagnose a scheduling failure.
type Diagnosis struct {
// NodeToStatusMap records the status of each retriable node (status Unschedulable)
Expand Down
19 changes: 17 additions & 2 deletions pkg/scheduler/schedule_one.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,11 +500,26 @@ func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.F
return nil, diagnosis, err
}

feasibleNodes, err = findNodesThatPassExtenders(ctx, sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
feasibleNodesAfterExtender, err := findNodesThatPassExtenders(ctx, sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
if err != nil {
return nil, diagnosis, err
}
return feasibleNodes, diagnosis, nil
if len(feasibleNodesAfterExtender) != len(feasibleNodes) {
// Extenders filtered out some nodes.
//
// Extender doesn't support any kind of requeueing feature like EnqueueExtensions in the scheduling framework.
// When Extenders reject some Nodes and the pod ends up being unschedulable,
// we put framework.ExtenderName to pInfo.UnschedulablePlugins.
// This Pod will be requeued from unschedulable pod pool to activeQ/backoffQ
// by any kind of cluster events.
// https://github.com/kubernetes/kubernetes/issues/122019
if diagnosis.UnschedulablePlugins == nil {
diagnosis.UnschedulablePlugins = sets.New[string]()
}
diagnosis.UnschedulablePlugins.Insert(framework.ExtenderName)
}

return feasibleNodesAfterExtender, diagnosis, nil
}

func (sched *Scheduler) evaluateNominatedNode(ctx context.Context, pod *v1.Pod, fwk framework.Framework, state *framework.CycleState, diagnosis framework.Diagnosis) ([]*v1.Node, error) {
Expand Down
41 changes: 40 additions & 1 deletion pkg/scheduler/schedule_one_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1771,6 +1771,7 @@ func TestSchedulerSchedulePod(t *testing.T) {
tests := []struct {
name string
registerPlugins []tf.RegisterPluginFunc
extenders []tf.FakeExtender
nodes []string
pvcs []v1.PersistentVolumeClaim
pod *v1.Pod
Expand Down Expand Up @@ -2078,6 +2079,39 @@ func TestSchedulerSchedulePod(t *testing.T) {
},
},
},
{
name: "test with extender which filters out some Nodes",
registerPlugins: []tf.RegisterPluginFunc{
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
tf.RegisterFilterPlugin(
"FakeFilter",
tf.NewFakeFilterPlugin(map[string]framework.Code{"3": framework.Unschedulable}),
),
tf.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
extenders: []tf.FakeExtender{
{
ExtenderName: "FakeExtender1",
Predicates: []tf.FitPredicate{tf.FalsePredicateExtender},
},
},
nodes: []string{"1", "2", "3"},
pod: st.MakePod().Name("test-filter").UID("test-filter").Obj(),
wantNodes: nil,
wErr: &framework.FitError{
Pod: st.MakePod().Name("test-filter").UID("test-filter").Obj(),
NumAllNodes: 3,
Diagnosis: framework.Diagnosis{
NodeToStatusMap: framework.NodeToStatusMap{
"1": framework.NewStatus(framework.Unschedulable, `FakeExtender: node "1" failed`),
"2": framework.NewStatus(framework.Unschedulable, `FakeExtender: node "2" failed`),
"3": framework.NewStatus(framework.Unschedulable, "injecting failure for pod test-filter").WithPlugin("FakeFilter"),
},
UnschedulablePlugins: sets.New("FakeFilter", framework.ExtenderName),
},
},
},
{
name: "test with filter plugin returning UnschedulableAndUnresolvable status",
registerPlugins: []tf.RegisterPluginFunc{
Expand Down Expand Up @@ -2388,10 +2422,15 @@ func TestSchedulerSchedulePod(t *testing.T) {
t.Fatal(err)
}

var extenders []framework.Extender
for ii := range test.extenders {
extenders = append(extenders, &test.extenders[ii])
}
sched := &Scheduler{
Cache: cache,
nodeInfoSnapshot: snapshot,
percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
Extenders: extenders,
}
sched.applyDefaultHandlers()

Expand All @@ -2403,7 +2442,7 @@ func TestSchedulerSchedulePod(t *testing.T) {
gotFitErr, gotOK := err.(*framework.FitError)
wantFitErr, wantOK := test.wErr.(*framework.FitError)
if gotOK != wantOK {
t.Errorf("Expected err to be FitError: %v, but got %v", wantOK, gotOK)
t.Errorf("Expected err to be FitError: %v, but got %v (error: %v)", wantOK, gotOK, err)
} else if gotOK {
if diff := cmp.Diff(gotFitErr, wantFitErr); diff != "" {
t.Errorf("Unexpected fitErr: (-want, +got): %s", diff)
Expand Down

0 comments on commit 9ad41b6

Please sign in to comment.