Skip to content

Commit

Permalink
Allow mounts to run in parallel for non-attachable
Browse files Browse the repository at this point in the history
Allow mount volume operations to run in parallel for non-attachable
volume plugins.

Allow unmount volume operations to run in parallel for all volume
plugins.
  • Loading branch information
saad-ali committed Jul 20, 2016
1 parent 5df9284 commit 88d4950
Show file tree
Hide file tree
Showing 10 changed files with 1,131 additions and 156 deletions.
21 changes: 11 additions & 10 deletions pkg/controller/volume/attachdetach/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater"
"k8s.io/kubernetes/pkg/util/goroutinemap"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
)

Expand Down Expand Up @@ -114,9 +115,9 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
glog.Infof("Started DetachVolume for volume %q from node %q", attachedVolume.VolumeName, attachedVolume.NodeName)
}
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!goroutinemap.IsExponentialBackoff(err) {
// Ignore goroutinemap.IsAlreadyExists && goroutinemap.IsExponentialBackoff errors, they are expected.
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.DetachVolume failed to start for volume %q (spec.Name: %q) from node %q with err: %v",
Expand All @@ -134,9 +135,9 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
glog.Infof("Started DetachVolume for volume %q from node %q due to maxWaitForUnmountDuration expiry.", attachedVolume.VolumeName, attachedVolume.NodeName)
}
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!goroutinemap.IsExponentialBackoff(err) {
// Ignore goroutinemap.IsAlreadyExists && goroutinemap.IsExponentialBackoff errors, they are expected.
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.DetachVolume failed to start (maxWaitForUnmountDuration expiry) for volume %q (spec.Name: %q) from node %q with err: %v",
Expand Down Expand Up @@ -169,9 +170,9 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
glog.Infof("Started AttachVolume for volume %q to node %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)
}
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!goroutinemap.IsExponentialBackoff(err) {
// Ignore goroutinemap.IsAlreadyExists && goroutinemap.IsExponentialBackoff errors, they are expected.
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.AttachVolume failed to start for volume %q (spec.Name: %q) to node %q with err: %v",
Expand Down
39 changes: 20 additions & 19 deletions pkg/kubelet/volumemanager/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
"k8s.io/kubernetes/pkg/util/goroutinemap"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
)

Expand Down Expand Up @@ -122,9 +123,9 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
err := rc.operationExecutor.UnmountVolume(
mountedVolume.MountedVolume, rc.actualStateOfWorld)
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!goroutinemap.IsExponentialBackoff(err) {
// Ignore goroutinemap.IsAlreadyExists and goroutinemap.IsExponentialBackoff errors, they are expected.
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.UnmountVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v",
Expand Down Expand Up @@ -163,9 +164,9 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
rc.hostName,
rc.actualStateOfWorld)
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!goroutinemap.IsExponentialBackoff(err) {
// Ignore goroutinemap.IsAlreadyExists and goroutinemap.IsExponentialBackoff errors, they are expected.
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.VerifyControllerAttachedVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v",
Expand Down Expand Up @@ -198,9 +199,9 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
volumeToMount.Pod.UID)
err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld)
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!goroutinemap.IsExponentialBackoff(err) {
// Ignore goroutinemap.IsAlreadyExists and goroutinemap.IsExponentialBackoff errors, they are expected.
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.AttachVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v",
Expand Down Expand Up @@ -236,9 +237,9 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
volumeToMount.VolumeToMount,
rc.actualStateOfWorld)
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!goroutinemap.IsExponentialBackoff(err) {
// Ignore goroutinemap.IsAlreadyExists and goroutinemap.IsExponentialBackoff errors, they are expected.
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.MountVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v",
Expand Down Expand Up @@ -271,9 +272,9 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
err := rc.operationExecutor.UnmountDevice(
attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.mounter)
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!goroutinemap.IsExponentialBackoff(err) {
// Ignore goroutinemap.IsAlreadyExists and goroutinemap.IsExponentialBackoff errors, they are expected.
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.UnmountDevice failed for volume %q (spec.Name: %q) controllerAttachDetachEnabled: %v with err: %v",
Expand Down Expand Up @@ -302,9 +303,9 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
err := rc.operationExecutor.DetachVolume(
attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld)
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!goroutinemap.IsExponentialBackoff(err) {
// Ignore goroutinemap.IsAlreadyExists && goroutinemap.IsExponentialBackoff errors, they are expected.
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.DetachVolume failed for volume %q (spec.Name: %q) controllerAttachDetachEnabled: %v with err: %v",
Expand Down
120 changes: 120 additions & 0 deletions pkg/util/goroutinemap/exponentialbackoff/exponential_backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package exponentialbackoff contains logic for implementing exponential
// backoff for GoRoutineMap and NestedPendingOperations.
package exponentialbackoff

import (
"fmt"
"time"
)

const (
// initialDurationBeforeRetry is the amount of time after an error occurs
// that GoroutineMap will refuse to allow another operation to start with
// the same target (if exponentialBackOffOnError is enabled). Each
// successive error results in a wait 2x times the previous.
initialDurationBeforeRetry time.Duration = 500 * time.Millisecond

// maxDurationBeforeRetry is the maximum amount of time that
// durationBeforeRetry will grow to due to exponential backoff.
maxDurationBeforeRetry time.Duration = 2 * time.Minute
)

// ExponentialBackoff contains the last occurrence of an error and the duration
// that retries are not permitted.
type ExponentialBackoff struct {
lastError error
lastErrorTime time.Time
durationBeforeRetry time.Duration
}

// SafeToRetry returns an error if the durationBeforeRetry period for the given
// lastErrorTime has not yet expired. Otherwise it returns nil.
func (expBackoff *ExponentialBackoff) SafeToRetry(operationName string) error {
if time.Since(expBackoff.lastErrorTime) <= expBackoff.durationBeforeRetry {
return NewExponentialBackoffError(operationName, *expBackoff)
}

return nil
}

func (expBackoff *ExponentialBackoff) Update(err *error) {
if expBackoff.durationBeforeRetry == 0 {
expBackoff.durationBeforeRetry = initialDurationBeforeRetry
} else {
expBackoff.durationBeforeRetry = 2 * expBackoff.durationBeforeRetry
if expBackoff.durationBeforeRetry > maxDurationBeforeRetry {
expBackoff.durationBeforeRetry = maxDurationBeforeRetry
}
}

expBackoff.lastError = *err
expBackoff.lastErrorTime = time.Now()
}

func (expBackoff *ExponentialBackoff) GenerateNoRetriesPermittedMsg(
operationName string) string {
return fmt.Sprintf("Operation for %q failed. No retries permitted until %v (durationBeforeRetry %v). Error: %v",
operationName,
expBackoff.lastErrorTime.Add(expBackoff.durationBeforeRetry),
expBackoff.durationBeforeRetry,
expBackoff.lastError)
}

// NewExponentialBackoffError returns a new instance of ExponentialBackoff error.
func NewExponentialBackoffError(
operationName string, expBackoff ExponentialBackoff) error {
return exponentialBackoffError{
operationName: operationName,
expBackoff: expBackoff,
}
}

// IsExponentialBackoff returns true if an error returned from GoroutineMap
// indicates that a new operation can not be started because
// exponentialBackOffOnError is enabled and a previous operation with the same
// operation failed within the durationBeforeRetry period.
func IsExponentialBackoff(err error) bool {
switch err.(type) {
case exponentialBackoffError:
return true
default:
return false
}
}

// exponentialBackoffError is the error returned returned from GoroutineMap when
// a new operation can not be started because exponentialBackOffOnError is
// enabled and a previous operation with the same operation failed within the
// durationBeforeRetry period.
type exponentialBackoffError struct {
operationName string
expBackoff ExponentialBackoff
}

var _ error = exponentialBackoffError{}

func (err exponentialBackoffError) Error() string {
return fmt.Sprintf(
"Failed to create operation with name %q. An operation with that name failed at %v. No retries permitted until %v (%v). Last error: %q.",
err.operationName,
err.expBackoff.lastErrorTime,
err.expBackoff.lastErrorTime.Add(err.expBackoff.durationBeforeRetry),
err.expBackoff.durationBeforeRetry,
err.expBackoff.lastError)
}
Loading

0 comments on commit 88d4950

Please sign in to comment.