Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add startup taint removal feature #2309

Merged
merged 1 commit into from
May 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified charts/latest/azuredisk-csi-driver-v0.0.0.tgz
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ rules:
verbs: ["get"]
- apiGroups: [""]
resources: ["nodes"]
verbs: ["get", "patch"]
- apiGroups: ["storage.k8s.io"]
resources: ["csinodes"]
verbs: ["get"]

---
Expand Down
4 changes: 3 additions & 1 deletion deploy/rbac-csi-azuredisk-node.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ rules:
verbs: ["get"]
- apiGroups: [""]
resources: ["nodes"]
verbs: ["get", "patch"]
- apiGroups: ["storage.k8s.io"]
resources: ["csinodes"]
verbs: ["get"]

---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
Expand Down
5 changes: 3 additions & 2 deletions pkg/azureconstants/azure_constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,9 @@ const (
BlockDeviceRootPathLinux = "/sys/block"
DummyBlockDevicePathLinux = "/sys/block/sda"
// define different sleep time when hit throttling
SnapshotOpThrottlingSleepSec = 50
MaxThrottlingSleepSec = 1200
SnapshotOpThrottlingSleepSec = 50
MaxThrottlingSleepSec = 1200
AgentNotReadyNodeTaintKeySuffix = "/agent-not-ready"
)

