From 5dde1df1cd438b7c2749fc08ad1d6bd8b173d010 Mon Sep 17 00:00:00 2001 From: David Zhu Date: Thu, 14 Feb 2019 17:04:23 -0800 Subject: [PATCH] Migration shim logic for Persistent Volumes for Attach/Detach --- pkg/controller/.import-restrictions | 14 ++- pkg/volume/csi/csi_attacher.go | 46 ++++++++-- pkg/volume/csi/csi_attacher_test.go | 4 +- pkg/volume/csi/csi_block.go | 6 +- pkg/volume/csi/csi_mounter.go | 2 +- pkg/volume/csi/csi_mounter_test.go | 2 +- pkg/volume/csi/csi_plugin.go | 5 +- pkg/volume/csi/csi_plugin_test.go | 4 +- pkg/volume/csi/csi_util.go | 6 +- pkg/volume/util/operationexecutor/BUILD | 1 + .../operationexecutor/operation_generator.go | 86 +++++++++++++++---- 11 files changed, 136 insertions(+), 40 deletions(-) diff --git a/pkg/controller/.import-restrictions b/pkg/controller/.import-restrictions index 1adc2abdd6..9fa385d43a 100644 --- a/pkg/controller/.import-restrictions +++ b/pkg/controller/.import-restrictions @@ -101,7 +101,13 @@ "github.com/google/gofuzz", "github.com/prometheus/client_golang/prometheus", "github.com/robfig/cron", - "github.com/spf13/pflag" + "github.com/spf13/pflag", + "github.com/stretchr/testify/assert", + "github.com/stretchr/testify/require", + "github.com/google/gofuzz", + "github.com/golang/protobuf/ptypes/wrappers", + "github.com/golang/protobuf/proto", + "github.com/container-storage-interface/spec/lib/go/csi" ] }, { @@ -257,6 +263,8 @@ "AllowedPrefixes": [ "k8s.io/csi-api/pkg/apis/csi/v1alpha1", "k8s.io/csi-api/pkg/client/clientset/versioned", + "k8s.io/csi-api/pkg/client/listers/csi/v1alpha1", + "k8s.io/csi-api/pkg/client/informers/externalversions", "k8s.io/heapster/metrics/api/v1/types", "k8s.io/kube-controller-manager/config/v1alpha1", "k8s.io/metrics/pkg/apis/custom_metrics/v1beta2", @@ -288,7 +296,9 @@ "google.golang.org/api/compute/v0.alpha", "google.golang.org/api/container/v1", "google.golang.org/api/compute/v0.beta", - "google.golang.org/api/tpu/v1" + "google.golang.org/api/tpu/v1", + "golang.org/x/net/context", + "google.golang.org/grpc" ] } ] diff --git a/pkg/volume/csi/csi_attacher.go b/pkg/volume/csi/csi_attacher.go index bd3cf1b5f1..68464eace8 100644 --- a/pkg/volume/csi/csi_attacher.go +++ b/pkg/volume/csi/csi_attacher.go @@ -385,20 +385,38 @@ var _ volume.Detacher = &csiAttacher{} var _ volume.DeviceUnmounter = &csiAttacher{} func (c *csiAttacher) Detach(volumeName string, nodeName types.NodeName) error { - // volumeName in format driverNamevolumeHandle generated by plugin.GetVolumeName() + var attachID string + var volID string + if volumeName == "" { klog.Error(log("detacher.Detach missing value for parameter volumeName")) return errors.New("missing expected parameter volumeName") } - parts := strings.Split(volumeName, volNameSep) - if len(parts) != 2 { - klog.Error(log("detacher.Detach insufficient info encoded in volumeName")) - return errors.New("volumeName missing expected data") + + if isAttachmentName(volumeName) { + // Detach can also be called with the attach ID as the `volumeName`. This codepath is + // hit only when we have migrated an in-tree volume to CSI and the A/D Controller is shut + // down, the pod with the volume is deleted, and the A/D Controller starts back up in that + // order. + attachID = volumeName + + // Vol ID should be the volume handle, except that is not available here. + // It is only used in log messages so in the event that this happens log messages will be + // printing out the attachID instead of the volume handle. + volID = volumeName + } else { + // volumeName in format driverNamevolumeHandle generated by plugin.GetVolumeName() + parts := strings.Split(volumeName, volNameSep) + if len(parts) != 2 { + klog.Error(log("detacher.Detach insufficient info encoded in volumeName")) + return errors.New("volumeName missing expected data") + } + + driverName := parts[0] + volID = parts[1] + attachID = getAttachmentName(volID, driverName, string(nodeName)) } - driverName := parts[0] - volID := parts[1] - attachID := getAttachmentName(volID, driverName, string(nodeName)) if err := c.k8s.StorageV1beta1().VolumeAttachments().Delete(attachID, nil); err != nil { if apierrs.IsNotFound(err) { // object deleted or never existed, done @@ -550,12 +568,22 @@ func (c *csiAttacher) UnmountDevice(deviceMountPath string) error { return nil } -// getAttachmentName returns csi- +// getAttachmentName returns csi- func getAttachmentName(volName, csiDriverName, nodeName string) string { result := sha256.Sum256([]byte(fmt.Sprintf("%s%s%s", volName, csiDriverName, nodeName))) return fmt.Sprintf("csi-%x", result) } +// isAttachmentName returns true if the string given is of the form of an Attach ID +// and false otherwise +func isAttachmentName(unknownString string) bool { + // 68 == "csi-" + len(sha256hash) + if strings.HasPrefix(unknownString, "csi-") && len(unknownString) == 68 { + return true + } + return false +} + func makeDeviceMountPath(plugin *csiPlugin, spec *volume.Spec) (string, error) { if spec == nil { return "", fmt.Errorf("makeDeviceMountPath failed, spec is nil") diff --git a/pkg/volume/csi/csi_attacher_test.go b/pkg/volume/csi/csi_attacher_test.go index 76b40a4011..6715a53f9c 100644 --- a/pkg/volume/csi/csi_attacher_test.go +++ b/pkg/volume/csi/csi_attacher_test.go @@ -1004,9 +1004,9 @@ func newTestWatchPlugin(t *testing.T, csiClient *fakecsi.Clientset) (*csiPlugin, plugMgr := &volume.VolumePluginMgr{} plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host) - plug, err := plugMgr.FindPluginByName(csiPluginName) + plug, err := plugMgr.FindPluginByName(CSIPluginName) if err != nil { - t.Fatalf("can't find plugin %v", csiPluginName) + t.Fatalf("can't find plugin %v", CSIPluginName) } csiPlug, ok := plug.(*csiPlugin) diff --git a/pkg/volume/csi/csi_block.go b/pkg/volume/csi/csi_block.go index fa52af425a..1e4553ff4d 100644 --- a/pkg/volume/csi/csi_block.go +++ b/pkg/volume/csi/csi_block.go @@ -63,20 +63,20 @@ func (m *csiBlockMapper) GetGlobalMapPath(spec *volume.Spec) (string, error) { // Example: plugins/kubernetes.io/csi/volumeDevices/staging/{pvname} func (m *csiBlockMapper) getStagingPath() string { sanitizedSpecVolID := utilstrings.EscapeQualifiedName(m.specName) - return path.Join(m.plugin.host.GetVolumeDevicePluginDir(csiPluginName), "staging", sanitizedSpecVolID) + return path.Join(m.plugin.host.GetVolumeDevicePluginDir(CSIPluginName), "staging", sanitizedSpecVolID) } // getPublishPath returns a publish path for a file (on the node) that should be used on NodePublishVolume/NodeUnpublishVolume // Example: plugins/kubernetes.io/csi/volumeDevices/publish/{pvname} func (m *csiBlockMapper) getPublishPath() string { sanitizedSpecVolID := utilstrings.EscapeQualifiedName(m.specName) - return path.Join(m.plugin.host.GetVolumeDevicePluginDir(csiPluginName), "publish", sanitizedSpecVolID) + return path.Join(m.plugin.host.GetVolumeDevicePluginDir(CSIPluginName), "publish", sanitizedSpecVolID) } // GetPodDeviceMapPath returns pod's device file which will be mapped to a volume // returns: pods/{podUid}/volumeDevices/kubernetes.io~csi, {pvname} func (m *csiBlockMapper) GetPodDeviceMapPath() (string, string) { - path := m.plugin.host.GetPodVolumeDeviceDir(m.podUID, utilstrings.EscapeQualifiedName(csiPluginName)) + path := m.plugin.host.GetPodVolumeDeviceDir(m.podUID, utilstrings.EscapeQualifiedName(CSIPluginName)) specName := m.specName klog.V(4).Infof(log("blockMapper.GetPodDeviceMapPath [path=%s; name=%s]", path, specName)) return path, specName diff --git a/pkg/volume/csi/csi_mounter.go b/pkg/volume/csi/csi_mounter.go index 53fcbadcd6..4e0dbe59df 100644 --- a/pkg/volume/csi/csi_mounter.go +++ b/pkg/volume/csi/csi_mounter.go @@ -80,7 +80,7 @@ func (c *csiMountMgr) GetPath() string { func getTargetPath(uid types.UID, specVolumeID string, host volume.VolumeHost) string { specVolID := utilstrings.EscapeQualifiedName(specVolumeID) - return host.GetPodVolumeDir(uid, utilstrings.EscapeQualifiedName(csiPluginName), specVolID) + return host.GetPodVolumeDir(uid, utilstrings.EscapeQualifiedName(CSIPluginName), specVolID) } // volume.Mounter methods diff --git a/pkg/volume/csi/csi_mounter_test.go b/pkg/volume/csi/csi_mounter_test.go index 4a5d55f619..938e5b7876 100644 --- a/pkg/volume/csi/csi_mounter_test.go +++ b/pkg/volume/csi/csi_mounter_test.go @@ -197,7 +197,7 @@ func MounterSetUpTests(t *testing.T, podInfoEnabled bool) { }, Spec: storage.VolumeAttachmentSpec{ NodeName: "test-node", - Attacher: csiPluginName, + Attacher: CSIPluginName, Source: storage.VolumeAttachmentSource{ PersistentVolumeName: &pvName, }, diff --git a/pkg/volume/csi/csi_plugin.go b/pkg/volume/csi/csi_plugin.go index e47715a034..da6b8c3f9e 100644 --- a/pkg/volume/csi/csi_plugin.go +++ b/pkg/volume/csi/csi_plugin.go @@ -46,7 +46,8 @@ import ( ) const ( - csiPluginName = "kubernetes.io/csi" + // CSIPluginName is the name of the in-tree CSI Plugin + CSIPluginName = "kubernetes.io/csi" // TODO (vladimirvivien) implement a more dynamic way to discover // the unix domain socket path for each installed csi driver. @@ -230,7 +231,7 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error { } func (p *csiPlugin) GetPluginName() string { - return csiPluginName + return CSIPluginName } // GetvolumeName returns a concatenated string of CSIVolumeSource.DriverCSIVolumeSource.VolumeHandle diff --git a/pkg/volume/csi/csi_plugin_test.go b/pkg/volume/csi/csi_plugin_test.go index 75e19fc9fb..48d77b10ef 100644 --- a/pkg/volume/csi/csi_plugin_test.go +++ b/pkg/volume/csi/csi_plugin_test.go @@ -61,9 +61,9 @@ func newTestPlugin(t *testing.T, client *fakeclient.Clientset, csiClient *fakecs plugMgr := &volume.VolumePluginMgr{} plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host) - plug, err := plugMgr.FindPluginByName(csiPluginName) + plug, err := plugMgr.FindPluginByName(CSIPluginName) if err != nil { - t.Fatalf("can't find plugin %v", csiPluginName) + t.Fatalf("can't find plugin %v", CSIPluginName) } csiPlug, ok := plug.(*csiPlugin) diff --git a/pkg/volume/csi/csi_util.go b/pkg/volume/csi/csi_util.go index b87cfe9cc4..5d343e4c59 100644 --- a/pkg/volume/csi/csi_util.go +++ b/pkg/volume/csi/csi_util.go @@ -109,7 +109,7 @@ func getReadOnlyFromSpec(spec *volume.Spec) (bool, error) { // log prepends log string with `kubernetes.io/csi` func log(msg string, parts ...interface{}) string { - return fmt.Sprintf(fmt.Sprintf("%s: %s", csiPluginName, msg), parts...) + return fmt.Sprintf(fmt.Sprintf("%s: %s", CSIPluginName, msg), parts...) } // getVolumeDevicePluginDir returns the path where the CSI plugin keeps the @@ -117,7 +117,7 @@ func log(msg string, parts ...interface{}) string { // path: plugins/kubernetes.io/csi/volumeDevices/{specVolumeID}/dev func getVolumeDevicePluginDir(specVolID string, host volume.VolumeHost) string { sanitizedSpecVolID := utilstrings.EscapeQualifiedName(specVolID) - return path.Join(host.GetVolumeDevicePluginDir(csiPluginName), sanitizedSpecVolID, "dev") + return path.Join(host.GetVolumeDevicePluginDir(CSIPluginName), sanitizedSpecVolID, "dev") } // getVolumeDeviceDataDir returns the path where the CSI plugin keeps the @@ -125,7 +125,7 @@ func getVolumeDevicePluginDir(specVolID string, host volume.VolumeHost) string { // path: plugins/kubernetes.io/csi/volumeDevices/{specVolumeID}/data func getVolumeDeviceDataDir(specVolID string, host volume.VolumeHost) string { sanitizedSpecVolID := utilstrings.EscapeQualifiedName(specVolID) - return path.Join(host.GetVolumeDevicePluginDir(csiPluginName), sanitizedSpecVolID, "data") + return path.Join(host.GetVolumeDevicePluginDir(CSIPluginName), sanitizedSpecVolID, "data") } // hasReadWriteOnce returns true if modes contains v1.ReadWriteOnce diff --git a/pkg/volume/util/operationexecutor/BUILD b/pkg/volume/util/operationexecutor/BUILD index a4a97f9db8..aac10c5a12 100644 --- a/pkg/volume/util/operationexecutor/BUILD +++ b/pkg/volume/util/operationexecutor/BUILD @@ -19,6 +19,7 @@ go_library( "//pkg/kubelet/events:go_default_library", "//pkg/util/mount:go_default_library", "//pkg/volume:go_default_library", + "//pkg/volume/csi:go_default_library", "//pkg/volume/util:go_default_library", "//pkg/volume/util/nestedpendingoperations:go_default_library", "//pkg/volume/util/types:go_default_library", diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index fdbb6f7fa8..1043c93704 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -36,6 +36,7 @@ import ( kevents "k8s.io/kubernetes/pkg/kubelet/events" "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/volume" + "k8s.io/kubernetes/pkg/volume/csi" "k8s.io/kubernetes/pkg/volume/util" volumetypes "k8s.io/kubernetes/pkg/volume/util/types" "k8s.io/kubernetes/pkg/volume/util/volumepathhandler" @@ -288,6 +289,9 @@ func (og *operationGenerator) GenerateBulkVolumeVerifyFunc( func (og *operationGenerator) GenerateAttachVolumeFunc( volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { + var err error + var attachableVolumePlugin volume.AttachableVolumePlugin + // Get attacher plugin eventRecorderFunc := func(err *error) { if *err != nil { @@ -297,11 +301,29 @@ func (og *operationGenerator) GenerateAttachVolumeFunc( } } - attachableVolumePlugin, err := - og.volumePluginMgr.FindAttachablePluginBySpec(volumeToAttach.VolumeSpec) - if err != nil || attachableVolumePlugin == nil { - eventRecorderFunc(&err) - return volumetypes.GeneratedOperations{}, volumeToAttach.GenerateErrorDetailed("AttachVolume.FindAttachablePluginBySpec failed", err) + originalSpec := volumeToAttach.VolumeSpec + // useCSIPlugin will check both CSIMigration and the plugin specific feature gate + if useCSIPlugin(og.volumePluginMgr, volumeToAttach.VolumeSpec) { + // The volume represented by this spec is CSI and thus should be migrated + attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(csi.CSIPluginName) + if err != nil || attachableVolumePlugin == nil { + eventRecorderFunc(&err) + return volumetypes.GeneratedOperations{}, volumeToAttach.GenerateErrorDetailed("AttachVolume.FindAttachablePluginByName failed", err) + } + + csiSpec, err := translateSpec(volumeToAttach.VolumeSpec) + if err != nil { + return volumetypes.GeneratedOperations{}, volumeToAttach.GenerateErrorDetailed("AttachVolume.TranslateSpec failed", err) + } + + volumeToAttach.VolumeSpec = csiSpec + } else { + attachableVolumePlugin, err = + og.volumePluginMgr.FindAttachablePluginBySpec(volumeToAttach.VolumeSpec) + if err != nil || attachableVolumePlugin == nil { + eventRecorderFunc(&err) + return volumetypes.GeneratedOperations{}, volumeToAttach.GenerateErrorDetailed("AttachVolume.FindAttachablePluginBySpec failed", err) + } } volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher() @@ -319,7 +341,7 @@ func (og *operationGenerator) GenerateAttachVolumeFunc( if derr, ok := attachErr.(*util.DanglingAttachError); ok { addErr := actualStateOfWorld.MarkVolumeAsAttached( v1.UniqueVolumeName(""), - volumeToAttach.VolumeSpec, + originalSpec, derr.CurrentNode, derr.DevicePath) @@ -329,7 +351,7 @@ func (og *operationGenerator) GenerateAttachVolumeFunc( } else { addErr := actualStateOfWorld.MarkVolumeAsUncertain( - v1.UniqueVolumeName(""), volumeToAttach.VolumeSpec, volumeToAttach.NodeName) + v1.UniqueVolumeName(""), originalSpec, volumeToAttach.NodeName) if addErr != nil { klog.Errorf("AttachVolume.MarkVolumeAsUncertain fail to add the volume %q to actual state with %s", volumeToAttach.VolumeName, addErr) } @@ -347,7 +369,7 @@ func (og *operationGenerator) GenerateAttachVolumeFunc( // Update actual state of world addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached( - v1.UniqueVolumeName(""), volumeToAttach.VolumeSpec, volumeToAttach.NodeName, devicePath) + v1.UniqueVolumeName(""), originalSpec, volumeToAttach.NodeName, devicePath) if addVolumeNodeErr != nil { // On failure, return error. Caller will log and retry. return volumeToAttach.GenerateError("AttachVolume.MarkVolumeAsAttached failed", addVolumeNodeErr) @@ -378,10 +400,26 @@ func (og *operationGenerator) GenerateDetachVolumeFunc( if volumeToDetach.VolumeSpec != nil { // Get attacher plugin - attachableVolumePlugin, err = - og.volumePluginMgr.FindAttachablePluginBySpec(volumeToDetach.VolumeSpec) - if err != nil || attachableVolumePlugin == nil { - return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginBySpec failed", err) + // useCSIPlugin will check both CSIMigration and the plugin specific feature gate + if useCSIPlugin(og.volumePluginMgr, volumeToDetach.VolumeSpec) { + // The volume represented by this spec is CSI and thus should be migrated + attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(csi.CSIPluginName) + if err != nil || attachableVolumePlugin == nil { + return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("AttachVolume.FindAttachablePluginBySpec failed", err) + } + + csiSpec, err := translateSpec(volumeToDetach.VolumeSpec) + if err != nil { + return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("AttachVolume.TranslateSpec failed", err) + } + + volumeToDetach.VolumeSpec = csiSpec + } else { + attachableVolumePlugin, err = + og.volumePluginMgr.FindAttachablePluginBySpec(volumeToDetach.VolumeSpec) + if err != nil || attachableVolumePlugin == nil { + return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginBySpec failed", err) + } } volumeName, err = @@ -397,10 +435,28 @@ func (og *operationGenerator) GenerateDetachVolumeFunc( if err != nil { return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.SplitUniqueName failed", err) } - attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(pluginName) - if err != nil || attachableVolumePlugin == nil { - return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginByName failed", err) + + // TODO(dyzz): This case can't distinguish between PV and In-line which is necessary because + // if it was PV it may have been migrated, but the same plugin with in-line may not have been. + // Suggestions welcome... + if csilib.IsMigratableByName(pluginName) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) { + // The volume represented by this spec is CSI and thus should be migrated + attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(csi.CSIPluginName) + if err != nil || attachableVolumePlugin == nil { + return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("AttachVolume.FindAttachablePluginBySpec failed", err) + } + // volumeToDetach.VolumeName here is always the in-tree volume name + // therefore a workaround is required. volumeToDetach.DevicePath + // is the attachID which happens to be what volumeName is needed for in Detach. + // Therefore we set volumeName to the attachID. And CSI Detach can detect and use that. + volumeName = volumeToDetach.DevicePath + } else { + attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(pluginName) + if err != nil || attachableVolumePlugin == nil { + return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginByName failed", err) + } } + } if pluginName == "" {