Commit

Merge pull request #124003 from carlory/scheduler-rm-non-csi-limit
kube-scheduler remove non-csi volumelimit plugins
k8s-ci-robot authored Aug 26, 2024
2 parents 9fe0662 + cba2b3f commit 0bcbc3b
Showing 9 changed files with 52 additions and 1,735 deletions.
4 changes: 0 additions & 4 deletions pkg/scheduler/framework/plugins/names/names.go
@@ -30,10 +30,6 @@ const (
NodeResourcesFit = "NodeResourcesFit"
NodeUnschedulable = "NodeUnschedulable"
NodeVolumeLimits = "NodeVolumeLimits"
AzureDiskLimits = "AzureDiskLimits"
CinderLimits = "CinderLimits"
EBSLimits = "EBSLimits"
GCEPDLimits = "GCEPDLimits"
PodTopologySpread = "PodTopologySpread"
SchedulingGates = "SchedulingGates"
TaintToleration = "TaintToleration"
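The four constants removed above named the in-tree volume-limit Filter plugins; the remaining NodeVolumeLimits constant is the CSI-based plugin that still enforces attach limits. As an illustrative aside (not part of this commit), the volume types those plugins covered map onto CSI drivers roughly as follows, since CSI migration routes the in-tree volumes through the corresponding drivers:

```go
package main

import "fmt"

// Illustrative only (not part of this commit): each removed in-tree limits
// plugin corresponds to a CSI driver whose per-node attach limit the
// remaining CSILimits plugin ("NodeVolumeLimits") enforces once the
// matching in-tree volume type is migrated to CSI.
var removedPluginToCSIDriver = map[string]string{
	"AzureDiskLimits": "disk.csi.azure.com",
	"CinderLimits":    "cinder.csi.openstack.org",
	"EBSLimits":       "ebs.csi.aws.com",
	"GCEPDLimits":     "pd.csi.gke.io",
}

func main() {
	for plugin, driver := range removedPluginToCSIDriver {
		fmt.Printf("%s is gone; its limits are tracked per CSI driver %q\n", plugin, driver)
	}
}
```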
57 changes: 31 additions & 26 deletions pkg/scheduler/framework/plugins/nodevolumelimits/csi.go
@@ -35,7 +35,11 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
"k8s.io/kubernetes/pkg/scheduler/util"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
)

const (
// ErrReasonMaxVolumeCountExceeded is used for MaxVolumeCount predicate error.
ErrReasonMaxVolumeCountExceeded = "node(s) exceed max volume count"
)

// InTreeToCSITranslator contains methods required to check migratable status
@@ -173,7 +177,6 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v

logger := klog.FromContext(ctx)

// If CSINode doesn't exist, the predicate may read the limits from Node object
csiNode, err := pl.csiNodeLister.Get(node.Name)
if err != nil {
// TODO: return the error once CSINode is created by default (2 releases)
@@ -195,7 +198,7 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v
}

// If the node doesn't have volume limits, the predicate will always be true
nodeVolumeLimits := getVolumeLimits(nodeInfo, csiNode)
nodeVolumeLimits := getVolumeLimits(csiNode)
if len(nodeVolumeLimits) == 0 {
return nil
}
@@ -208,22 +211,23 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v
}

attachedVolumeCount := map[string]int{}
for volumeUniqueName, volumeLimitKey := range attachedVolumes {
for volumeUniqueName, driverName := range attachedVolumes {
// Don't count single volume used in multiple pods more than once
delete(newVolumes, volumeUniqueName)
attachedVolumeCount[volumeLimitKey]++
attachedVolumeCount[driverName]++
}

// Count the new volumes count per driver
newVolumeCount := map[string]int{}
for _, volumeLimitKey := range newVolumes {
newVolumeCount[volumeLimitKey]++
for _, driverName := range newVolumes {
newVolumeCount[driverName]++
}

for volumeLimitKey, count := range newVolumeCount {
maxVolumeLimit, ok := nodeVolumeLimits[v1.ResourceName(volumeLimitKey)]
for driverName, count := range newVolumeCount {
maxVolumeLimit, ok := nodeVolumeLimits[driverName]
if ok {
currentVolumeCount := attachedVolumeCount[volumeLimitKey]
logger.V(5).Info("Found plugin volume limits", "node", node.Name, "volumeLimitKey", volumeLimitKey,
currentVolumeCount := attachedVolumeCount[driverName]
logger.V(5).Info("Found plugin volume limits", "node", node.Name, "driverName", driverName,
"maxLimits", maxVolumeLimit, "currentVolumeCount", currentVolumeCount, "newVolumeCount", count,
"pod", klog.KObj(pod))
if currentVolumeCount+count > int(maxVolumeLimit) {
@@ -235,6 +239,9 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v
return nil
}

// filterAttachableVolumes filters the attachable volumes from the pod and adds them to the result map.
// The result map is a map of volumeUniqueName to driver name. The volumeUniqueName is a unique name for
// the volume in the format of "driverName/volumeHandle". And driver name is the CSI driver name.
func (pl *CSILimits) filterAttachableVolumes(
logger klog.Logger, pod *v1.Pod, csiNode *storagev1.CSINode, newPod bool, result map[string]string) error {
for _, vol := range pod.Spec.Volumes {
@@ -297,8 +304,7 @@ func (pl *CSILimits) filterAttachableVolumes(
}

volumeUniqueName := fmt.Sprintf("%s/%s", driverName, volumeHandle)
volumeLimitKey := volumeutil.GetCSIAttachLimitKey(driverName)
result[volumeUniqueName] = volumeLimitKey
result[volumeUniqueName] = driverName
}
return nil
}
@@ -339,8 +345,7 @@ func (pl *CSILimits) checkAttachableInlineVolume(logger klog.Logger, vol *v1.Vol
return nil
}
volumeUniqueName := fmt.Sprintf("%s/%s", driverName, translatedPV.Spec.PersistentVolumeSource.CSI.VolumeHandle)
volumeLimitKey := volumeutil.GetCSIAttachLimitKey(driverName)
result[volumeUniqueName] = volumeLimitKey
result[volumeUniqueName] = driverName
return nil
}

@@ -460,17 +465,17 @@ func NewCSI(_ context.Context, _ runtime.Object, handle framework.Handle, fts fe
}, nil
}

func getVolumeLimits(nodeInfo *framework.NodeInfo, csiNode *storagev1.CSINode) map[v1.ResourceName]int64 {
// TODO: stop getting values from Node object in v1.18
nodeVolumeLimits := volumeLimits(nodeInfo)
if csiNode != nil {
for i := range csiNode.Spec.Drivers {
d := csiNode.Spec.Drivers[i]
if d.Allocatable != nil && d.Allocatable.Count != nil {
// TODO: drop GetCSIAttachLimitKey once we don't get values from Node object (v1.18)
k := v1.ResourceName(volumeutil.GetCSIAttachLimitKey(d.Name))
nodeVolumeLimits[k] = int64(*d.Allocatable.Count)
}
// getVolumeLimits reads the volume limits from CSINode object and returns a map of volume limits.
// The key is the driver name and the value is the maximum number of volumes that can be attached to the node.
// If a key is not found in the map, it means there is no limit for the driver on the node.
func getVolumeLimits(csiNode *storagev1.CSINode) map[string]int64 {
nodeVolumeLimits := make(map[string]int64)
if csiNode == nil {
return nodeVolumeLimits
}
for _, d := range csiNode.Spec.Drivers {
if d.Allocatable != nil && d.Allocatable.Count != nil {
nodeVolumeLimits[d.Name] = int64(*d.Allocatable.Count)
}
}
return nodeVolumeLimits
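The heart of the csi.go change: getVolumeLimits now reads limits exclusively from the CSINode object and keys them by CSI driver name, and Filter counts attached and new volumes per driver against those limits, instead of going through node-allocatable keys built with volumeutil.GetCSIAttachLimitKey. A minimal sketch of the new flow follows; getVolumeLimits mirrors the function shown above, while wouldExceedLimit is a hypothetical helper that condenses the per-driver check Filter performs:

```go
package main

import (
	"fmt"

	storagev1 "k8s.io/api/storage/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/ptr"
)

// getVolumeLimits mirrors the new behaviour in this commit: limits come only
// from CSINode.Spec.Drivers[].Allocatable.Count, keyed by CSI driver name.
func getVolumeLimits(csiNode *storagev1.CSINode) map[string]int64 {
	limits := make(map[string]int64)
	if csiNode == nil {
		return limits
	}
	for _, d := range csiNode.Spec.Drivers {
		if d.Allocatable != nil && d.Allocatable.Count != nil {
			limits[d.Name] = int64(*d.Allocatable.Count)
		}
	}
	return limits
}

// wouldExceedLimit is an illustrative helper (not in the commit) showing the
// per-driver check Filter now performs: volumes already attached plus the
// pod's new volumes, both counted per driver name, must stay within the
// driver's reported limit.
func wouldExceedLimit(limits map[string]int64, attached, newVolumes map[string]int) bool {
	for driver, newCount := range newVolumes {
		if limit, ok := limits[driver]; ok {
			if attached[driver]+newCount > int(limit) {
				return true
			}
		}
	}
	return false
}

func main() {
	csiNode := &storagev1.CSINode{
		ObjectMeta: metav1.ObjectMeta{Name: "node-1"},
		Spec: storagev1.CSINodeSpec{
			Drivers: []storagev1.CSINodeDriver{{
				Name:        "ebs.csi.aws.com",
				NodeID:      "node-1",
				Allocatable: &storagev1.VolumeNodeResources{Count: ptr.To[int32](3)},
			}},
		},
	}
	limits := getVolumeLimits(csiNode)                 // map["ebs.csi.aws.com"] == 3
	attached := map[string]int{"ebs.csi.aws.com": 2}   // already on the node
	newVolumes := map[string]int{"ebs.csi.aws.com": 2} // requested by the pod
	fmt.Println(limits, wouldExceedLimit(limits, attached, newVolumes)) // 2+2 > 3: true
}
```

If a driver does not report Allocatable.Count it simply has no entry in the map, which matches the new doc comment above: no entry means no limit for that driver on the node.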
66 changes: 20 additions & 46 deletions pkg/scheduler/framework/plugins/nodevolumelimits/csi_test.go
@@ -26,7 +26,6 @@ import (
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/sets"
@@ -35,7 +34,6 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework"
st "k8s.io/kubernetes/pkg/scheduler/testing"
tf "k8s.io/kubernetes/pkg/scheduler/testing/framework"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/utils/ptr"
)
@@ -51,22 +49,6 @@ var (
scName = "csi-sc"
)

// getVolumeLimitKey returns a ResourceName by filter type
func getVolumeLimitKey(filterType string) v1.ResourceName {
switch filterType {
case ebsVolumeFilterType:
return v1.ResourceName(volumeutil.EBSVolumeLimitKey)
case gcePDVolumeFilterType:
return v1.ResourceName(volumeutil.GCEVolumeLimitKey)
case azureDiskVolumeFilterType:
return v1.ResourceName(volumeutil.AzureVolumeLimitKey)
case cinderVolumeFilterType:
return v1.ResourceName(volumeutil.CinderVolumeLimitKey)
default:
return v1.ResourceName(volumeutil.GetCSIAttachLimitKey(filterType))
}
}

func TestCSILimits(t *testing.T) {
runningPod := st.MakePod().PVC("csi-ebs.csi.aws.com-3").Obj()
pendingVolumePod := st.MakePod().PVC("csi-4").Obj()
@@ -297,7 +279,7 @@ func TestCSILimits(t *testing.T) {
maxVols: 4,
driverNames: []string{ebsCSIDriverName},
test: "fits when node volume limit >= new pods CSI volume",
limitSource: "node",
limitSource: "csinode",
},
{
newPod: csiEBSOneVolPod,
@@ -306,7 +288,7 @@ func TestCSILimits(t *testing.T) {
maxVols: 2,
driverNames: []string{ebsCSIDriverName},
test: "doesn't when node volume limit <= pods CSI volume",
limitSource: "node",
limitSource: "csinode",
wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded),
},
{
@@ -326,7 +308,7 @@ func TestCSILimits(t *testing.T) {
maxVols: 2,
driverNames: []string{ebsCSIDriverName},
test: "count pending PVCs towards volume limit <= pods CSI volume",
limitSource: "node",
limitSource: "csinode",
wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded),
},
// two same pending PVCs should be counted as 1
@@ -337,7 +319,7 @@ func TestCSILimits(t *testing.T) {
maxVols: 4,
driverNames: []string{ebsCSIDriverName},
test: "count multiple pending pvcs towards volume limit >= pods CSI volume",
limitSource: "node",
limitSource: "csinode",
},
// should count PVCs with invalid PV name but valid SC
{
@@ -347,7 +329,7 @@ func TestCSILimits(t *testing.T) {
maxVols: 2,
driverNames: []string{ebsCSIDriverName},
test: "should count PVCs with invalid PV name but valid SC",
limitSource: "node",
limitSource: "csinode",
wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded),
},
// don't count a volume which has storageclass missing
@@ -358,7 +340,7 @@ func TestCSILimits(t *testing.T) {
maxVols: 2,
driverNames: []string{ebsCSIDriverName},
test: "don't count pvcs with missing SC towards volume limit",
limitSource: "node",
limitSource: "csinode",
},
// don't count multiple volume types
{
@@ -368,7 +350,7 @@ func TestCSILimits(t *testing.T) {
maxVols: 2,
driverNames: []string{ebsCSIDriverName, gceCSIDriverName},
test: "count pvcs with the same type towards volume limit",
limitSource: "node",
limitSource: "csinode",
wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded),
},
{
@@ -378,7 +360,7 @@ func TestCSILimits(t *testing.T) {
maxVols: 2,
driverNames: []string{ebsCSIDriverName, gceCSIDriverName},
test: "don't count pvcs with different type towards volume limit",
limitSource: "node",
limitSource: "csinode",
},
// Tests for in-tree volume migration
{
@@ -396,10 +378,8 @@ func TestCSILimits(t *testing.T) {
newPod: inTreeInlineVolPod,
existingPods: []*v1.Pod{inTreeTwoVolPod},
filterName: "csi",
maxVols: 2,
driverNames: []string{csilibplugins.AWSEBSInTreePluginName, ebsCSIDriverName},
migrationEnabled: true,
limitSource: "node",
test: "nil csi node",
},
{
@@ -494,6 +474,7 @@ func TestCSILimits(t *testing.T) {
filterName: "csi",
ephemeralEnabled: true,
driverNames: []string{ebsCSIDriverName},
limitSource: "csinode-with-no-limit",
test: "ephemeral volume missing",
wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, `looking up PVC test/abc-xyz: persistentvolumeclaims "abc-xyz" not found`),
},
@@ -503,6 +484,7 @@ func TestCSILimits(t *testing.T) {
ephemeralEnabled: true,
extraClaims: []v1.PersistentVolumeClaim{*conflictingClaim},
driverNames: []string{ebsCSIDriverName},
limitSource: "csinode-with-no-limit",
test: "ephemeral volume not owned",
wantStatus: framework.AsStatus(errors.New("PVC test/abc-xyz was not created for pod test/abc (pod is not owner)")),
},
@@ -512,6 +494,7 @@ func TestCSILimits(t *testing.T) {
ephemeralEnabled: true,
extraClaims: []v1.PersistentVolumeClaim{*ephemeralClaim},
driverNames: []string{ebsCSIDriverName},
limitSource: "csinode-with-no-limit",
test: "ephemeral volume unbound",
},
{
Expand All @@ -522,7 +505,7 @@ func TestCSILimits(t *testing.T) {
driverNames: []string{ebsCSIDriverName},
existingPods: []*v1.Pod{runningPod, csiEBSTwoVolPod},
maxVols: 2,
limitSource: "node",
limitSource: "csinode",
test: "ephemeral doesn't when node volume limit <= pods CSI volume",
wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded),
},
@@ -534,7 +517,7 @@ func TestCSILimits(t *testing.T) {
driverNames: []string{ebsCSIDriverName},
existingPods: []*v1.Pod{runningPod, ephemeralTwoVolumePod},
maxVols: 2,
limitSource: "node",
limitSource: "csinode",
test: "ephemeral doesn't when node volume limit <= pods ephemeral CSI volume",
wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded),
},
@@ -546,7 +529,7 @@ func TestCSILimits(t *testing.T) {
driverNames: []string{ebsCSIDriverName},
existingPods: []*v1.Pod{runningPod, ephemeralVolumePod, csiEBSTwoVolPod},
maxVols: 3,
limitSource: "node",
limitSource: "csinode",
test: "persistent doesn't when node volume limit <= pods ephemeral CSI volume + persistent volume, ephemeral disabled",
wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded),
},
@@ -558,7 +541,7 @@ func TestCSILimits(t *testing.T) {
driverNames: []string{ebsCSIDriverName},
existingPods: []*v1.Pod{runningPod, ephemeralVolumePod, csiEBSTwoVolPod},
maxVols: 3,
limitSource: "node",
limitSource: "csinode",
test: "persistent doesn't when node volume limit <= pods ephemeral CSI volume + persistent volume",
wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded),
},
@@ -569,7 +552,8 @@ func TestCSILimits(t *testing.T) {
extraClaims: []v1.PersistentVolumeClaim{*ephemeralClaim},
driverNames: []string{ebsCSIDriverName},
existingPods: []*v1.Pod{runningPod, ephemeralVolumePod, csiEBSTwoVolPod},
maxVols: 4,
maxVols: 5,
limitSource: "csinode",
test: "persistent okay when node volume limit > pods ephemeral CSI volume + persistent volume",
},
{
Expand All @@ -578,7 +562,7 @@ func TestCSILimits(t *testing.T) {
maxVols: 2,
driverNames: []string{ebsCSIDriverName},
test: "skip Filter when the pod only uses secrets and configmaps",
limitSource: "node",
limitSource: "csinode",
wantPreFilterStatus: framework.NewStatus(framework.Skip),
},
{
Expand All @@ -587,13 +571,14 @@ func TestCSILimits(t *testing.T) {
maxVols: 2,
driverNames: []string{ebsCSIDriverName},
test: "don't skip Filter when the pod has pvcs",
limitSource: "node",
limitSource: "csinode",
},
{
newPod: ephemeralPodWithConfigmapAndSecret,
filterName: "csi",
ephemeralEnabled: true,
driverNames: []string{ebsCSIDriverName},
limitSource: "csinode-with-no-limit",
test: "don't skip Filter when the pod has ephemeral volumes",
wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, `looking up PVC test/abc-xyz: persistentvolumeclaims "abc-xyz" not found`),
},
@@ -898,12 +883,6 @@ func getNodeWithPodAndVolumeLimits(limitSource string, pods []*v1.Pod, limit int
}
var csiNode *storagev1.CSINode

addLimitToNode := func() {
for _, driver := range driverNames {
node.Status.Allocatable[getVolumeLimitKey(driver)] = *resource.NewQuantity(int64(limit), resource.DecimalSI)
}
}

initCSINode := func() {
csiNode = &storagev1.CSINode{
ObjectMeta: metav1.ObjectMeta{Name: "node-for-max-pd-test-1"},
@@ -930,13 +909,8 @@ func getNodeWithPodAndVolumeLimits(limitSource string, pods []*v1.Pod, limit int
}

switch limitSource {
case "node":
addLimitToNode()
case "csinode":
addDriversCSINode(true)
case "both":
addLimitToNode()
addDriversCSINode(true)
case "csinode-with-no-limit":
addDriversCSINode(false)
case "no-csi-driver":
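With the "node" and "both" limit sources removed, the test helper above supplies limits only through a CSINode fixture: "csinode" sets Allocatable.Count on each driver, while "csinode-with-no-limit" lists the drivers without a count. A minimal sketch of that wiring, using a hypothetical makeCSINode helper rather than the test's own addDriversCSINode:

```go
package main

import (
	storagev1 "k8s.io/api/storage/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/ptr"
)

// makeCSINode is an illustrative stand-in for the test helper's CSINode
// wiring: with withLimit the driver advertises an attach limit, without it
// the driver is listed but no limit applies (the "csinode-with-no-limit"
// case in the table above).
func makeCSINode(driverNames []string, limit int32, withLimit bool) *storagev1.CSINode {
	csiNode := &storagev1.CSINode{
		ObjectMeta: metav1.ObjectMeta{Name: "node-for-max-pd-test-1"},
	}
	for _, name := range driverNames {
		driver := storagev1.CSINodeDriver{Name: name, NodeID: "node-for-max-pd-test-1"}
		if withLimit {
			driver.Allocatable = &storagev1.VolumeNodeResources{Count: ptr.To(limit)}
		}
		csiNode.Spec.Drivers = append(csiNode.Spec.Drivers, driver)
	}
	return csiNode
}

func main() {
	_ = makeCSINode([]string{"ebs.csi.aws.com"}, 2, true)  // limitSource: "csinode"
	_ = makeCSINode([]string{"ebs.csi.aws.com"}, 0, false) // limitSource: "csinode-with-no-limit"
}
```

Test cases that previously relied on node-allocatable limits now set limitSource: "csinode", and cases that only need the drivers to be present use "csinode-with-no-limit".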
(Diffs for the remaining 6 changed files are not loaded in this view.)