Add volume operation metrics to operation executor and PV controller

pull/6/head
Matthew Wong 2017-08-02 14:39:48 -04:00
parent bc1a58ae3a
commit 3ed34183d0
9 changed files with 192 additions and 94 deletions

View File

@ -24,6 +24,7 @@ go_library(
"//pkg/util/io:go_default_library",
"//pkg/util/mount:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/storage/v1:go_default_library",

View File

@ -39,6 +39,7 @@ import (
"k8s.io/kubernetes/pkg/util/goroutinemap"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
vol "k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
"github.com/golang/glog"
)
@ -1216,7 +1217,10 @@ func (ctrl *PersistentVolumeController) doDeleteVolume(volume *v1.PersistentVolu
return false, fmt.Errorf("Failed to create deleter for volume %q: %v", volume.Name, err)
}
if err = deleter.Delete(); err != nil {
opComplete := util.OperationCompleteHook(plugin.GetPluginName(), "volume_delete")
err = deleter.Delete()
opComplete(err)
if err != nil {
// Deleter failed
return false, err
}
@ -1326,7 +1330,9 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation(claimObj interfa
return
}
opComplete := util.OperationCompleteHook(plugin.GetPluginName(), "volume_provision")
volume, err = provisioner.Provision()
opComplete(err)
if err != nil {
strerr := fmt.Sprintf("Failed to provision volume with StorageClass %q: %v", storageClass.Name, err)
glog.V(2).Infof("failed to provision volume for claim %q with StorageClass %q: %v", claimToClaimKey(claim), storageClass.Name, err)

View File

@ -15,6 +15,7 @@ go_library(
"doc.go",
"fs_unsupported.go",
"io_util.go",
"metrics.go",
"util.go",
] + select({
"@io_bazel_rules_go//go/platform:darwin_amd64": [
@ -31,6 +32,7 @@ go_library(
"//pkg/api/v1/helper:go_default_library",
"//pkg/util/mount:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/storage/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",

View File

@ -0,0 +1,63 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"time"
"github.com/prometheus/client_golang/prometheus"
)
var storageOperationMetric = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "storage_operation_duration_seconds",
Help: "Storage operation duration",
},
[]string{"volume_plugin", "operation_name"},
)
var storageOperationErrorMetric = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "storage_operation_errors_total",
Help: "Storage operation errors",
},
[]string{"volume_plugin", "operation_name"},
)
func init() {
registerMetrics()
}
func registerMetrics() {
prometheus.MustRegister(storageOperationMetric)
prometheus.MustRegister(storageOperationErrorMetric)
}
// OperationCompleteHook returns a hook to call when an operation is completed
func OperationCompleteHook(plugin, operationName string) func(error) {
requestTime := time.Now()
opComplete := func(err error) {
timeTaken := time.Since(requestTime).Seconds()
// Create metric with operation name and plugin name
if err != nil {
storageOperationErrorMetric.WithLabelValues(plugin, operationName).Inc()
} else {
storageOperationMetric.WithLabelValues(plugin, operationName).Observe(timeTaken)
}
}
return opComplete
}

View File

@ -55,7 +55,7 @@ type NestedPendingOperations interface {
// concatenation of volumeName and podName is removed from the list of
// executing operations allowing a new operation to be started with the
// volumeName without error.
Run(volumeName v1.UniqueVolumeName, podName types.UniquePodName, operationFunc func() error) error
Run(volumeName v1.UniqueVolumeName, podName types.UniquePodName, operationFunc func() error, operationCompleteFunc func(error)) error
// Wait blocks until all operations are completed. This is typically
// necessary during tests - the test should wait until all operations finish
@ -94,7 +94,8 @@ type operation struct {
func (grm *nestedPendingOperations) Run(
volumeName v1.UniqueVolumeName,
podName types.UniquePodName,
operationFunc func() error) error {
operationFunc func() error,
operationCompleteFunc func(error)) error {
grm.lock.Lock()
defer grm.lock.Unlock()
opExists, previousOpIndex := grm.isOperationExists(volumeName, podName)
@ -132,6 +133,7 @@ func (grm *nestedPendingOperations) Run(
defer k8sRuntime.HandleCrash()
// Handle completion of and error, if any, from operationFunc()
defer grm.operationComplete(volumeName, podName, &err)
defer operationCompleteFunc(err)
// Handle panic, if any, from operationFunc()
defer k8sRuntime.RecoverFromPanic(&err)
return operationFunc()

View File

@ -50,7 +50,7 @@ func Test_NewGoRoutineMap_Positive_SingleOp(t *testing.T) {
operation := func() error { return nil }
// Act
err := grm.Run(volumeName, "" /* operationSubName */, operation)
err := grm.Run(volumeName, "" /* operationSubName */, operation, func(error) {})
// Assert
if err != nil {
@ -66,8 +66,8 @@ func Test_NewGoRoutineMap_Positive_TwoOps(t *testing.T) {
operation := func() error { return nil }
// Act
err1 := grm.Run(volume1Name, "" /* operationSubName */, operation)
err2 := grm.Run(volume2Name, "" /* operationSubName */, operation)
err1 := grm.Run(volume1Name, "" /* operationSubName */, operation, func(error) {})
err2 := grm.Run(volume2Name, "" /* operationSubName */, operation, func(error) {})
// Assert
if err1 != nil {
@ -88,8 +88,8 @@ func Test_NewGoRoutineMap_Positive_TwoSubOps(t *testing.T) {
operation := func() error { return nil }
// Act
err1 := grm.Run(volumeName, operation1PodName, operation)
err2 := grm.Run(volumeName, operation2PodName, operation)
err1 := grm.Run(volumeName, operation1PodName, operation, func(error) {})
err2 := grm.Run(volumeName, operation2PodName, operation, func(error) {})
// Assert
if err1 != nil {
@ -108,7 +108,7 @@ func Test_NewGoRoutineMap_Positive_SingleOpWithExpBackoff(t *testing.T) {
operation := func() error { return nil }
// Act
err := grm.Run(volumeName, "" /* operationSubName */, operation)
err := grm.Run(volumeName, "" /* operationSubName */, operation, func(error) {})
// Assert
if err != nil {
@ -122,7 +122,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletes(t *testing.T) {
volumeName := v1.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateCallbackFunc(operation1DoneCh)
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1)
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {})
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
@ -133,7 +133,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletes(t *testing.T) {
err2 := retryWithExponentialBackOff(
time.Duration(initialOperationWaitTimeShort),
func() (bool, error) {
err := grm.Run(volumeName, "" /* operationSubName */, operation2)
err := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {})
if err != nil {
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
return false, nil
@ -154,7 +154,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletesWithExpBackoff(t *
volumeName := v1.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateCallbackFunc(operation1DoneCh)
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1)
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {})
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
@ -165,7 +165,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletesWithExpBackoff(t *
err2 := retryWithExponentialBackOff(
time.Duration(initialOperationWaitTimeShort),
func() (bool, error) {
err := grm.Run(volumeName, "" /* operationSubName */, operation2)
err := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {})
if err != nil {
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
return false, nil
@ -185,7 +185,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanics(t *testing.T) {
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name")
operation1 := generatePanicFunc()
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1)
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {})
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
@ -195,7 +195,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanics(t *testing.T) {
err2 := retryWithExponentialBackOff(
time.Duration(initialOperationWaitTimeShort),
func() (bool, error) {
err := grm.Run(volumeName, "" /* operationSubName */, operation2)
err := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {})
if err != nil {
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
return false, nil
@ -215,7 +215,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanicsWithExpBackoff(t *tes
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name")
operation1 := generatePanicFunc()
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1)
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {})
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
@ -225,7 +225,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanicsWithExpBackoff(t *tes
err2 := retryWithExponentialBackOff(
time.Duration(initialOperationWaitTimeLong), // Longer duration to accommodate for backoff
func() (bool, error) {
err := grm.Run(volumeName, "" /* operationSubName */, operation2)
err := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {})
if err != nil {
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
return false, nil
@ -246,14 +246,14 @@ func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletes(t *testing.T) {
volumeName := v1.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1)
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {})
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := generateNoopFunc()
// Act
err2 := grm.Run(volumeName, "" /* operationSubName */, operation2)
err2 := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {})
// Assert
if err2 == nil {
@ -271,14 +271,14 @@ func Test_NewGoRoutineMap_Negative_SecondSubOpBeforeFirstCompletes2(t *testing.T
operationPodName := types.UniquePodName("operation-podname")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.Run(volumeName, operationPodName, operation1)
err1 := grm.Run(volumeName, operationPodName, operation1, func(error) {})
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := generateNoopFunc()
// Act
err2 := grm.Run(volumeName, operationPodName, operation2)
err2 := grm.Run(volumeName, operationPodName, operation2, func(error) {})
// Assert
if err2 == nil {
@ -296,14 +296,14 @@ func Test_NewGoRoutineMap_Negative_SecondSubOpBeforeFirstCompletes(t *testing.T)
operationPodName := types.UniquePodName("operation-podname")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.Run(volumeName, operationPodName, operation1)
err1 := grm.Run(volumeName, operationPodName, operation1, func(error) {})
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := generateNoopFunc()
// Act
err2 := grm.Run(volumeName, operationPodName, operation2)
err2 := grm.Run(volumeName, operationPodName, operation2, func(error) {})
// Assert
if err2 == nil {
@ -320,14 +320,14 @@ func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletesWithExpBackoff(t
volumeName := v1.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1)
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {})
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := generateNoopFunc()
// Act
err2 := grm.Run(volumeName, "" /* operationSubName */, operation2)
err2 := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {})
// Assert
if err2 == nil {
@ -344,7 +344,7 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletes(t *testing.T) {
volumeName := v1.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1)
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {})
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
@ -352,7 +352,7 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletes(t *testing.T) {
operation3 := generateNoopFunc()
// Act
err2 := grm.Run(volumeName, "" /* operationSubName */, operation2)
err2 := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {})
// Assert
if err2 == nil {
@ -367,7 +367,7 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletes(t *testing.T) {
err3 := retryWithExponentialBackOff(
time.Duration(initialOperationWaitTimeShort),
func() (bool, error) {
err := grm.Run(volumeName, "" /* operationSubName */, operation3)
err := grm.Run(volumeName, "" /* operationSubName */, operation3, func(error) {})
if err != nil {
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
return false, nil
@ -388,7 +388,7 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *t
volumeName := v1.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1)
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {})
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
@ -396,7 +396,7 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *t
operation3 := generateNoopFunc()
// Act
err2 := grm.Run(volumeName, "" /* operationSubName */, operation2)
err2 := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {})
// Assert
if err2 == nil {
@ -411,7 +411,7 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *t
err3 := retryWithExponentialBackOff(
time.Duration(initialOperationWaitTimeShort),
func() (bool, error) {
err := grm.Run(volumeName, "" /* operationSubName */, operation3)
err := grm.Run(volumeName, "" /* operationSubName */, operation3, func(error) {})
if err != nil {
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
return false, nil
@ -471,7 +471,7 @@ func Test_NewGoRoutineMap_Positive_Wait(t *testing.T) {
volumeName := v1.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh)
err := grm.Run(volumeName, "" /* operationSubName */, operation1)
err := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {})
if err != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
}
@ -500,7 +500,7 @@ func Test_NewGoRoutineMap_Positive_WaitWithExpBackoff(t *testing.T) {
volumeName := v1.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh)
err := grm.Run(volumeName, "" /* operationSubName */, operation1)
err := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {})
if err != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
}

View File

@ -31,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
@ -535,29 +536,32 @@ func (oe *operationExecutor) IsOperationPending(volumeName v1.UniqueVolumeName,
func (oe *operationExecutor) AttachVolume(
volumeToAttach VolumeToAttach,
actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
attachFunc, err :=
attachFunc, plugin, err :=
oe.operationGenerator.GenerateAttachVolumeFunc(volumeToAttach, actualStateOfWorld)
if err != nil {
return err
}
opCompleteFunc := util.OperationCompleteHook(plugin, "volume_attach")
return oe.pendingOperations.Run(
volumeToAttach.VolumeName, "" /* podName */, attachFunc)
volumeToAttach.VolumeName, "" /* podName */, attachFunc, opCompleteFunc)
}
func (oe *operationExecutor) DetachVolume(
volumeToDetach AttachedVolume,
verifySafeToDetach bool,
actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
detachFunc, err :=
detachFunc, plugin, err :=
oe.operationGenerator.GenerateDetachVolumeFunc(volumeToDetach, verifySafeToDetach, actualStateOfWorld)
if err != nil {
return err
}
opCompleteFunc := util.OperationCompleteHook(plugin, "volume_detach")
return oe.pendingOperations.Run(
volumeToDetach.VolumeName, "" /* podName */, detachFunc)
volumeToDetach.VolumeName, "" /* podName */, detachFunc, opCompleteFunc)
}
func (oe *operationExecutor) VerifyVolumesAreAttached(
attachedVolumes map[types.NodeName][]AttachedVolume,
actualStateOfWorld ActualStateOfWorldAttacherUpdater) {
@ -630,9 +634,11 @@ func (oe *operationExecutor) VerifyVolumesAreAttached(
if err != nil {
glog.Errorf("BulkVerifyVolumes.GenerateBulkVolumeVerifyFunc error bulk verifying volumes for plugin %q with %v", pluginName, err)
}
opCompleteFunc := util.OperationCompleteHook(pluginName, "verify_volumes_are_attached")
// Ugly hack to ensure - we don't do parallel bulk polling of same volume plugin
uniquePluginName := v1.UniqueVolumeName(pluginName)
err = oe.pendingOperations.Run(uniquePluginName, "" /* Pod Name */, bulkVerifyVolumeFunc)
err = oe.pendingOperations.Run(uniquePluginName, "" /* Pod Name */, bulkVerifyVolumeFunc, opCompleteFunc)
if err != nil {
glog.Errorf("BulkVerifyVolumes.Run Error bulk volume verification for plugin %q with %v", pluginName, err)
}
@ -648,8 +654,10 @@ func (oe *operationExecutor) VerifyVolumesAreAttachedPerNode(
if err != nil {
return err
}
opCompleteFunc := util.OperationCompleteHook("<n/a>", "verify_volumes_are_attached_per_node")
// Give an empty UniqueVolumeName so that this operation could be executed concurrently.
return oe.pendingOperations.Run("" /* volumeName */, "" /* podName */, volumesAreAttachedFunc)
return oe.pendingOperations.Run("" /* volumeName */, "" /* podName */, volumesAreAttachedFunc, opCompleteFunc)
}
func (oe *operationExecutor) MountVolume(
@ -657,7 +665,7 @@ func (oe *operationExecutor) MountVolume(
volumeToMount VolumeToMount,
actualStateOfWorld ActualStateOfWorldMounterUpdater,
isRemount bool) error {
mountFunc, err := oe.operationGenerator.GenerateMountVolumeFunc(
mountFunc, plugin, err := oe.operationGenerator.GenerateMountVolumeFunc(
waitForAttachTimeout, volumeToMount, actualStateOfWorld, isRemount)
if err != nil {
return err
@ -671,15 +679,17 @@ func (oe *operationExecutor) MountVolume(
podName = volumehelper.GetUniquePodName(volumeToMount.Pod)
}
// TODO mount_device
opCompleteFunc := util.OperationCompleteHook(plugin, "volume_mount")
return oe.pendingOperations.Run(
volumeToMount.VolumeName, podName, mountFunc)
volumeToMount.VolumeName, podName, mountFunc, opCompleteFunc)
}
func (oe *operationExecutor) UnmountVolume(
volumeToUnmount MountedVolume,
actualStateOfWorld ActualStateOfWorldMounterUpdater) error {
unmountFunc, err :=
unmountFunc, plugin, err :=
oe.operationGenerator.GenerateUnmountVolumeFunc(volumeToUnmount, actualStateOfWorld)
if err != nil {
return err
@ -689,36 +699,39 @@ func (oe *operationExecutor) UnmountVolume(
// same volume in parallel
podName := volumetypes.UniquePodName(volumeToUnmount.PodUID)
opCompleteFunc := util.OperationCompleteHook(plugin, "volume_unmount")
return oe.pendingOperations.Run(
volumeToUnmount.VolumeName, podName, unmountFunc)
volumeToUnmount.VolumeName, podName, unmountFunc, opCompleteFunc)
}
func (oe *operationExecutor) UnmountDevice(
deviceToDetach AttachedVolume,
actualStateOfWorld ActualStateOfWorldMounterUpdater,
mounter mount.Interface) error {
unmountDeviceFunc, err :=
unmountDeviceFunc, plugin, err :=
oe.operationGenerator.GenerateUnmountDeviceFunc(deviceToDetach, actualStateOfWorld, mounter)
if err != nil {
return err
}
opCompleteFunc := util.OperationCompleteHook(plugin, "unmount_device")
return oe.pendingOperations.Run(
deviceToDetach.VolumeName, "" /* podName */, unmountDeviceFunc)
deviceToDetach.VolumeName, "" /* podName */, unmountDeviceFunc, opCompleteFunc)
}
func (oe *operationExecutor) VerifyControllerAttachedVolume(
volumeToMount VolumeToMount,
nodeName types.NodeName,
actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
verifyControllerAttachedVolumeFunc, err :=
verifyControllerAttachedVolumeFunc, plugin, err :=
oe.operationGenerator.GenerateVerifyControllerAttachedVolumeFunc(volumeToMount, nodeName, actualStateOfWorld)
if err != nil {
return err
}
opCompleteFunc := util.OperationCompleteHook(plugin, "verify_controller_attached_volume")
return oe.pendingOperations.Run(
volumeToMount.VolumeName, "" /* podName */, verifyControllerAttachedVolumeFunc)
volumeToMount.VolumeName, "" /* podName */, verifyControllerAttachedVolumeFunc, opCompleteFunc)
}
// TODO: this is a workaround for the unmount device issue caused by gci mounter.

View File

@ -239,29 +239,29 @@ func newFakeOperationGenerator(ch chan interface{}, quit chan interface{}) Opera
}
}
func (fopg *fakeOperationGenerator) GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater, isRemount bool) (func() error, error) {
func (fopg *fakeOperationGenerator) GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater, isRemount bool) (func() error, string, error) {
return func() error {
startOperationAndBlock(fopg.ch, fopg.quit)
return nil
}, nil
}, "", nil
}
func (fopg *fakeOperationGenerator) GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, error) {
func (fopg *fakeOperationGenerator) GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, string, error) {
return func() error {
startOperationAndBlock(fopg.ch, fopg.quit)
return nil
}, nil
}, "", nil
}
func (fopg *fakeOperationGenerator) GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
func (fopg *fakeOperationGenerator) GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, string, error) {
return func() error {
startOperationAndBlock(fopg.ch, fopg.quit)
return nil
}, nil
}, "", nil
}
func (fopg *fakeOperationGenerator) GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
func (fopg *fakeOperationGenerator) GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, string, error) {
return func() error {
startOperationAndBlock(fopg.ch, fopg.quit)
return nil
}, nil
}, "", nil
}
func (fopg *fakeOperationGenerator) GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
return func() error {
@ -269,17 +269,17 @@ func (fopg *fakeOperationGenerator) GenerateVolumesAreAttachedFunc(attachedVolum
return nil
}, nil
}
func (fopg *fakeOperationGenerator) GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (func() error, error) {
func (fopg *fakeOperationGenerator) GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (func() error, string, error) {
return func() error {
startOperationAndBlock(fopg.ch, fopg.quit)
return nil
}, nil
}, "", nil
}
func (fopg *fakeOperationGenerator) GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
func (fopg *fakeOperationGenerator) GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, string, error) {
return func() error {
startOperationAndBlock(fopg.ch, fopg.quit)
return nil
}, nil
}, "", nil
}
func (fopg *fakeOperationGenerator) GenerateBulkVolumeVerifyFunc(

View File

@ -73,25 +73,25 @@ func NewOperationGenerator(kubeClient clientset.Interface,
// OperationGenerator interface that extracts out the functions from operation_executor to make it dependency injectable
type OperationGenerator interface {
// Generates the MountVolume function needed to perform the mount of a volume plugin
GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater, isRemount bool) (func() error, error)
GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater, isRemount bool) (func() error, string, error)
// Generates the UnmountVolume function needed to perform the unmount of a volume plugin
GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, error)
GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, string, error)
// Generates the AttachVolume function needed to perform attach of a volume plugin
GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error)
GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, string, error)
// Generates the DetachVolume function needed to perform the detach of a volume plugin
GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error)
GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, string, error)
// Generates the VolumesAreAttached function needed to verify if volume plugins are attached
GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error)
// Generates the UnMountDevice function needed to perform the unmount of a device
GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (func() error, error)
GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (func() error, string, error)
// Generates the function needed to check if the attach_detach controller has attached the volume plugin
GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error)
GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, string, error)
// GetVolumePluginMgr returns volume plugin manager
GetVolumePluginMgr() *volume.VolumePluginMgr
@ -245,17 +245,17 @@ func (og *operationGenerator) GenerateBulkVolumeVerifyFunc(
func (og *operationGenerator) GenerateAttachVolumeFunc(
volumeToAttach VolumeToAttach,
actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, string, error) {
// Get attacher plugin
attachableVolumePlugin, err :=
og.volumePluginMgr.FindAttachablePluginBySpec(volumeToAttach.VolumeSpec)
if err != nil || attachableVolumePlugin == nil {
return nil, volumeToAttach.GenerateErrorDetailed("AttachVolume.FindAttachablePluginBySpec failed", err)
return nil, "", volumeToAttach.GenerateErrorDetailed("AttachVolume.FindAttachablePluginBySpec failed", err)
}
volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher()
if newAttacherErr != nil {
return nil, volumeToAttach.GenerateErrorDetailed("AttachVolume.NewAttacher failed", newAttacherErr)
return nil, attachableVolumePlugin.GetPluginName(), volumeToAttach.GenerateErrorDetailed("AttachVolume.NewAttacher failed", newAttacherErr)
}
return func() error {
@ -283,7 +283,7 @@ func (og *operationGenerator) GenerateAttachVolumeFunc(
}
return nil
}, nil
}, attachableVolumePlugin.GetPluginName(), nil
}
func (og *operationGenerator) GetVolumePluginMgr() *volume.VolumePluginMgr {
@ -293,9 +293,10 @@ func (og *operationGenerator) GetVolumePluginMgr() *volume.VolumePluginMgr {
func (og *operationGenerator) GenerateDetachVolumeFunc(
volumeToDetach AttachedVolume,
verifySafeToDetach bool,
actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, string, error) {
var volumeName string
var attachableVolumePlugin volume.AttachableVolumePlugin
var pluginName string
var err error
if volumeToDetach.VolumeSpec != nil {
@ -303,31 +304,35 @@ func (og *operationGenerator) GenerateDetachVolumeFunc(
attachableVolumePlugin, err =
og.volumePluginMgr.FindAttachablePluginBySpec(volumeToDetach.VolumeSpec)
if err != nil || attachableVolumePlugin == nil {
return nil, volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginBySpec failed", err)
return nil, "", volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginBySpec failed", err)
}
volumeName, err =
attachableVolumePlugin.GetVolumeName(volumeToDetach.VolumeSpec)
if err != nil {
return nil, volumeToDetach.GenerateErrorDetailed("DetachVolume.GetVolumeName failed", err)
return nil, attachableVolumePlugin.GetPluginName(), volumeToDetach.GenerateErrorDetailed("DetachVolume.GetVolumeName failed", err)
}
} else {
var pluginName string
// Get attacher plugin and the volumeName by splitting the volume unique name in case
// there's no VolumeSpec: this happens only on attach/detach controller crash recovery
// when a pod has been deleted during the controller downtime
pluginName, volumeName, err = volumehelper.SplitUniqueName(volumeToDetach.VolumeName)
if err != nil {
return nil, volumeToDetach.GenerateErrorDetailed("DetachVolume.SplitUniqueName failed", err)
return nil, pluginName, volumeToDetach.GenerateErrorDetailed("DetachVolume.SplitUniqueName failed", err)
}
attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(pluginName)
if err != nil {
return nil, volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginBySpec failed", err)
return nil, pluginName, volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginBySpec failed", err)
}
}
if pluginName == "" {
pluginName = attachableVolumePlugin.GetPluginName()
}
volumeDetacher, err := attachableVolumePlugin.NewDetacher()
if err != nil {
return nil, volumeToDetach.GenerateErrorDetailed("DetachVolume.NewDetacher failed", err)
return nil, pluginName, volumeToDetach.GenerateErrorDetailed("DetachVolume.NewDetacher failed", err)
}
return func() error {
@ -352,24 +357,24 @@ func (og *operationGenerator) GenerateDetachVolumeFunc(
volumeToDetach.VolumeName, volumeToDetach.NodeName)
return nil
}, nil
}, pluginName, nil
}
func (og *operationGenerator) GenerateMountVolumeFunc(
waitForAttachTimeout time.Duration,
volumeToMount VolumeToMount,
actualStateOfWorld ActualStateOfWorldMounterUpdater,
isRemount bool) (func() error, error) {
isRemount bool) (func() error, string, error) {
// Get mounter plugin
volumePlugin, err :=
og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec)
if err != nil || volumePlugin == nil {
return nil, volumeToMount.GenerateErrorDetailed("MountVolume.FindPluginBySpec failed", err)
return nil, "", volumeToMount.GenerateErrorDetailed("MountVolume.FindPluginBySpec failed", err)
}
affinityErr := checkNodeAffinity(og, volumeToMount, volumePlugin)
if affinityErr != nil {
return nil, affinityErr
return nil, volumePlugin.GetPluginName(), affinityErr
}
volumeMounter, newMounterErr := volumePlugin.NewMounter(
@ -379,13 +384,13 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
if newMounterErr != nil {
eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.NewMounter initialization failed", newMounterErr)
og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, eventErr.Error())
return nil, detailedErr
return nil, volumePlugin.GetPluginName(), detailedErr
}
mountCheckError := checkMountOptionSupport(og, volumeToMount, volumePlugin)
if mountCheckError != nil {
return nil, mountCheckError
return nil, volumePlugin.GetPluginName(), mountCheckError
}
// Get attacher, if possible
@ -489,23 +494,23 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
}
return nil
}, nil
}, volumePlugin.GetPluginName(), nil
}
func (og *operationGenerator) GenerateUnmountVolumeFunc(
volumeToUnmount MountedVolume,
actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, error) {
actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, string, error) {
// Get mountable plugin
volumePlugin, err :=
og.volumePluginMgr.FindPluginByName(volumeToUnmount.PluginName)
if err != nil || volumePlugin == nil {
return nil, volumeToUnmount.GenerateErrorDetailed("UnmountVolume.FindPluginByName failed", err)
return nil, "", volumeToUnmount.GenerateErrorDetailed("UnmountVolume.FindPluginByName failed", err)
}
volumeUnmounter, newUnmounterErr := volumePlugin.NewUnmounter(
volumeToUnmount.InnerVolumeSpecName, volumeToUnmount.PodUID)
if newUnmounterErr != nil {
return nil, volumeToUnmount.GenerateErrorDetailed("UnmountVolume.NewUnmounter failed", newUnmounterErr)
return nil, volumePlugin.GetPluginName(), volumeToUnmount.GenerateErrorDetailed("UnmountVolume.NewUnmounter failed", newUnmounterErr)
}
return func() error {
@ -535,28 +540,28 @@ func (og *operationGenerator) GenerateUnmountVolumeFunc(
}
return nil
}, nil
}, volumePlugin.GetPluginName(), nil
}
func (og *operationGenerator) GenerateUnmountDeviceFunc(
deviceToDetach AttachedVolume,
actualStateOfWorld ActualStateOfWorldMounterUpdater,
mounter mount.Interface) (func() error, error) {
mounter mount.Interface) (func() error, string, error) {
// Get attacher plugin
attachableVolumePlugin, err :=
og.volumePluginMgr.FindAttachablePluginBySpec(deviceToDetach.VolumeSpec)
if err != nil || attachableVolumePlugin == nil {
return nil, deviceToDetach.GenerateErrorDetailed("UnmountDevice.FindAttachablePluginBySpec failed", err)
return nil, "", deviceToDetach.GenerateErrorDetailed("UnmountDevice.FindAttachablePluginBySpec failed", err)
}
volumeDetacher, err := attachableVolumePlugin.NewDetacher()
if err != nil {
return nil, deviceToDetach.GenerateErrorDetailed("UnmountDevice.NewDetacher failed", err)
return nil, attachableVolumePlugin.GetPluginName(), deviceToDetach.GenerateErrorDetailed("UnmountDevice.NewDetacher failed", err)
}
volumeAttacher, err := attachableVolumePlugin.NewAttacher()
if err != nil {
return nil, deviceToDetach.GenerateErrorDetailed("UnmountDevice.NewAttacher failed", err)
return nil, attachableVolumePlugin.GetPluginName(), deviceToDetach.GenerateErrorDetailed("UnmountDevice.NewAttacher failed", err)
}
return func() error {
@ -616,13 +621,19 @@ func (og *operationGenerator) GenerateUnmountDeviceFunc(
}
return nil
}, nil
}, attachableVolumePlugin.GetPluginName(), nil
}
func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc(
volumeToMount VolumeToMount,
nodeName types.NodeName,
actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, string, error) {
volumePlugin, err :=
og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec)
if err != nil || volumePlugin == nil {
return nil, "", volumeToMount.GenerateErrorDetailed("VerifyControllerAttachedVolume.FindPluginBySpec failed", err)
}
return func() error {
if !volumeToMount.PluginIsAttachable {
// If the volume does not implement the attacher interface, it is
@ -678,7 +689,7 @@ func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc(
// Volume not attached, return error. Caller will log and retry.
return volumeToMount.GenerateErrorDetailed("Volume not attached according to node status", nil)
}, nil
}, volumePlugin.GetPluginName(), nil
}
func (og *operationGenerator) verifyVolumeIsSafeToDetach(