Skip to content

Commit

Permalink
Merge pull request #98041 from Huang-Wei/sched-enqueue-1
Browse files Browse the repository at this point in the history
Surface info of failed plugins during PerFilter and Filter
  • Loading branch information
k8s-ci-robot authored Jan 29, 2021
2 parents e05c9ab + f8a6bdb commit f402e47
Show file tree
Hide file tree
Showing 12 changed files with 291 additions and 169 deletions.
1 change: 1 addition & 0 deletions pkg/scheduler/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ go_test(
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
"//vendor/github.com/google/go-cmp/cmp:go_default_library",
],
)

Expand Down
57 changes: 32 additions & 25 deletions pkg/scheduler/core/generic_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/klog/v2"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/util/feature"
extenderv1 "k8s.io/kube-scheduler/extender/v1"
"k8s.io/kubernetes/pkg/features"
Expand Down Expand Up @@ -106,25 +107,25 @@ func (g *genericScheduler) Schedule(ctx context.Context, fwk framework.Framework
return result, ErrNoNodesAvailable
}

feasibleNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, fwk, state, pod)
feasibleNodes, diagnosis, err := g.findNodesThatFitPod(ctx, fwk, state, pod)
if err != nil {
return result, err
}
trace.Step("Computing predicates done")

if len(feasibleNodes) == 0 {
return result, &framework.FitError{
Pod: pod,
NumAllNodes: g.nodeInfoSnapshot.NumNodes(),
FilteredNodesStatuses: filteredNodesStatuses,
Pod: pod,
NumAllNodes: g.nodeInfoSnapshot.NumNodes(),
Diagnosis: diagnosis,
}
}

// When only one node after predicate, just use it.
if len(feasibleNodes) == 1 {
return ScheduleResult{
SuggestedHost: feasibleNodes[0].Name,
EvaluatedNodes: 1 + len(filteredNodesStatuses),
EvaluatedNodes: 1 + len(diagnosis.NodeToStatusMap),
FeasibleNodes: 1,
}, nil
}
Expand All @@ -139,7 +140,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, fwk framework.Framework

return ScheduleResult{
SuggestedHost: host,
EvaluatedNodes: len(feasibleNodes) + len(filteredNodesStatuses),
EvaluatedNodes: len(feasibleNodes) + len(diagnosis.NodeToStatusMap),
FeasibleNodes: len(feasibleNodes),
}, err
}
Expand Down Expand Up @@ -197,19 +198,19 @@ func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes i
return numNodes
}

