Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[scheduler] When the hostname and nodename of a node do not match, ensure that pods carrying PVs with nodeAffinity are scheduled correctly. #125398

Merged
merged 2 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 0 additions & 60 deletions pkg/scheduler/framework/plugins/volumebinding/binder.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ import (
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding/metrics"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
"k8s.io/kubernetes/pkg/volume/util"
)

// ConflictReason is used for the special strings which explain why
Expand Down Expand Up @@ -129,8 +128,6 @@ type InTreeToCSITranslator interface {
// 1. The scheduler takes a Pod off the scheduler queue and processes it serially:
// a. Invokes all pre-filter plugins for the pod. GetPodVolumeClaims() is invoked
// here, pod volume information will be saved in current scheduling cycle state for later use.
// If pod has bound immediate PVCs, GetEligibleNodes() is invoked to potentially reduce
// down the list of eligible nodes based on the bound PV's NodeAffinity (if any).
// b. Invokes all filter plugins, parallelized across nodes. FindPodVolumes() is invoked here.
// c. Invokes all score plugins. Future/TBD
// d. Selects the best node for the Pod.
Expand All @@ -153,14 +150,6 @@ type SchedulerVolumeBinder interface {
// unbound with immediate binding (including prebound) and PVs that belong to storage classes of unbound PVCs with delayed binding.
GetPodVolumeClaims(logger klog.Logger, pod *v1.Pod) (podVolumeClaims *PodVolumeClaims, err error)

// GetEligibleNodes checks the existing bound claims of the pod to determine if the list of nodes can be
// potentially reduced down to a subset of eligible nodes based on the bound claims which then can be used
// in subsequent scheduling stages.
//
// If eligibleNodes is 'nil', then it indicates that such eligible node reduction cannot be made
// and all nodes should be considered.
GetEligibleNodes(logger klog.Logger, boundClaims []*v1.PersistentVolumeClaim) (eligibleNodes sets.Set[string])

// FindPodVolumes checks if all of a Pod's PVCs can be satisfied by the
// node and returns pod's volumes information.
//
Expand Down Expand Up @@ -383,55 +372,6 @@ func (b *volumeBinder) FindPodVolumes(logger klog.Logger, pod *v1.Pod, podVolume
return
}

// GetEligibleNodes checks the existing bound claims of the pod to determine if the list of nodes can be
// potentially reduced down to a subset of eligible nodes based on the bound claims which then can be used
// in subsequent scheduling stages.
//
// Returning 'nil' for eligibleNodes indicates that such eligible node reduction cannot be made and all nodes
// should be considered.
func (b *volumeBinder) GetEligibleNodes(logger klog.Logger, boundClaims []*v1.PersistentVolumeClaim) (eligibleNodes sets.Set[string]) {
if len(boundClaims) == 0 {
return
}

var errs []error
for _, pvc := range boundClaims {
pvName := pvc.Spec.VolumeName
pv, err := b.pvCache.GetPV(pvName)
if err != nil {
errs = append(errs, err)
continue
}

// if the PersistentVolume is local and has node affinity matching specific node(s),
// add them to the eligible nodes
nodeNames := util.GetLocalPersistentVolumeNodeNames(pv)
if len(nodeNames) != 0 {
// on the first found list of eligible nodes for the local PersistentVolume,
// insert to the eligible node set.
if eligibleNodes == nil {
eligibleNodes = sets.New(nodeNames...)
} else {
// for subsequent finding of eligible nodes for the local PersistentVolume,
// take the intersection of the nodes with the existing eligible nodes
// for cases if PV1 has node affinity to node1 and PV2 has node affinity to node2,
// then the eligible node list should be empty.
eligibleNodes = eligibleNodes.Intersection(sets.New(nodeNames...))
}
}
}

if len(errs) > 0 {
logger.V(4).Info("GetEligibleNodes: one or more error occurred finding eligible nodes", "error", errs)
return nil
}

if eligibleNodes != nil {
logger.V(4).Info("GetEligibleNodes: reduced down eligible nodes", "nodes", eligibleNodes)
}
return
}

// AssumePodVolumes will take the matching PVs and PVCs to provision in pod's
// volume information for the chosen node, and:
// 1. Update the pvCache with the new prebound PV.
Expand Down
141 changes: 0 additions & 141 deletions pkg/scheduler/framework/plugins/volumebinding/binder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"fmt"
"os"
"reflect"
"sort"
"testing"
"time"
Expand All @@ -32,7 +31,6 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/informers"
Expand Down Expand Up @@ -63,9 +61,6 @@ var (
boundPVCNode1a = makeTestPVC("unbound-pvc", "1G", "", pvcBound, "pv-node1a", "1", &waitClass)
immediateUnboundPVC = makeTestPVC("immediate-unbound-pvc", "1G", "", pvcUnbound, "", "1", &immediateClass)
immediateBoundPVC = makeTestPVC("immediate-bound-pvc", "1G", "", pvcBound, "pv-bound-immediate", "1", &immediateClass)
localPreboundPVC1a = makeTestPVC("local-prebound-pvc-1a", "1G", "", pvcPrebound, "local-pv-node1a", "1", &waitClass)
localPreboundPVC1b = makeTestPVC("local-prebound-pvc-1b", "1G", "", pvcPrebound, "local-pv-node1b", "1", &waitClass)
localPreboundPVC2a = makeTestPVC("local-prebound-pvc-2a", "1G", "", pvcPrebound, "local-pv-node2a", "1", &waitClass)

// PVCs for dynamic provisioning
provisionedPVC = makeTestPVC("provisioned-pvc", "1Gi", "", pvcUnbound, "", "1", &waitClassWithProvisioner)
Expand Down Expand Up @@ -97,9 +92,6 @@ var (
pvNode1bBoundHigherVersion = makeTestPV("pv-node1b", "node1", "10G", "2", unboundPVC2, waitClass)
pvBoundImmediate = makeTestPV("pv-bound-immediate", "node1", "1G", "1", immediateBoundPVC, immediateClass)
pvBoundImmediateNode2 = makeTestPV("pv-bound-immediate", "node2", "1G", "1", immediateBoundPVC, immediateClass)
localPVNode1a = makeLocalPV("local-pv-node1a", "node1", "5G", "1", nil, waitClass)
localPVNode1b = makeLocalPV("local-pv-node1b", "node1", "10G", "1", nil, waitClass)
localPVNode2a = makeLocalPV("local-pv-node2a", "node2", "5G", "1", nil, waitClass)

// PVs for CSI migration
migrationPVBound = makeTestPVForCSIMigration(zone1Labels, boundMigrationPVC, true)
Expand Down Expand Up @@ -709,12 +701,6 @@ func makeTestPVForCSIMigration(labels map[string]string, pvc *v1.PersistentVolum
return pv
}

func makeLocalPV(name, node, capacity, version string, boundToPVC *v1.PersistentVolumeClaim, className string) *v1.PersistentVolume {
pv := makeTestPV(name, node, capacity, version, boundToPVC, className)
pv.Spec.NodeAffinity.Required.NodeSelectorTerms[0].MatchExpressions[0].Key = v1.LabelHostname
return pv
}

func pvcSetSelectedNode(pvc *v1.PersistentVolumeClaim, node string) *v1.PersistentVolumeClaim {
newPVC := pvc.DeepCopy()
metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, volume.AnnSelectedNode, node)
Expand Down Expand Up @@ -2326,130 +2312,3 @@ func TestCapacity(t *testing.T) {
})
}
}

func TestGetEligibleNodes(t *testing.T) {
type scenarioType struct {
// Inputs
pvcs []*v1.PersistentVolumeClaim
pvs []*v1.PersistentVolume
nodes []*v1.Node

// Expected return values
eligibleNodes sets.Set[string]
}

scenarios := map[string]scenarioType{
"no-bound-claims": {},
"no-nodes-found": {
pvcs: []*v1.PersistentVolumeClaim{
preboundPVC,
preboundPVCNode1a,
},
},
"pv-not-found": {
pvcs: []*v1.PersistentVolumeClaim{
preboundPVC,
preboundPVCNode1a,
},
nodes: []*v1.Node{
node1,
},
},
"node-affinity-mismatch": {
pvcs: []*v1.PersistentVolumeClaim{
preboundPVC,
preboundPVCNode1a,
},
pvs: []*v1.PersistentVolume{
pvNode1a,
},
nodes: []*v1.Node{
node1,
node2,
},
},
"local-pv-with-node-affinity": {
pvcs: []*v1.PersistentVolumeClaim{
localPreboundPVC1a,
localPreboundPVC1b,
},
pvs: []*v1.PersistentVolume{
localPVNode1a,
localPVNode1b,
},
nodes: []*v1.Node{
node1,
node2,
},
eligibleNodes: sets.New("node1"),
},
"multi-local-pv-with-different-nodes": {
pvcs: []*v1.PersistentVolumeClaim{
localPreboundPVC1a,
localPreboundPVC1b,
localPreboundPVC2a,
},
pvs: []*v1.PersistentVolume{
localPVNode1a,
localPVNode1b,
localPVNode2a,
},
nodes: []*v1.Node{
node1,
node2,
},
eligibleNodes: sets.New[string](),
},
"local-and-non-local-pv": {
pvcs: []*v1.PersistentVolumeClaim{
localPreboundPVC1a,
localPreboundPVC1b,
preboundPVC,
immediateBoundPVC,
},
pvs: []*v1.PersistentVolume{
localPVNode1a,
localPVNode1b,
pvNode1a,
pvBoundImmediate,
pvBoundImmediateNode2,
},
nodes: []*v1.Node{
node1,
node2,
},
eligibleNodes: sets.New("node1"),
},
}

run := func(t *testing.T, scenario scenarioType) {
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Setup
testEnv := newTestBinder(t, ctx)
testEnv.initVolumes(scenario.pvs, scenario.pvs)

testEnv.initNodes(scenario.nodes)
testEnv.initClaims(scenario.pvcs, scenario.pvcs)

// Execute
eligibleNodes := testEnv.binder.GetEligibleNodes(logger, scenario.pvcs)

// Validate
if reflect.DeepEqual(scenario.eligibleNodes, eligibleNodes) {
fmt.Println("foo")
}

if compDiff := cmp.Diff(scenario.eligibleNodes, eligibleNodes, cmp.Comparer(func(a, b sets.Set[string]) bool {
return reflect.DeepEqual(a, b)
})); compDiff != "" {
t.Errorf("Unexpected eligible nodes (-want +got):\n%s", compDiff)
}
}

for name, scenario := range scenarios {
t.Run(name, func(t *testing.T) { run(t, scenario) })
}
}
6 changes: 0 additions & 6 deletions pkg/scheduler/framework/plugins/volumebinding/fake_binder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
)

