
kubelet nestedPendingOperations may leak an operation, leaving the same PV unable to mount or unmount #109047

Closed
@Dingshujie

Description

What happened?

In my scenario, I create CronJobs that all use the same SFS volume (NFS storage on Huawei Cloud) through the same PVC to run the same training workload. A node therefore schedules many pods that use this PVC at the same time, so many mount and unmount tasks for the PVC execute concurrently. We found all pods on one node stuck in the ContainerCreating state, waiting for the PVC to be mounted.

Checking the kubelet log, we could not find any "operationExecutor.MountVolume started for" or MountVolume failure messages for this volume.

We also checked the volume_manager_total_volumes metric; the volume had been added to the desired state of world (DSW).
Analyzing the code: if isExpectedError returns true, kubelet does not log anything.

// ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
func isExpectedError(err error) bool {
	return nestedpendingoperations.IsAlreadyExists(err) || exponentialbackoff.IsExponentialBackoff(err)
}
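
For reference, this is roughly how the reconciler consumes that helper (paraphrased, not the verbatim kubelet source): when MountVolume returns the AlreadyExists error from nestedPendingOperations, the condition below is false and nothing is logged.

err := rc.operationExecutor.MountVolume(
	rc.waitForAttachTimeout, volumeToMount.VolumeToMount, rc.actualStateOfWorld, isRemount)
if err != nil && !isExpectedError(err) {
	// Only unexpected errors reach the log; AlreadyExists never shows up here.
	klog.ErrorS(err, "operationExecutor.MountVolume failed", "pod", klog.KObj(volumeToMount.Pod))
}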

The operationExecutor generates the operation and calls pendingOperations.Run. Run scans the operations slice to check whether a matching operation already exists; if one does and its operationPending flag is true, the same operation is currently executing and Run returns an AlreadyExists error.
An operationKey is composed of volumeName, podName, and nodeName.
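
For context, the entries stored by nestedPendingOperations look roughly like this (paraphrased from nestedpendingoperations.go; details may differ slightly between versions):

type operationKey struct {
	volumeName v1.UniqueVolumeName
	podName    volumetypes.UniquePodName
	nodeName   types.NodeName
}

type operation struct {
	key              operationKey
	operationName    string
	operationPending bool
	expBackoff       exponentialbackoff.ExponentialBackoff
}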

// MountVolume Func
// Avoid executing mount/map from multiple pods referencing the
// same volume in parallel
podName := nestedpendingoperations.EmptyUniquePodName

// TODO: remove this -- not necessary
if !volumeToMount.PluginIsAttachable && !volumeToMount.PluginIsDeviceMountable {
	// volume plugins which are Non-attachable and Non-deviceMountable can execute mount for multiple pods
	// referencing the same volume in parallel
	podName = util.GetUniquePodName(volumeToMount.Pod)
}

// TODO mount_device
return oe.pendingOperations.Run(
		volumeToMount.VolumeName, podName, "" /* nodeName */, generatedOperations)

So for an attachable or device-mountable plugin, the MountVolume operationKey is {volumeName, EmptyUniquePodName, EmptyNodeName}; EmptyUniquePodName is used to avoid executing mount/map from multiple pods referencing the same volume in parallel.

// UnmountVolume Func
// All volume plugins can execute unmount/unmap for multiple pods referencing the
// same volume in parallel
podName := volumetypes.UniquePodName(volumeToUnmount.PodUID)

return oe.pendingOperations.Run(
	volumeToUnmount.VolumeName, podName, "" /* nodeName */, generatedOperations)

So the UnmountVolume operationKey is {volumeName, podUID, EmptyNodeName}, because all volume plugins can execute unmount/unmap for multiple pods referencing the same volume in parallel.

Now look at the isOperationExists func: if previousOp.key.podName equals EmptyUniquePodName, or the current operation's podName equals EmptyUniquePodName, podNameMatch is true. As a result, a mount operation can match a pending unmount operation and an unmount operation can match a pending mount operation (a small standalone sketch of this matching is shown after the Run function below).

// This is an internal function and caller should acquire and release the lock
func (grm *nestedPendingOperations) isOperationExists(key operationKey) (bool, int) {

	// If volumeName is empty, operation can be executed concurrently
	if key.volumeName == EmptyUniqueVolumeName {
		return false, -1
	}

	for previousOpIndex, previousOp := range grm.operations {
		volumeNameMatch := previousOp.key.volumeName == key.volumeName

		podNameMatch := previousOp.key.podName == EmptyUniquePodName ||
			key.podName == EmptyUniquePodName ||
			previousOp.key.podName == key.podName

		nodeNameMatch := previousOp.key.nodeName == EmptyNodeName ||
			key.nodeName == EmptyNodeName ||
			previousOp.key.nodeName == key.nodeName

		if volumeNameMatch && podNameMatch && nodeNameMatch {
			return true, previousOpIndex
		}
	}

	return false, -1
}
func (grm *nestedPendingOperations) Run(
	volumeName v1.UniqueVolumeName,
	podName volumetypes.UniquePodName,
	nodeName types.NodeName,
	generatedOperations volumetypes.GeneratedOperations) error {
	grm.lock.Lock()
	defer grm.lock.Unlock()

	opKey := operationKey{volumeName, podName, nodeName}

	opExists, previousOpIndex := grm.isOperationExists(opKey)
	if opExists {
		previousOp := grm.operations[previousOpIndex]
		// Operation already exists
		if previousOp.operationPending {
			// Operation is pending
			return NewAlreadyExistsError(opKey)
		}

		backOffErr := previousOp.expBackoff.SafeToRetry(fmt.Sprintf("%+v", opKey))
		if backOffErr != nil {
			if previousOp.operationName == generatedOperations.OperationName {
				return backOffErr
			}
			// previous operation and new operation are different. reset op. name and exp. backoff
			grm.operations[previousOpIndex].operationName = generatedOperations.OperationName
			grm.operations[previousOpIndex].expBackoff = exponentialbackoff.ExponentialBackoff{}
		}

		// Update existing operation to mark as pending.
		grm.operations[previousOpIndex].operationPending = true
		grm.operations[previousOpIndex].key = opKey
	} else {
		// Create a new operation
		grm.operations = append(grm.operations,
			operation{
				key:              opKey,
				operationPending: true,
				operationName:    generatedOperations.OperationName,
				expBackoff:       exponentialbackoff.ExponentialBackoff{},
			})
	}

	go func() (eventErr, detailedErr error) {
		// Handle unhandled panics (very unlikely)
		defer k8sRuntime.HandleCrash()
		// Handle completion of and error, if any, from operationFunc()
		defer grm.operationComplete(opKey, &detailedErr)
		return generatedOperations.Run()
	}()

	return nil
}
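
To make the wildcard matching concrete, here is a small self-contained sketch (plain strings instead of the real types, nodeName omitted) of just the volumeName/podName part of the predicate. It shows that a MountVolume key using EmptyUniquePodName conflicts with any per-pod UnmountVolume key for the same volume, while unmount keys for different pods do not conflict with each other:

package main

import "fmt"

// Simplified stand-ins for the real key types; the empty string plays the
// role of nestedpendingoperations.EmptyUniquePodName.
type key struct{ volumeName, podName string }

// conflicts mirrors the volumeName/podName part of isOperationExists:
// an empty podName on either side acts as a wildcard.
func conflicts(a, b key) bool {
	volumeNameMatch := a.volumeName == b.volumeName
	podNameMatch := a.podName == "" || b.podName == "" || a.podName == b.podName
	return volumeNameMatch && podNameMatch
}

func main() {
	mountKey := key{volumeName: "pv-1", podName: ""}         // MountVolume: wildcard pod
	unmountPod1 := key{volumeName: "pv-1", podName: "pod-1"} // UnmountVolume for pod-1
	unmountPod2 := key{volumeName: "pv-1", podName: "pod-2"} // UnmountVolume for pod-2

	fmt.Println(conflicts(mountKey, unmountPod1))    // true: a mount conflicts with any unmount
	fmt.Println(conflicts(unmountPod1, unmountPod2)) // false: unmounts for different pods run in parallel
	fmt.Println(conflicts(unmountPod1, unmountPod1)) // true: a second unmount for the same pod conflicts
}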

Because of isOperationExists, while a mount for a volume is executing, an unmount cannot be added to the operations slice, and while an unmount is executing, a mount cannot be added. But multiple pods referencing the same volume can add multiple unmount operations to the slice, which may lead to an operation leak:

t0 Operations: {umount, pvname, pod1, pending=true} {umount, pvname, pod2, pending=true}
t1 Operations: {umount, pvname, pod1, pending=false} {umount, pvname, pod2, pending=true}
t2 Operations: {mount, pvname, EmptyUniquePodName, pending=true} {umount, pvname, pod2, pending=true}
t3 Operations: {mount, pvname, EmptyUniquePodName, pending=false} {umount, pvname, pod2, pending=true}
t4 Operations: {umount, pvname, pod2, pending=true} {umount, pvname, pod2, pending=true}
t5 Operations: {umount, pvname, pod2, pending=false} {umount, pvname, pod2, pending=true}
t6 Operations: {umount, pvname, pod2, pending=true}

t0: two pods start unmount operations for the volume
t1: pod1's unmount fails, so its entry's pending flag is set to false
t2: pod3 is scheduled; the operation executor adds a mount operation (podName = EmptyUniquePodName), which matches index 0 and overrides pod1's failed unmount entry
t3: the mount operation fails; pending is set to false
t4: pod2's unmount is submitted again; Run matches index 0 (the non-pending mount entry) instead of pod2's still-pending entry at index 1 and overrides index 0, so the slice now holds two identical pod2 unmount entries and two goroutines are executing pod2's unmount
t5: the first goroutine's unmount fails; it marks the index 0 entry non-pending
t6: the second goroutine's unmount succeeds; deleteOperation removes the index 0 entry and swaps the tail into its place
The end result is a leftover pod2 unmount entry with pending=true that no goroutine will ever complete. From then on, every mount of this volume matches that entry and returns AlreadyExistsError, so no CSI call is ever made.
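
The sequence can be simulated outside the kubelet. The sketch below models the operations slice with the same wildcard first-match lookup in Run, exact-match completion, override-on-non-pending, and swap-delete behaviour (simplified types, single volume, no real volume plumbing); it ends with a pending unmount entry that nothing will ever clear:

package main

import "fmt"

// op models one entry of the nestedPendingOperations slice; podName == ""
// plays the role of EmptyUniquePodName. volumeName/nodeName are omitted
// because the scenario involves only one volume and one node.
type op struct {
	name    string
	podName string
	pending bool
}

type pendingOps struct{ ops []op }

// firstConflict mirrors isOperationExists: first entry whose podName matches,
// with "" acting as a wildcard on either side.
func (p *pendingOps) firstConflict(podName string) int {
	for i, o := range p.ops {
		if o.podName == "" || podName == "" || o.podName == podName {
			return i
		}
	}
	return -1
}

// firstExact mirrors getOperation/deleteOperation: exact key match only.
func (p *pendingOps) firstExact(podName string) int {
	for i, o := range p.ops {
		if o.podName == podName {
			return i
		}
	}
	return -1
}

// run mirrors Run: reject if the conflicting entry is still pending,
// otherwise override it in place; append if nothing conflicts.
func (p *pendingOps) run(name, podName string) error {
	if i := p.firstConflict(podName); i >= 0 {
		if p.ops[i].pending {
			return fmt.Errorf("AlreadyExists for %s/%q", name, podName)
		}
		p.ops[i] = op{name, podName, true}
		return nil
	}
	p.ops = append(p.ops, op{name, podName, true})
	return nil
}

// complete mirrors operationComplete: on failure mark the first exact match
// non-pending, on success swap-delete it.
func (p *pendingOps) complete(podName string, success bool) {
	i := p.firstExact(podName)
	if i < 0 {
		return
	}
	if !success {
		p.ops[i].pending = false
		return
	}
	p.ops[i] = p.ops[len(p.ops)-1]
	p.ops = p.ops[:len(p.ops)-1]
}

func main() {
	p := &pendingOps{}
	p.run("umount", "pod1")   // t0
	p.run("umount", "pod2")   // t0
	p.complete("pod1", false) // t1: pod1 unmount fails
	p.run("mount", "")        // t2: mount overrides the failed entry at index 0
	p.complete("", false)     // t3: mount fails
	p.run("umount", "pod2")   // t4: duplicate pod2 unmount slips past the check
	p.complete("pod2", false) // t5: first pod2 unmount goroutine fails
	p.complete("pod2", true)  // t6: second one succeeds and swap-deletes index 0
	fmt.Printf("leaked: %+v\n", p.ops) // leaked: [{name:umount podName:pod2 pending:true}]
	fmt.Println(p.run("mount", ""))    // AlreadyExists: mount is blocked forever
}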

I think there are other scenarios that can trigger this leak as well.

Also, when an operation succeeds, deleteOperation replaces the deleted entry with the tail element (delete without preserving order), which may increase the probability of hitting this problem, because isOperationExists always picks the first match.

func (grm *nestedPendingOperations) deleteOperation(key operationKey) {
	// Assumes lock has been acquired by caller.

	opIndex := -1
	for i, op := range grm.operations {
		if op.key.volumeName == key.volumeName &&
			op.key.podName == key.podName &&
			op.key.nodeName == key.nodeName {
			opIndex = i
			break
		}
	}

	if opIndex < 0 {
		return
	}

	// Delete index without preserving order
	grm.operations[opIndex] = grm.operations[len(grm.operations)-1]
	grm.operations = grm.operations[:len(grm.operations)-1]
}
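
In isolation the swap-delete is harmless; the relevant point is that it reorders the slice, so the "first match" seen by isOperationExists and deleteOperation can change after every delete. A trivial standalone illustration:

package main

import "fmt"

func main() {
	// Pretend all three entries would match the same key.
	ops := []string{"A", "B", "C"}

	// Delete the first match the way deleteOperation does: overwrite it with
	// the tail element and shrink the slice.
	i := 0
	ops[i] = ops[len(ops)-1]
	ops = ops[:len(ops)-1]

	fmt.Println(ops) // [C B]: the former tail is now the first match
}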

The override mechanism can also defeat the exponential backoff: when a mount and an unmount take turns overriding the same entry, Run sees a different operationName each time and resets expBackoff (see the backOffErr branch in Run above), so the backoff never accumulates.
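
A minimal sketch of that effect, with an integer counter standing in for the ExponentialBackoff state and illustrative operation names (not the real constants):

package main

import "fmt"

func main() {
	type entry struct {
		operationName string
		backoffSteps  int // stands in for ExponentialBackoff state
	}
	e := entry{}

	// A mount and an unmount keep failing and overriding the same slice entry.
	for i, name := range []string{"mount", "umount", "mount", "umount"} {
		if e.operationName != name {
			// Run resets the backoff whenever the operation name changes.
			e.operationName = name
			e.backoffSteps = 0
		}
		e.backoffSteps++ // operationComplete grows the backoff after each failure
		fmt.Printf("attempt %d (%s): backoff steps = %d\n", i+1, name, e.backoffSteps)
	}
	// Every attempt prints "backoff steps = 1": the backoff delay never grows.
}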

What did you expect to happen?

The PV can be mounted and the pods can run.

How can we reproduce it (as minimally and precisely as possible)?

Reproduce steps:
create CronJobs that all use the same PVC (backed by the SFS CSI driver)
and restart the CSI driver periodically

Anything else we need to know?

No response

Kubernetes version

$ kubectl version
1.23

Cloud provider

HuaweiCloud

OS version


Install tools

Container runtime (CRI) and version (if applicable)

Related plugins (CNI, CSI, ...) and versions (if applicable)

CSI

    Labels

kind/bug: Categorizes issue or PR as related to a bug.
needs-triage: Indicates an issue or PR lacks a `triage/foo` label and requires one.
sig/storage: Categorizes an issue or PR as relevant to SIG Storage.
