scheduler volumebinding: leverage PreFilterResult
This change leverages the new PreFilterResult
to reduce the list of eligible nodes for a pod
using bound local PVs during the PreFilter stage,
so that only the node(s) whose local PV node
affinity matches will be considered in subsequent
scheduling stages.

Today, the NodeAffinity check is done during Filter,
which means all nodes are considered even though a
large number of them may be ineligible because they
do not match the node affinity requirement of the
pod's bound local PV(s). Reducing the node list in
PreFilter ensures that Filter only considers the
reduced list, and thus provides a clearer message
to users when no node is available for scheduling,
since the list contains only relevant nodes.

If an error is encountered (e.g. a PV cache read
error) or if the node list cannot be reduced (e.g.
the pod uses no local PVs), then all nodes are
still considered for the remaining scheduling
stages.
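
As a minimal illustration of the intersection semantics described
above (the node names are hypothetical and this snippet is only a
sketch, not part of the change), using the string-set helper from
k8s.io/apimachinery:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	// Hypothetical node-affinity node names from two bound local PVs.
	pv1Nodes := sets.NewString("node1", "node2")
	pv2Nodes := sets.NewString("node2", "node3")

	// A node is eligible only if it satisfies every bound local PV,
	// hence the intersection of the per-PV node name sets.
	eligible := pv1Nodes.Intersection(pv2Nodes)
	fmt.Println(eligible.List()) // prints [node2]
}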

Signed-off-by: Yibo Zhuang <yibzhuang@gmail.com>
yibozhuang committed Nov 18, 2022
1 parent 8e48df1 commit 380c7f2
Showing 8 changed files with 604 additions and 6 deletions.
64 changes: 63 additions & 1 deletion pkg/scheduler/framework/plugins/volumebinding/binder.go
@@ -45,6 +45,7 @@ import (
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding/metrics"
"k8s.io/kubernetes/pkg/volume/util"
)

// ConflictReason is used for the special strings which explain why
@@ -126,6 +127,8 @@ type InTreeToCSITranslator interface {
// 1. The scheduler takes a Pod off the scheduler queue and processes it serially:
// a. Invokes all pre-filter plugins for the pod. GetPodVolumes() is invoked
// here, pod volume information will be saved in current scheduling cycle state for later use.
// If the pod has bound immediate PVCs, GetEligibleNodes() is invoked to potentially reduce
// the list of eligible nodes based on the bound PVs' NodeAffinity (if any).
// b. Invokes all filter plugins, parallelized across nodes. FindPodVolumes() is invoked here.
// c. Invokes all score plugins. Future/TBD
// d. Selects the best node for the Pod.
@@ -148,6 +151,14 @@ type SchedulerVolumeBinder interface {
// and unbound with immediate binding (including prebound)
GetPodVolumes(pod *v1.Pod) (boundClaims, unboundClaimsDelayBinding, unboundClaimsImmediate []*v1.PersistentVolumeClaim, err error)

// GetEligibleNodes checks the existing bound claims of the pod to determine if the list of nodes can be
// potentially reduced to a subset of eligible nodes based on the bound claims, which can then be used
// in subsequent scheduling stages.
//
// If eligibleNodes is 'nil', it indicates that such an eligible-node reduction cannot be made
// and all nodes should be considered.
GetEligibleNodes(boundClaims []*v1.PersistentVolumeClaim) (eligibleNodes sets.String)

// FindPodVolumes checks if all of a Pod's PVCs can be satisfied by the
// node and returns pod's volumes information.
//
@@ -208,6 +219,8 @@ type volumeBinder struct {
csiStorageCapacityLister storagelisters.CSIStorageCapacityLister
}

var _ SchedulerVolumeBinder = &volumeBinder{}

// CapacityCheck contains additional parameters for NewVolumeBinder that
// are only needed when checking volume sizes against available storage
// capacity is desired.
@@ -248,7 +261,7 @@ func NewVolumeBinder(
}

// FindPodVolumes finds the matching PVs for PVCs and nodes to provision PVs
// for the given pod and node. If the node does not fit, confilict reasons are
// for the given pod and node. If the node does not fit, conflict reasons are
// returned.
func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, boundClaims, claimsToBind []*v1.PersistentVolumeClaim, node *v1.Node) (podVolumes *PodVolumes, reasons ConflictReasons, err error) {
podVolumes = &PodVolumes{}
@@ -356,6 +369,55 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, boundClaims, claimsToBind []*
return
}

// GetEligibleNodes checks the existing bound claims of the pod to determine if the list of nodes can be
// potentially reduced to a subset of eligible nodes based on the bound claims, which can then be used
// in subsequent scheduling stages.
//
// Returning 'nil' for eligibleNodes indicates that such an eligible-node reduction cannot be made
// and all nodes should be considered.
func (b *volumeBinder) GetEligibleNodes(boundClaims []*v1.PersistentVolumeClaim) (eligibleNodes sets.String) {
if len(boundClaims) == 0 {
return
}

var errs []error
for _, pvc := range boundClaims {
pvName := pvc.Spec.VolumeName
pv, err := b.pvCache.GetPV(pvName)
if err != nil {
errs = append(errs, err)
continue
}

// if the PersistentVolume is local and has node affinity matching specific node(s),
// add them to the eligible nodes
nodeNames := util.GetLocalPersistentVolumeNodeNames(pv)
if len(nodeNames) != 0 {
// the first time eligible nodes are found for a local PersistentVolume,
// initialize the eligible node set with them.
if eligibleNodes == nil {
eligibleNodes = sets.NewString(nodeNames...)
} else {
// for each subsequent local PersistentVolume with eligible nodes,
// take the intersection with the existing eligible node set;
// e.g. if PV1 has node affinity to node1 and PV2 has node affinity
// to node2, the eligible node list should be empty.
eligibleNodes = eligibleNodes.Intersection(sets.NewString(nodeNames...))
}
}
}

if len(errs) > 0 {
klog.V(4).InfoS("GetEligibleNodes: one or more error occurred finding eligible nodes", "error", errs)
return nil
}

if eligibleNodes != nil {
klog.V(4).InfoS("GetEligibleNodes: reduced down eligible nodes", "nodes", eligibleNodes)
}
return
}

// AssumePodVolumes will take the matching PVs and PVCs to provision in pod's
// volume information for the chosen node, and:
// 1. Update the pvCache with the new prebound PV.
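For context, here is a minimal sketch (not part of the diff; the PV and
node names are hypothetical) of the local-PV shape that GetEligibleNodes
keys on: a PV whose required node affinity pins it to specific node(s)
through the kubernetes.io/hostname label. For a PV like this,
util.GetLocalPersistentVolumeNodeNames would be expected to report
["node1"]:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// A local PV pinned to "node1" via required node affinity on the
	// kubernetes.io/hostname label (v1.LabelHostname).
	pv := &v1.PersistentVolume{
		ObjectMeta: metav1.ObjectMeta{Name: "local-pv-node1a"}, // hypothetical name
		Spec: v1.PersistentVolumeSpec{
			NodeAffinity: &v1.VolumeNodeAffinity{
				Required: &v1.NodeSelector{
					NodeSelectorTerms: []v1.NodeSelectorTerm{{
						MatchExpressions: []v1.NodeSelectorRequirement{{
							Key:      v1.LabelHostname,
							Operator: v1.NodeSelectorOpIn,
							Values:   []string{"node1"},
						}},
					}},
				},
			},
		},
	}
	fmt.Println(pv.Spec.NodeAffinity.Required.NodeSelectorTerms[0].MatchExpressions[0].Values)
}
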
140 changes: 140 additions & 0 deletions pkg/scheduler/framework/plugins/volumebinding/binder_test.go
@@ -20,6 +20,7 @@ import (
"context"
"fmt"
"os"
"reflect"
"sort"
"testing"
"time"
@@ -31,6 +32,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/informers"
@@ -58,6 +60,9 @@
boundPVCNode1a = makeTestPVC("unbound-pvc", "1G", "", pvcBound, "pv-node1a", "1", &waitClass)
immediateUnboundPVC = makeTestPVC("immediate-unbound-pvc", "1G", "", pvcUnbound, "", "1", &immediateClass)
immediateBoundPVC = makeTestPVC("immediate-bound-pvc", "1G", "", pvcBound, "pv-bound-immediate", "1", &immediateClass)
localPreboundPVC1a = makeTestPVC("local-prebound-pvc-1a", "1G", "", pvcPrebound, "local-pv-node1a", "1", &waitClass)
localPreboundPVC1b = makeTestPVC("local-prebound-pvc-1b", "1G", "", pvcPrebound, "local-pv-node1b", "1", &waitClass)
localPreboundPVC2a = makeTestPVC("local-prebound-pvc-2a", "1G", "", pvcPrebound, "local-pv-node2a", "1", &waitClass)

// PVCs for dynamic provisioning
provisionedPVC = makeTestPVC("provisioned-pvc", "1Gi", "", pvcUnbound, "", "1", &waitClassWithProvisioner)
@@ -89,6 +94,9 @@
pvNode1bBoundHigherVersion = makeTestPV("pv-node1b", "node1", "10G", "2", unboundPVC2, waitClass)
pvBoundImmediate = makeTestPV("pv-bound-immediate", "node1", "1G", "1", immediateBoundPVC, immediateClass)
pvBoundImmediateNode2 = makeTestPV("pv-bound-immediate", "node2", "1G", "1", immediateBoundPVC, immediateClass)
localPVNode1a = makeLocalPV("local-pv-node1a", "node1", "5G", "1", nil, waitClass)
localPVNode1b = makeLocalPV("local-pv-node1b", "node1", "10G", "1", nil, waitClass)
localPVNode2a = makeLocalPV("local-pv-node2a", "node2", "5G", "1", nil, waitClass)

// PVs for CSI migration
migrationPVBound = makeTestPVForCSIMigration(zone1Labels, boundMigrationPVC, true)
@@ -718,6 +726,12 @@ func makeTestPVForCSIMigration(labels map[string]string, pvc *v1.PersistentVolum
return pv
}

func makeLocalPV(name, node, capacity, version string, boundToPVC *v1.PersistentVolumeClaim, className string) *v1.PersistentVolume {
pv := makeTestPV(name, node, capacity, version, boundToPVC, className)
pv.Spec.NodeAffinity.Required.NodeSelectorTerms[0].MatchExpressions[0].Key = v1.LabelHostname
return pv
}

func pvcSetSelectedNode(pvc *v1.PersistentVolumeClaim, node string) *v1.PersistentVolumeClaim {
newPVC := pvc.DeepCopy()
metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, volume.AnnSelectedNode, node)
@@ -2318,3 +2332,129 @@ func TestCapacity(t *testing.T) {
})
}
}

func TestGetEligibleNodes(t *testing.T) {
type scenarioType struct {
// Inputs
pvcs []*v1.PersistentVolumeClaim
pvs []*v1.PersistentVolume
nodes []*v1.Node

// Expected return values
eligibleNodes sets.String
}

scenarios := map[string]scenarioType{
"no-bound-claims": {},
"no-nodes-found": {
pvcs: []*v1.PersistentVolumeClaim{
preboundPVC,
preboundPVCNode1a,
},
},
"pv-not-found": {
pvcs: []*v1.PersistentVolumeClaim{
preboundPVC,
preboundPVCNode1a,
},
nodes: []*v1.Node{
node1,
},
},
"node-affinity-mismatch": {
pvcs: []*v1.PersistentVolumeClaim{
preboundPVC,
preboundPVCNode1a,
},
pvs: []*v1.PersistentVolume{
pvNode1a,
},
nodes: []*v1.Node{
node1,
node2,
},
},
"local-pv-with-node-affinity": {
pvcs: []*v1.PersistentVolumeClaim{
localPreboundPVC1a,
localPreboundPVC1b,
},
pvs: []*v1.PersistentVolume{
localPVNode1a,
localPVNode1b,
},
nodes: []*v1.Node{
node1,
node2,
},
eligibleNodes: sets.NewString("node1"),
},
"multi-local-pv-with-different-nodes": {
pvcs: []*v1.PersistentVolumeClaim{
localPreboundPVC1a,
localPreboundPVC1b,
localPreboundPVC2a,
},
pvs: []*v1.PersistentVolume{
localPVNode1a,
localPVNode1b,
localPVNode2a,
},
nodes: []*v1.Node{
node1,
node2,
},
eligibleNodes: sets.NewString(),
},
"local-and-non-local-pv": {
pvcs: []*v1.PersistentVolumeClaim{
localPreboundPVC1a,
localPreboundPVC1b,
preboundPVC,
immediateBoundPVC,
},
pvs: []*v1.PersistentVolume{
localPVNode1a,
localPVNode1b,
pvNode1a,
pvBoundImmediate,
pvBoundImmediateNode2,
},
nodes: []*v1.Node{
node1,
node2,
},
eligibleNodes: sets.NewString("node1"),
},
}

run := func(t *testing.T, scenario scenarioType) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Setup
testEnv := newTestBinder(t, ctx.Done())
testEnv.initVolumes(scenario.pvs, scenario.pvs)

testEnv.initNodes(scenario.nodes)
testEnv.initClaims(scenario.pvcs, scenario.pvcs)

// Execute
eligibleNodes := testEnv.binder.GetEligibleNodes(scenario.pvcs)

// Validate
if compDiff := cmp.Diff(scenario.eligibleNodes, eligibleNodes, cmp.Comparer(func(a, b sets.String) bool {
return reflect.DeepEqual(a, b)
})); compDiff != "" {
t.Errorf("Unexpected eligible nodes (-want +got):\n%s", compDiff)
}
}

for name, scenario := range scenarios {
t.Run(name, func(t *testing.T) { run(t, scenario) })
}
}
8 changes: 8 additions & 0 deletions pkg/scheduler/framework/plugins/volumebinding/fake_binder.go
@@ -20,6 +20,7 @@ import (
"context"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
)

// FakeVolumeBinderConfig holds configurations for fake volume binder.
@@ -46,11 +47,18 @@ type FakeVolumeBinder struct {
BindCalled bool
}

var _ SchedulerVolumeBinder = &FakeVolumeBinder{}

// GetPodVolumes implements SchedulerVolumeBinder.GetPodVolumes.
func (b *FakeVolumeBinder) GetPodVolumes(pod *v1.Pod) (boundClaims, unboundClaimsDelayBinding, unboundClaimsImmediate []*v1.PersistentVolumeClaim, err error) {
return nil, nil, nil, nil
}

// GetEligibleNodes implements SchedulerVolumeBinder.GetEligibleNodes.
func (b *FakeVolumeBinder) GetEligibleNodes(boundClaims []*v1.PersistentVolumeClaim) (eligibleNodes sets.String) {
return nil
}

// FindPodVolumes implements SchedulerVolumeBinder.FindPodVolumes.
func (b *FakeVolumeBinder) FindPodVolumes(pod *v1.Pod, _, _ []*v1.PersistentVolumeClaim, node *v1.Node) (podVolumes *PodVolumes, reasons ConflictReasons, err error) {
return nil, b.config.FindReasons, b.config.FindErr
10 changes: 9 additions & 1 deletion pkg/scheduler/framework/plugins/volumebinding/volume_binding.go
@@ -182,8 +182,16 @@ func (pl *VolumeBinding) PreFilter(ctx context.Context, state *framework.CycleSt
status.AppendReason("pod has unbound immediate PersistentVolumeClaims")
return nil, status
}
// Attempt to reduce the number of nodes to consider in subsequent scheduling stages if the pod has bound claims.
var result *framework.PreFilterResult
if eligibleNodes := pl.Binder.GetEligibleNodes(boundClaims); eligibleNodes != nil {
result = &framework.PreFilterResult{
NodeNames: eligibleNodes,
}
}

state.Write(stateKey, &stateData{boundClaims: boundClaims, claimsToBind: claimsToBind, podVolumesByNode: make(map[string]*PodVolumes)})
return nil, nil
return result, nil
}

// PreFilterExtensions returns prefilter extensions, pod add and remove.
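
To see what returning a non-nil PreFilterResult means downstream, here
is a rough sketch (not part of the diff; it assumes the in-tree
scheduler framework's PreFilterResult API of this era, where a nil
result or nil NodeNames means "all nodes are eligible" and the
framework intersects the results of all PreFilter plugins via Merge):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

func main() {
	// Hypothetical results from two PreFilter plugins; the framework
	// merges them, so Filter only ever sees nodes allowed by both.
	fromVolumeBinding := &framework.PreFilterResult{NodeNames: sets.NewString("node1", "node2")}
	fromAnotherPlugin := &framework.PreFilterResult{NodeNames: sets.NewString("node2", "node3")}

	merged := fromVolumeBinding.Merge(fromAnotherPlugin)
	fmt.Println(merged.AllNodes())       // false: the node list was narrowed
	fmt.Println(merged.NodeNames.List()) // [node2]
}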