Expand Down Expand Up @@ -55,11 +54,6 @@ func (b *FakeVolumeBinder) GetPodVolumeClaims(_ klog.Logger, pod *v1.Pod) (podVo
return &PodVolumeClaims{}, nil
}

// GetEligibleNodes implements SchedulerVolumeBinder.GetEligibleNodes.
func (b *FakeVolumeBinder) GetEligibleNodes(_ klog.Logger, boundClaims []*v1.PersistentVolumeClaim) (eligibleNodes sets.Set[string]) {
return nil
}

// FindPodVolumes implements SchedulerVolumeBinder.FindPodVolumes.
func (b *FakeVolumeBinder) FindPodVolumes(_ klog.Logger, pod *v1.Pod, _ *PodVolumeClaims, node *v1.Node) (podVolumes *PodVolumes, reasons ConflictReasons, err error) {
return nil, b.config.FindReasons, b.config.FindErr
Expand Down
10 changes: 1 addition & 9 deletions pkg/scheduler/framework/plugins/volumebinding/volume_binding.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,14 +194,6 @@ func (pl *VolumeBinding) PreFilter(ctx context.Context, state *framework.CycleSt
status.AppendReason("pod has unbound immediate PersistentVolumeClaims")
return nil, status
}
// Attempt to reduce down the number of nodes to consider in subsequent scheduling stages if pod has bound claims.
var result *framework.PreFilterResult
if eligibleNodes := pl.Binder.GetEligibleNodes(logger, podVolumeClaims.boundClaims); eligibleNodes != nil {
result = &framework.PreFilterResult{
NodeNames: eligibleNodes,
}
}

state.Write(stateKey, &stateData{
podVolumesByNode: make(map[string]*PodVolumes),
podVolumeClaims: &PodVolumeClaims{
Expand All @@ -210,7 +202,7 @@ func (pl *VolumeBinding) PreFilter(ctx context.Context, state *framework.CycleSt
unboundVolumesDelayBinding: podVolumeClaims.unboundVolumesDelayBinding,
},
})
return result, nil
return nil, nil
}

// PreFilterExtensions returns prefilter extensions, pod add and remove.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/klog/v2/ktesting"
Expand Down Expand Up @@ -127,43 +126,6 @@ func TestVolumeBinding(t *testing.T) {
},
wantPreScoreStatus: framework.NewStatus(framework.Skip),
},
{
name: "all bound with local volumes",
pod: makePod("pod-a").withPVCVolume("pvc-a", "volume-a").withPVCVolume("pvc-b", "volume-b").Pod,
nodes: []*v1.Node{
makeNode("node-a").Node,
},
pvcs: []*v1.PersistentVolumeClaim{
makePVC("pvc-a", waitSC.Name).withBoundPV("pv-a").PersistentVolumeClaim,
makePVC("pvc-b", waitSC.Name).withBoundPV("pv-b").PersistentVolumeClaim,
},
pvs: []*v1.PersistentVolume{
makePV("pv-a", waitSC.Name).withPhase(v1.VolumeBound).withNodeAffinity(map[string][]string{
v1.LabelHostname: {"node-a"},
}).PersistentVolume,
makePV("pv-b", waitSC.Name).withPhase(v1.VolumeBound).withNodeAffinity(map[string][]string{
v1.LabelHostname: {"node-a"},
}).PersistentVolume,
},
wantPreFilterResult: &framework.PreFilterResult{
NodeNames: sets.New("node-a"),
},
wantStateAfterPreFilter: &stateData{
podVolumeClaims: &PodVolumeClaims{
boundClaims: []*v1.PersistentVolumeClaim{
makePVC("pvc-a", waitSC.Name).withBoundPV("pv-a").PersistentVolumeClaim,
makePVC("pvc-b", waitSC.Name).withBoundPV("pv-b").PersistentVolumeClaim,
},
unboundClaimsDelayBinding: []*v1.PersistentVolumeClaim{},
unboundVolumesDelayBinding: map[string][]*v1.PersistentVolume{},
},
podVolumesByNode: map[string]*PodVolumes{},
},
wantFilterStatus: []*framework.Status{
nil,
},
wantPreScoreStatus: framework.NewStatus(framework.Skip),
},
{
name: "PVC does not exist",
pod: makePod("pod-a").withPVCVolume("pvc-a", "").Pod,
Expand Down
Loading