Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add volume operation metrics to operation executor and PV controller #50036

Merged
merged 1 commit into from
Aug 28, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/controller/volume/persistentvolume/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ go_library(
"//pkg/util/io:go_default_library",
"//pkg/util/mount:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/storage/v1:go_default_library",
Expand Down
8 changes: 7 additions & 1 deletion pkg/controller/volume/persistentvolume/pv_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"k8s.io/kubernetes/pkg/util/goroutinemap"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
vol "k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"

"github.com/golang/glog"
)
Expand Down Expand Up @@ -1216,7 +1217,10 @@ func (ctrl *PersistentVolumeController) doDeleteVolume(volume *v1.PersistentVolu
return false, fmt.Errorf("Failed to create deleter for volume %q: %v", volume.Name, err)
}

if err = deleter.Delete(); err != nil {
opComplete := util.OperationCompleteHook(plugin.GetPluginName(), "volume_delete")
err = deleter.Delete()
opComplete(err)
if err != nil {
// Deleter failed
return false, err
}
Expand Down Expand Up @@ -1326,7 +1330,9 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation(claimObj interfa
return
}

opComplete := util.OperationCompleteHook(plugin.GetPluginName(), "volume_provision")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jsafrane was that an oversight that provisionClaimOperation function does not return error, even if provisioning fails for some reason?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

curious too, but regardless of answer we decided to wrap Provision since wrapping provisionClaimOperation would include irrelevant API server delays

Copy link
Member

@gnufied gnufied Aug 23, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think - we can revisit this later, but I do not think it is a problem if API server delays are included in the metric. Similar metrics too include some API server delays as well. My thinking is - it would have been cleaner to do this in scheduleOperation because then we don't need to modify each operation individually.

volume, err = provisioner.Provision()
opComplete(err)
if err != nil {
strerr := fmt.Sprintf("Failed to provision volume with StorageClass %q: %v", storageClass.Name, err)
glog.V(2).Infof("failed to provision volume for claim %q with StorageClass %q: %v", claimToClaimKey(claim), storageClass.Name, err)
Expand Down
2 changes: 2 additions & 0 deletions pkg/volume/util/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"doc.go",
"fs_unsupported.go",
"io_util.go",
"metrics.go",
"util.go",
] + select({
"@io_bazel_rules_go//go/platform:darwin_amd64": [
Expand All @@ -31,6 +32,7 @@ go_library(
"//pkg/api/v1/helper:go_default_library",
"//pkg/util/mount:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/storage/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
Expand Down
63 changes: 63 additions & 0 deletions pkg/volume/util/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package util

import (
"time"

"github.com/prometheus/client_golang/prometheus"
)

var storageOperationMetric = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "storage_operation_duration_seconds",
Help: "Storage operation duration",
},
[]string{"volume_plugin", "operation_name"},
)

var storageOperationErrorMetric = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "storage_operation_errors_total",
Help: "Storage operation errors",
},
[]string{"volume_plugin", "operation_name"},
)

func init() {
registerMetrics()
}

func registerMetrics() {
prometheus.MustRegister(storageOperationMetric)
prometheus.MustRegister(storageOperationErrorMetric)
}

// OperationCompleteHook returns a hook to call when an operation is completed
func OperationCompleteHook(plugin, operationName string) func(error) {
requestTime := time.Now()
opComplete := func(err error) {
timeTaken := time.Since(requestTime).Seconds()
// Create metric with operation name and plugin name
if err != nil {
storageOperationErrorMetric.WithLabelValues(plugin, operationName).Inc()
} else {
storageOperationMetric.WithLabelValues(plugin, operationName).Observe(timeTaken)
}
}
return opComplete
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type NestedPendingOperations interface {
// concatenation of volumeName and podName is removed from the list of
// executing operations allowing a new operation to be started with the
// volumeName without error.
Run(volumeName v1.UniqueVolumeName, podName types.UniquePodName, operationFunc func() error) error
Run(volumeName v1.UniqueVolumeName, podName types.UniquePodName, operationFunc func() error, operationCompleteFunc func(error)) error

// Wait blocks until all operations are completed. This is typically
// necessary during tests - the test should wait until all operations finish
Expand Down Expand Up @@ -94,7 +94,8 @@ type operation struct {
func (grm *nestedPendingOperations) Run(
volumeName v1.UniqueVolumeName,
podName types.UniquePodName,
operationFunc func() error) error {
operationFunc func() error,
operationCompleteFunc func(error)) error {
grm.lock.Lock()
defer grm.lock.Unlock()
opExists, previousOpIndex := grm.isOperationExists(volumeName, podName)
Expand Down Expand Up @@ -132,6 +133,7 @@ func (grm *nestedPendingOperations) Run(
defer k8sRuntime.HandleCrash()
// Handle completion of and error, if any, from operationFunc()
defer grm.operationComplete(volumeName, podName, &err)
defer operationCompleteFunc(err)
// Handle panic, if any, from operationFunc()
defer k8sRuntime.RecoverFromPanic(&err)
return operationFunc()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func Test_NewGoRoutineMap_Positive_SingleOp(t *testing.T) {
operation := func() error { return nil }

// Act
err := grm.Run(volumeName, "" /* operationSubName */, operation)
err := grm.Run(volumeName, "" /* operationSubName */, operation, func(error) {})

// Assert
if err != nil {
Expand All @@ -66,8 +66,8 @@ func Test_NewGoRoutineMap_Positive_TwoOps(t *testing.T) {
operation := func() error { return nil }

// Act
err1 := grm.Run(volume1Name, "" /* operationSubName */, operation)
err2 := grm.Run(volume2Name, "" /* operationSubName */, operation)
err1 := grm.Run(volume1Name, "" /* operationSubName */, operation, func(error) {})
err2 := grm.Run(volume2Name, "" /* operationSubName */, operation, func(error) {})

// Assert
if err1 != nil {
Expand All @@ -88,8 +88,8 @@ func Test_NewGoRoutineMap_Positive_TwoSubOps(t *testing.T) {
operation := func() error { return nil }

// Act
err1 := grm.Run(volumeName, operation1PodName, operation)
err2 := grm.Run(volumeName, operation2PodName, operation)
err1 := grm.Run(volumeName, operation1PodName, operation, func(error) {})
err2 := grm.Run(volumeName, operation2PodName, operation, func(error) {})

// Assert
if err1 != nil {
Expand All @@ -108,7 +108,7 @@ func Test_NewGoRoutineMap_Positive_SingleOpWithExpBackoff(t *testing.T) {
operation := func() error { return nil }

// Act
err := grm.Run(volumeName, "" /* operationSubName */, operation)
err := grm.Run(volumeName, "" /* operationSubName */, operation, func(error) {})

// Assert
if err != nil {
Expand All @@ -122,7 +122,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletes(t *testing.T) {
volumeName := v1.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateCallbackFunc(operation1DoneCh)
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1)
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {})
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
Expand All @@ -133,7 +133,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletes(t *testing.T) {
err2 := retryWithExponentialBackOff(
time.Duration(initialOperationWaitTimeShort),
func() (bool, error) {
err := grm.Run(volumeName, "" /* operationSubName */, operation2)
err := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {})
if err != nil {
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
return false, nil
Expand All @@ -154,7 +154,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletesWithExpBackoff(t *
volumeName := v1.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateCallbackFunc(operation1DoneCh)
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1)
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {})
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
Expand All @@ -165,7 +165,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletesWithExpBackoff(t *
err2 := retryWithExponentialBackOff(
time.Duration(initialOperationWaitTimeShort),
func() (bool, error) {
err := grm.Run(volumeName, "" /* operationSubName */, operation2)
err := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {})
if err != nil {
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
return false, nil
Expand All @@ -185,7 +185,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanics(t *testing.T) {
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name")
operation1 := generatePanicFunc()
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1)
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {})
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
Expand All @@ -195,7 +195,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanics(t *testing.T) {
err2 := retryWithExponentialBackOff(
time.Duration(initialOperationWaitTimeShort),
func() (bool, error) {
err := grm.Run(volumeName, "" /* operationSubName */, operation2)
err := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {})
if err != nil {
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
return false, nil
Expand All @@ -215,7 +215,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanicsWithExpBackoff(t *tes
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name")
operation1 := generatePanicFunc()
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1)
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {})
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
Expand All @@ -225,7 +225,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanicsWithExpBackoff(t *tes
err2 := retryWithExponentialBackOff(
time.Duration(initialOperationWaitTimeLong), // Longer duration to accommodate for backoff
func() (bool, error) {
err := grm.Run(volumeName, "" /* operationSubName */, operation2)
err := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {})
if err != nil {
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
return false, nil
Expand All @@ -246,14 +246,14 @@ func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletes(t *testing.T) {
volumeName := v1.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1)
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {})
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := generateNoopFunc()

// Act
err2 := grm.Run(volumeName, "" /* operationSubName */, operation2)
err2 := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {})

// Assert
if err2 == nil {
Expand All @@ -271,14 +271,14 @@ func Test_NewGoRoutineMap_Negative_SecondSubOpBeforeFirstCompletes2(t *testing.T
operationPodName := types.UniquePodName("operation-podname")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.Run(volumeName, operationPodName, operation1)
err1 := grm.Run(volumeName, operationPodName, operation1, func(error) {})
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := generateNoopFunc()

// Act
err2 := grm.Run(volumeName, operationPodName, operation2)
err2 := grm.Run(volumeName, operationPodName, operation2, func(error) {})

// Assert
if err2 == nil {
Expand All @@ -296,14 +296,14 @@ func Test_NewGoRoutineMap_Negative_SecondSubOpBeforeFirstCompletes(t *testing.T)
operationPodName := types.UniquePodName("operation-podname")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.Run(volumeName, operationPodName, operation1)
err1 := grm.Run(volumeName, operationPodName, operation1, func(error) {})
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := generateNoopFunc()

// Act
err2 := grm.Run(volumeName, operationPodName, operation2)
err2 := grm.Run(volumeName, operationPodName, operation2, func(error) {})

// Assert
if err2 == nil {
Expand All @@ -320,14 +320,14 @@ func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletesWithExpBackoff(t
volumeName := v1.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1)
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {})
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := generateNoopFunc()

// Act
err2 := grm.Run(volumeName, "" /* operationSubName */, operation2)
err2 := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {})

// Assert
if err2 == nil {
Expand All @@ -344,15 +344,15 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletes(t *testing.T) {
volumeName := v1.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1)
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {})
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := generateNoopFunc()
operation3 := generateNoopFunc()

// Act
err2 := grm.Run(volumeName, "" /* operationSubName */, operation2)
err2 := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {})

// Assert
if err2 == nil {
Expand All @@ -367,7 +367,7 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletes(t *testing.T) {
err3 := retryWithExponentialBackOff(
time.Duration(initialOperationWaitTimeShort),
func() (bool, error) {
err := grm.Run(volumeName, "" /* operationSubName */, operation3)
err := grm.Run(volumeName, "" /* operationSubName */, operation3, func(error) {})
if err != nil {
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
return false, nil
Expand All @@ -388,15 +388,15 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *t
volumeName := v1.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1)
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {})
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := generateNoopFunc()
operation3 := generateNoopFunc()

// Act
err2 := grm.Run(volumeName, "" /* operationSubName */, operation2)
err2 := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {})

// Assert
if err2 == nil {
Expand All @@ -411,7 +411,7 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *t
err3 := retryWithExponentialBackOff(
time.Duration(initialOperationWaitTimeShort),
func() (bool, error) {
err := grm.Run(volumeName, "" /* operationSubName */, operation3)
err := grm.Run(volumeName, "" /* operationSubName */, operation3, func(error) {})
if err != nil {
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
return false, nil
Expand Down Expand Up @@ -471,7 +471,7 @@ func Test_NewGoRoutineMap_Positive_Wait(t *testing.T) {
volumeName := v1.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh)
err := grm.Run(volumeName, "" /* operationSubName */, operation1)
err := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {})
if err != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
}
Expand Down Expand Up @@ -500,7 +500,7 @@ func Test_NewGoRoutineMap_Positive_WaitWithExpBackoff(t *testing.T) {
volumeName := v1.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh)
err := grm.Run(volumeName, "" /* operationSubName */, operation1)
err := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {})
if err != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
}
Expand Down
Loading