var (
Expand Down
124 changes: 124 additions & 0 deletions pkg/azuredisk/azuredisk.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package azuredisk

import (
"context"
"encoding/json"
"errors"
"fmt"
"reflect"
Expand All @@ -33,8 +34,11 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
Expand All @@ -54,6 +58,17 @@ import (
azure "sigs.k8s.io/cloud-provider-azure/pkg/provider"
)

var (
// taintRemovalInitialDelay is the initial delay for node taint removal
taintRemovalInitialDelay = 1 * time.Second
// taintRemovalBackoff is the exponential backoff configuration for node taint removal
taintRemovalBackoff = wait.Backoff{
Duration: 500 * time.Millisecond,
Factor: 2,
Steps: 10, // Max delay = 0.5 * 2^9 = ~4 minutes
}
)

// CSIDriver defines the interface for a CSI driver.
type CSIDriver interface {
csi.ControllerServer
Expand Down Expand Up @@ -106,6 +121,7 @@ type DriverCore struct {
forceDetachBackoff bool
endpoint string
disableAVSetNodes bool
removeNotReadyTaint bool
kubeClient kubernetes.Interface
// a timed cache storing volume stats <volumeID, volumeStats>
volStatsCache azcache.Resource
Expand Down Expand Up @@ -157,6 +173,7 @@ func newDriverV1(options *DriverOptions) *Driver {
driver.forceDetachBackoff = options.ForceDetachBackoff
driver.endpoint = options.Endpoint
driver.disableAVSetNodes = options.DisableAVSetNodes
driver.removeNotReadyTaint = options.RemoveNotReadyTaint
driver.volumeLocks = volumehelper.NewVolumeLocks()
driver.ioHandler = azureutils.NewOSIOHandler()
driver.hostUtil = hostutil.NewHostUtil()
Expand Down Expand Up @@ -277,6 +294,14 @@ func newDriverV1(options *DriverOptions) *Driver {
csi.NodeServiceCapability_RPC_GET_VOLUME_STATS,
csi.NodeServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER,
})

if kubeClient != nil && driver.removeNotReadyTaint {
// Remove taint from node to indicate driver startup success
// This is done at the last possible moment to prevent race conditions or false positive removals
time.AfterFunc(taintRemovalInitialDelay, func() {
removeTaintInBackground(kubeClient, driver.NodeID, driver.Name, taintRemovalBackoff, removeNotReadyTaint)
})
}
return &driver
}

Expand Down Expand Up @@ -636,3 +661,102 @@ func getVMSSInstanceName(computeName string) (string, error) {
}
return fmt.Sprintf("%s%06s", names[0], strconv.FormatInt(int64(instanceID), 36)), nil
}

// Struct for JSON patch operations
type JSONPatch struct {
OP string `json:"op,omitempty"`
Path string `json:"path,omitempty"`
Value interface{} `json:"value"`
}

// removeTaintInBackground is a goroutine that retries removeNotReadyTaint with exponential backoff
func removeTaintInBackground(k8sClient kubernetes.Interface, nodeName, driverName string, backoff wait.Backoff, removalFunc func(kubernetes.Interface, string, string) error) {
backoffErr := wait.ExponentialBackoff(backoff, func() (bool, error) {
err := removalFunc(k8sClient, nodeName, driverName)
if err != nil {
klog.ErrorS(err, "Unexpected failure when attempting to remove node taint(s)")
return false, nil
}
return true, nil
})

if backoffErr != nil {
klog.ErrorS(backoffErr, "Retries exhausted, giving up attempting to remove node taint(s)")
}
}

// removeNotReadyTaint removes the taint disk.csi.azure.com/agent-not-ready from the local node
// This taint can be optionally applied by users to prevent startup race conditions such as
// https://github.com/kubernetes/kubernetes/issues/95911
func removeNotReadyTaint(clientset kubernetes.Interface, nodeName, driverName string) error {
ctx := context.Background()
node, err := clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
if err != nil {
return err
}

if err := checkAllocatable(ctx, clientset, nodeName, driverName); err != nil {
return err
}

taintKeyToRemove := driverName + consts.AgentNotReadyNodeTaintKeySuffix
klog.V(2).Infof("removing taint with key %s from local node %s", taintKeyToRemove, nodeName)
var taintsToKeep []corev1.Taint
for _, taint := range node.Spec.Taints {
klog.V(5).Infof("checking taint key %s, value %s, effect %s", taint.Key, taint.Value, taint.Effect)
if taint.Key != taintKeyToRemove {
taintsToKeep = append(taintsToKeep, taint)
} else {
klog.V(2).Infof("queued taint for removal with key %s, effect %s", taint.Key, taint.Effect)
}
}

if len(taintsToKeep) == len(node.Spec.Taints) {
klog.V(2).Infof("No taints to remove on node, skipping taint removal")
return nil
}

patchRemoveTaints := []JSONPatch{
{
OP: "test",
Path: "/spec/taints",
Value: node.Spec.Taints,
},
{
OP: "replace",
Path: "/spec/taints",
Value: taintsToKeep,
},
}

patch, err := json.Marshal(patchRemoveTaints)
if err != nil {
return err
}

_, err = clientset.CoreV1().Nodes().Patch(ctx, nodeName, k8stypes.JSONPatchType, patch, metav1.PatchOptions{})
if err != nil {
return err
}
klog.V(2).Infof("removed taint with key %s from local node %s successfully", taintKeyToRemove, nodeName)
return nil
}

func checkAllocatable(ctx context.Context, clientset kubernetes.Interface, nodeName, driverName string) error {
csiNode, err := clientset.StorageV1().CSINodes().Get(ctx, nodeName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("isAllocatableSet: failed to get CSINode for %s: %w", nodeName, err)
}

for _, driver := range csiNode.Spec.Drivers {
if driver.Name == driverName {
if driver.Allocatable != nil && driver.Allocatable.Count != nil {
klog.V(2).Infof("CSINode Allocatable value is set for driver on node %s, count %d", nodeName, *driver.Allocatable.Count)
return nil
}
return fmt.Errorf("isAllocatableSet: allocatable value not set for driver on node %s", nodeName)
}
}

return fmt.Errorf("isAllocatableSet: driver not found on node %s", nodeName)
}
2 changes: 2 additions & 0 deletions pkg/azuredisk/azuredisk_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type DriverOptions struct {
Kubeconfig string
Endpoint string
DisableAVSetNodes bool
RemoveNotReadyTaint bool
}

func (o *DriverOptions) AddFlags() *flag.FlagSet {
Expand Down Expand Up @@ -100,6 +101,7 @@ func (o *DriverOptions) AddFlags() *flag.FlagSet {
fs.BoolVar(&o.ForceDetachBackoff, "force-detach-backoff", true, "boolean flag to force detach in disk detach backoff")
fs.StringVar(&o.Kubeconfig, "kubeconfig", "", "Absolute path to the kubeconfig file. Required only when running out of cluster.")
fs.BoolVar(&o.DisableAVSetNodes, "disable-avset-nodes", false, "disable DisableAvailabilitySetNodes in cloud config for controller")
fs.BoolVar(&o.RemoveNotReadyTaint, "remove-not-ready-taint", true, "remove NotReady taint from node when node is ready")
fs.StringVar(&o.Endpoint, "endpoint", "unix://tmp/csi.sock", "CSI endpoint")

return fs
Expand Down
Loading