func (g *genericScheduler) evaluateNominatedNode(ctx context.Context, pod *v1.Pod, fwk framework.Framework, state *framework.CycleState, filteredNodesStatuses framework.NodeToStatusMap) ([]*v1.Node, error) {
func (g *genericScheduler) evaluateNominatedNode(ctx context.Context, pod *v1.Pod, fwk framework.Framework, state *framework.CycleState, diagnosis framework.Diagnosis) ([]*v1.Node, error) {
nnn := pod.Status.NominatedNodeName
nodeInfo, err := g.nodeInfoSnapshot.Get(nnn)
if err != nil {
return nil, err
}
node := []*framework.NodeInfo{nodeInfo}
feasibleNodes, err := g.findNodesThatPassFilters(ctx, fwk, state, pod, filteredNodesStatuses, node)
feasibleNodes, err := g.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, node)
if err != nil {
return nil, err
}

feasibleNodes, err = g.findNodesThatPassExtenders(pod, feasibleNodes, filteredNodesStatuses)
feasibleNodes, err = g.findNodesThatPassExtenders(pod, feasibleNodes, diagnosis.NodeToStatusMap)
if err != nil {
return nil, err
}
Expand All @@ -219,48 +220,53 @@ func (g *genericScheduler) evaluateNominatedNode(ctx context.Context, pod *v1.Po

// Filters the nodes to find the ones that fit the pod based on the framework
// filter plugins and filter extenders.
func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.NodeToStatusMap, error) {
filteredNodesStatuses := make(framework.NodeToStatusMap)
func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) {
diagnosis := framework.Diagnosis{
NodeToStatusMap: make(framework.NodeToStatusMap),
UnschedulablePlugins: sets.NewString(),
}

// Run "prefilter" plugins.
s := fwk.RunPreFilterPlugins(ctx, state, pod)
allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
if err != nil {
return nil, nil, err
return nil, diagnosis, err
}
if !s.IsSuccess() {
if !s.IsUnschedulable() {
return nil, nil, s.AsError()
return nil, diagnosis, s.AsError()
}
// All nodes will have the same status. Some non trivial refactoring is
// needed to avoid this copy.
for _, n := range allNodes {
filteredNodesStatuses[n.Node().Name] = s
diagnosis.NodeToStatusMap[n.Node().Name] = s
}
return nil, filteredNodesStatuses, nil
diagnosis.UnschedulablePlugins.Insert(s.FailedPlugin())
return nil, diagnosis, nil
}

// "NominatedNodeName" can potentially be set in a previous scheduling cycle as a result of preemption.
// This node is likely the only candidate that will fit the pod, and hence we try it first before iterating over all nodes.
if len(pod.Status.NominatedNodeName) > 0 && feature.DefaultFeatureGate.Enabled(features.PreferNominatedNode) {
feasibleNodes, err := g.evaluateNominatedNode(ctx, pod, fwk, state, filteredNodesStatuses)
feasibleNodes, err := g.evaluateNominatedNode(ctx, pod, fwk, state, diagnosis)
if err != nil {
klog.ErrorS(err, "Evaluation failed on nominated node", "pod", klog.KObj(pod), "node", pod.Status.NominatedNodeName)
}
// Nominated node passes all the filters, scheduler is good to assign this node to the pod.
if len(feasibleNodes) != 0 {
return feasibleNodes, filteredNodesStatuses, nil
return feasibleNodes, diagnosis, nil
}
}
feasibleNodes, err := g.findNodesThatPassFilters(ctx, fwk, state, pod, filteredNodesStatuses, allNodes)
feasibleNodes, err := g.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, allNodes)
if err != nil {
return nil, nil, err
return nil, diagnosis, err
}
feasibleNodes, err = g.findNodesThatPassExtenders(pod, feasibleNodes, filteredNodesStatuses)

feasibleNodes, err = g.findNodesThatPassExtenders(pod, feasibleNodes, diagnosis.NodeToStatusMap)
if err != nil {
return nil, nil, err
return nil, diagnosis, err
}
return feasibleNodes, filteredNodesStatuses, nil
return feasibleNodes, diagnosis, nil
}

// findNodesThatPassFilters finds the nodes that fit the filter plugins.
Expand All @@ -269,7 +275,7 @@ func (g *genericScheduler) findNodesThatPassFilters(
fwk framework.Framework,
state *framework.CycleState,
pod *v1.Pod,
statuses framework.NodeToStatusMap,
diagnosis framework.Diagnosis,
nodes []*framework.NodeInfo) ([]*v1.Node, error) {
numNodesToFind := g.numFeasibleNodesToFind(int32(len(nodes)))

Expand Down Expand Up @@ -309,7 +315,8 @@ func (g *genericScheduler) findNodesThatPassFilters(
}
} else {
statusesLock.Lock()
statuses[nodeInfo.Node().Name] = status
diagnosis.NodeToStatusMap[nodeInfo.Node().Name] = status
diagnosis.UnschedulablePlugins.Insert(status.FailedPlugin())
statusesLock.Unlock()
}
}
Expand All @@ -326,7 +333,7 @@ func (g *genericScheduler) findNodesThatPassFilters(
// Stops searching for more nodes once the configured number of feasible nodes
// are found.
parallelize.Until(ctx, len(nodes), checkNode)
processedNodes := int(feasibleNodesLen) + len(statuses)
processedNodes := int(feasibleNodesLen) + len(diagnosis.NodeToStatusMap)
g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(nodes)

feasibleNodes = feasibleNodes[:feasibleNodesLen]
Expand Down
Loading

0 comments on commit f402e47

Please sign in to comment.