kubelet nestedPendingOperations may leak an operation, leaving mount/umount for the same PV never executed #109047
Description
What happened?
In my scenario, several identical CronJobs use the same SFS volume (NFS storage on Huawei Cloud) through the same PVC to run the same training work. A node therefore schedules many pods that use the same PVC at the same time, so many mount and umount tasks for this PVC execute concurrently. Eventually all pods on a node get stuck in the ContainerCreating state, waiting for the PVC to be mounted.
Checking the logs, we cannot find any operationExecutor.MountVolume started or failed messages such as "operationExecutor.MountVolume started for".
We also checked the volume_manager_total_volumes metric; the volume is added to the desired state of world (DSW).
Analyzing the code: if the error matches isExpectedError, kubelet does not log anything.
// ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
func isExpectedError(err error) bool {
    return nestedpendingoperations.IsAlreadyExists(err) || exponentialbackoff.IsExponentialBackoff(err)
}
operationExecutor generates the operation and calls pendingOperations.Run. Run checks the operations slice to see whether the same operation already exists; if it does, it checks that operation's operationPending status, and if it is true the same operation is currently executing, so Run returns AlreadyExists.
The operationKey is composed of volumeName, podName, and nodeName.
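For reference, the operationKey type in pkg/volume/util/nestedpendingoperations looks roughly like this (quoted from memory, so treat it as approximate):
type operationKey struct {
    volumeName v1.UniqueVolumeName
    podName    volumetypes.UniquePodName
    nodeName   types.NodeName
}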
// MountVolume Func
// Avoid executing mount/map from multiple pods referencing the
// same volume in parallel
podName := nestedpendingoperations.EmptyUniquePodName

// TODO: remove this -- not necessary
if !volumeToMount.PluginIsAttachable && !volumeToMount.PluginIsDeviceMountable {
    // volume plugins which are Non-attachable and Non-deviceMountable can execute mount for multiple pods
    // referencing the same volume in parallel
    podName = util.GetUniquePodName(volumeToMount.Pod)
}

// TODO mount_device
return oe.pendingOperations.Run(
    volumeToMount.VolumeName, podName, "" /* nodeName */, generatedOperations)
So for attachable or device-mountable plugins, the MountVolume operationKey is {volumeName, EmptyUniquePodName, EmptyNodeName}; EmptyUniquePodName is used to avoid executing mount/map from multiple pods referencing the same volume in parallel.
// UnmountVolume Func
// All volume plugins can execute unmount/unmap for multiple pods referencing the
// same volume in parallel
podName := volumetypes.UniquePodName(volumeToUnmount.PodUID)
return oe.pendingOperations.Run(
    volumeToUnmount.VolumeName, podName, "" /* nodeName */, generatedOperations)
So the UnmountVolume operationKey is {volumeName, podUID, EmptyNodeName}, because all volume plugins can execute unmount/unmap for multiple pods referencing the same volume in parallel.
Now look at the isOperationExists func: podNameMatch is true if previousOp.key.podName equals EmptyUniquePodName, or the current operation's podName equals EmptyUniquePodName (or the two are equal). This means a mount operation can match an existing umount operation, and an umount operation can match an existing mount operation.
// This is an internal function and caller should acquire and release the lock
func (grm *nestedPendingOperations) isOperationExists(key operationKey) (bool, int) {
    // If volumeName is empty, operation can be executed concurrently
    if key.volumeName == EmptyUniqueVolumeName {
        return false, -1
    }

    for previousOpIndex, previousOp := range grm.operations {
        volumeNameMatch := previousOp.key.volumeName == key.volumeName

        podNameMatch := previousOp.key.podName == EmptyUniquePodName ||
            key.podName == EmptyUniquePodName ||
            previousOp.key.podName == key.podName

        nodeNameMatch := previousOp.key.nodeName == EmptyNodeName ||
            key.nodeName == EmptyNodeName ||
            previousOp.key.nodeName == key.nodeName

        if volumeNameMatch && podNameMatch && nodeNameMatch {
            return true, previousOpIndex
        }
    }
    return false, -1
}
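For example (my own illustration of the matching rules above, with made-up names):
// MountVolume key (attachable plugin):  {volumeName: "pv-1", podName: EmptyUniquePodName, nodeName: EmptyNodeName}
// UnmountVolume key from pod-2:         {volumeName: "pv-1", podName: "pod-2-uid", nodeName: EmptyNodeName}
//
// isOperationExists treats these two keys as the same operation:
//   volumeNameMatch: "pv-1" == "pv-1"                  -> true
//   podNameMatch:    the mount key's podName is empty  -> true (wildcard)
//   nodeNameMatch:   both nodeNames are empty          -> true (wildcard)
//
// But two UnmountVolume keys for different pods ("pod-1-uid" vs "pod-2-uid") do not match,
// so several umount operations for the same volume can coexist in grm.operations.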
func (grm *nestedPendingOperations) Run(
    volumeName v1.UniqueVolumeName,
    podName volumetypes.UniquePodName,
    nodeName types.NodeName,
    generatedOperations volumetypes.GeneratedOperations) error {
    grm.lock.Lock()
    defer grm.lock.Unlock()

    opKey := operationKey{volumeName, podName, nodeName}

    opExists, previousOpIndex := grm.isOperationExists(opKey)
    if opExists {
        previousOp := grm.operations[previousOpIndex]
        // Operation already exists
        if previousOp.operationPending {
            // Operation is pending
            return NewAlreadyExistsError(opKey)
        }

        backOffErr := previousOp.expBackoff.SafeToRetry(fmt.Sprintf("%+v", opKey))
        if backOffErr != nil {
            if previousOp.operationName == generatedOperations.OperationName {
                return backOffErr
            }
            // previous operation and new operation are different. reset op. name and exp. backoff
            grm.operations[previousOpIndex].operationName = generatedOperations.OperationName
            grm.operations[previousOpIndex].expBackoff = exponentialbackoff.ExponentialBackoff{}
        }

        // Update existing operation to mark as pending.
        grm.operations[previousOpIndex].operationPending = true
        grm.operations[previousOpIndex].key = opKey
    } else {
        // Create a new operation
        grm.operations = append(grm.operations,
            operation{
                key:              opKey,
                operationPending: true,
                operationName:    generatedOperations.OperationName,
                expBackoff:       exponentialbackoff.ExponentialBackoff{},
            })
    }

    go func() (eventErr, detailedErr error) {
        // Handle unhandled panics (very unlikely)
        defer k8sRuntime.HandleCrash()
        // Handle completion of and error, if any, from operationFunc()
        defer grm.operationComplete(opKey, &detailedErr)
        return generatedOperations.Run()
    }()

    return nil
}
Because of this matching in isOperationExists, while a mount for a volume is executing, an umount cannot be added to operations, and while an umount is executing, a mount cannot be added. However, multiple pods referencing the same volume can each add their own umount operation to the slice, and this can leak an operation:
t0 Operations: {umount, pvname, pod1, pending=true} {umount, pvname, pod2, pending=true}
t1 Operations: {umount, pvname, pod1, pending=false} {umount, pvname, pod2, pending=true}
t2 Operations: {mount, pvname, EmptyUniquePodName, pending=true} {umount, pvname, pod2, pending=true}
t3 Operations: {mount, pvname, EmptyUniquePodName, pending=false} {umount, pvname, pod2, pending=true}
t4 Operations: {umount, pvname, pod2, pending=true} {umount, pvname, pod2, pending=true}
t5 Operations: {umount, pvname, pod2, pending=false} {umount, pvname, pod2, pending=true}
t6 Operations: {umount, pvname, pod2, pending=true}
t0: two pods start umount operations for the volume
t1: pod1's umount fails, so its entry (index 0) is updated to pending=false
t2: pod3 is scheduled; the operation executor adds a mount operation, which matches index 0 and overrides the umount entry there
t3: the mount fails, so index 0 is updated to pending=false
t4: pod2's umount is retried; it matches index 0 and overrides that entry. Now there are two identical pod2 umount entries, and two goroutines executing pod2's umount
t5: the first pod2 umount goroutine fails; it updates the first matching entry (index 0) to pending=false
t6: the second pod2 umount goroutine succeeds; it deletes the first matching entry (index 0)
This finally leaves a stale umount operation in the cache with pending=true. Every subsequent mount operation for this volume then returns AlreadyExistsError, and no CSI call is ever made.
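To make the sequence concrete, below is a simplified, self-contained Go model of the operations-slice bookkeeping (my own sketch with made-up names, NOT the real kubelet code). It keeps only the parts involved in the leak: wildcard podName matching in isOperationExists, override-on-Run, mark-pending-false on failure, and delete-first-match on success. Running it replays t0..t6 and ends with a leaked pending entry that makes every later mount return AlreadyExists:
package main

import "fmt"

const emptyPod = "" // stands in for nestedpendingoperations.EmptyUniquePodName

type op struct {
    volume, pod, name string // name is "mount" or "umount"
    pending           bool
}

type pendingOps struct{ ops []op }

// firstMatch mirrors isOperationExists: an empty podName on either side is a wildcard.
func (p *pendingOps) firstMatch(volume, pod string) int {
    for i, o := range p.ops {
        podMatch := o.pod == emptyPod || pod == emptyPod || o.pod == pod
        if o.volume == volume && podMatch {
            return i
        }
    }
    return -1
}

// exactMatch mirrors getOperation/deleteOperation: keys must be equal.
func (p *pendingOps) exactMatch(volume, pod string) int {
    for i, o := range p.ops {
        if o.volume == volume && o.pod == pod {
            return i
        }
    }
    return -1
}

// run mirrors Run: reuse (override) the first matching entry if it is not pending,
// return AlreadyExists if it is pending, append a new entry if nothing matches.
func (p *pendingOps) run(volume, pod, name string) error {
    if i := p.firstMatch(volume, pod); i >= 0 {
        if p.ops[i].pending {
            return fmt.Errorf("AlreadyExists for {%s %s}", volume, pod)
        }
        p.ops[i] = op{volume: volume, pod: pod, name: name, pending: true}
        return nil
    }
    p.ops = append(p.ops, op{volume: volume, pod: pod, name: name, pending: true})
    return nil
}

// fail mirrors operationComplete on error: the first exactly matching entry goes pending=false.
func (p *pendingOps) fail(volume, pod string) {
    if i := p.exactMatch(volume, pod); i >= 0 {
        p.ops[i].pending = false
    }
}

// succeed mirrors deleteOperation: the first exactly matching entry is removed
// by swapping in the tail element.
func (p *pendingOps) succeed(volume, pod string) {
    if i := p.exactMatch(volume, pod); i >= 0 {
        p.ops[i] = p.ops[len(p.ops)-1]
        p.ops = p.ops[:len(p.ops)-1]
    }
}

func main() {
    p := &pendingOps{}
    _ = p.run("pvname", "pod1", "umount")  // t0: pod1 umount starts (index 0)
    _ = p.run("pvname", "pod2", "umount")  // t0: pod2 umount starts (index 1)
    p.fail("pvname", "pod1")               // t1: pod1 umount fails -> index 0 pending=false
    _ = p.run("pvname", emptyPod, "mount") // t2: mount for pod3 overrides index 0
    p.fail("pvname", emptyPod)             // t3: mount fails -> index 0 pending=false
    _ = p.run("pvname", "pod2", "umount")  // t4: pod2 umount retried, overrides index 0 (duplicate of index 1)
    p.fail("pvname", "pod2")               // t5: one pod2 umount goroutine fails -> index 0 pending=false
    p.succeed("pvname", "pod2")            // t6: the other pod2 umount goroutine succeeds -> deletes index 0

    fmt.Printf("leftover: %v\n", p.ops)             // leftover: [{pvname pod2 umount true}] -- leaked, pending forever
    fmt.Println(p.run("pvname", emptyPod, "mount")) // every new mount now gets AlreadyExists
}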
I think there are other scenarios that can trigger this leak as well.
Also, when an operation succeeds, deleteOperation replaces the deleted entry with the tail element instead of preserving order, which may increase the probability of occurrence, because isOperationExists always picks the first matching entry.
func (grm *nestedPendingOperations) deleteOperation(key operationKey) {
    // Assumes lock has been acquired by caller.

    opIndex := -1
    for i, op := range grm.operations {
        if op.key.volumeName == key.volumeName &&
            op.key.podName == key.podName &&
            op.key.nodeName == key.nodeName {
            opIndex = i
            break
        }
    }

    if opIndex < 0 {
        return
    }

    // Delete index without preserving order
    grm.operations[opIndex] = grm.operations[len(grm.operations)-1]
    grm.operations = grm.operations[:len(grm.operations)-1]
}
Also, the override mechanism can make the exponential backoff ineffective, for example when mount and umount operations for the same volume take turns executing (see the illustration below).
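A rough illustration of that case (my own reading of the Run code above, where expBackoff is reset whenever the operationName differs):
// umount for pv-1 fails              -> its expBackoff grows
// mount for pv-1 arrives in backoff  -> operationName differs, so Run resets expBackoff and runs it
// mount fails                        -> expBackoff grows again from zero
// umount arrives                     -> operationName differs again, expBackoff is reset again
// ...
// The per-volume backoff never accumulates, so the failing operations keep being retried
// without ever backing off.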
What did you expect to happen?
The PV should be mounted and the pods should be able to run.
How can we reproduce it (as minimally and precisely as possible)?
Reproduction steps:
create several identical CronJobs that use the same PVC (backed by the SFS CSI driver),
and have the CSI driver restart periodically.
Anything else we need to know?
No response
Kubernetes version
$ kubectl version
# paste output here
Cloud provider
OS version
# On Linux:
$ cat /etc/os-release
# paste output here
$ uname -a
# paste output here
# On Windows:
C:\> wmic os get Caption, Version, BuildNumber, OSArchitecture
# paste output here