Migration shim logic for Persistent Volumes for Attach/Detach

pull/564/head
David Zhu 2019-02-14 17:04:23 -08:00
parent f968499812
commit 5dde1df1cd
11 changed files with 136 additions and 40 deletions

View File

@ -101,7 +101,13 @@
"github.com/google/gofuzz",
"github.com/prometheus/client_golang/prometheus",
"github.com/robfig/cron",
"github.com/spf13/pflag"
"github.com/spf13/pflag",
"github.com/stretchr/testify/assert",
"github.com/stretchr/testify/require",
"github.com/google/gofuzz",
"github.com/golang/protobuf/ptypes/wrappers",
"github.com/golang/protobuf/proto",
"github.com/container-storage-interface/spec/lib/go/csi"
]
},
{
@ -257,6 +263,8 @@
"AllowedPrefixes": [
"k8s.io/csi-api/pkg/apis/csi/v1alpha1",
"k8s.io/csi-api/pkg/client/clientset/versioned",
"k8s.io/csi-api/pkg/client/listers/csi/v1alpha1",
"k8s.io/csi-api/pkg/client/informers/externalversions",
"k8s.io/heapster/metrics/api/v1/types",
"k8s.io/kube-controller-manager/config/v1alpha1",
"k8s.io/metrics/pkg/apis/custom_metrics/v1beta2",
@ -288,7 +296,9 @@
"google.golang.org/api/compute/v0.alpha",
"google.golang.org/api/container/v1",
"google.golang.org/api/compute/v0.beta",
"google.golang.org/api/tpu/v1"
"google.golang.org/api/tpu/v1",
"golang.org/x/net/context",
"google.golang.org/grpc"
]
}
]

View File

@ -385,20 +385,38 @@ var _ volume.Detacher = &csiAttacher{}
var _ volume.DeviceUnmounter = &csiAttacher{}
func (c *csiAttacher) Detach(volumeName string, nodeName types.NodeName) error {
// volumeName in format driverName<SEP>volumeHandle generated by plugin.GetVolumeName()
var attachID string
var volID string
if volumeName == "" {
klog.Error(log("detacher.Detach missing value for parameter volumeName"))
return errors.New("missing expected parameter volumeName")
}
parts := strings.Split(volumeName, volNameSep)
if len(parts) != 2 {
klog.Error(log("detacher.Detach insufficient info encoded in volumeName"))
return errors.New("volumeName missing expected data")
if isAttachmentName(volumeName) {
// Detach can also be called with the attach ID as the `volumeName`. This codepath is
// hit only when we have migrated an in-tree volume to CSI and the A/D Controller is shut
// down, the pod with the volume is deleted, and the A/D Controller starts back up in that
// order.
attachID = volumeName
// Vol ID should be the volume handle, except that is not available here.
// It is only used in log messages so in the event that this happens log messages will be
// printing out the attachID instead of the volume handle.
volID = volumeName
} else {
// volumeName in format driverName<SEP>volumeHandle generated by plugin.GetVolumeName()
parts := strings.Split(volumeName, volNameSep)
if len(parts) != 2 {
klog.Error(log("detacher.Detach insufficient info encoded in volumeName"))
return errors.New("volumeName missing expected data")
}
driverName := parts[0]
volID = parts[1]
attachID = getAttachmentName(volID, driverName, string(nodeName))
}
driverName := parts[0]
volID := parts[1]
attachID := getAttachmentName(volID, driverName, string(nodeName))
if err := c.k8s.StorageV1beta1().VolumeAttachments().Delete(attachID, nil); err != nil {
if apierrs.IsNotFound(err) {
// object deleted or never existed, done
@ -550,12 +568,22 @@ func (c *csiAttacher) UnmountDevice(deviceMountPath string) error {
return nil
}
// getAttachmentName returns csi-<sha252(volName,csiDriverName,NodeName>
// getAttachmentName returns csi-<sha256(volName,csiDriverName,NodeName)>
func getAttachmentName(volName, csiDriverName, nodeName string) string {
result := sha256.Sum256([]byte(fmt.Sprintf("%s%s%s", volName, csiDriverName, nodeName)))
return fmt.Sprintf("csi-%x", result)
}
// isAttachmentName returns true if the string given is of the form of an Attach ID
// and false otherwise
func isAttachmentName(unknownString string) bool {
// 68 == "csi-" + len(sha256hash)
if strings.HasPrefix(unknownString, "csi-") && len(unknownString) == 68 {
return true
}
return false
}
func makeDeviceMountPath(plugin *csiPlugin, spec *volume.Spec) (string, error) {
if spec == nil {
return "", fmt.Errorf("makeDeviceMountPath failed, spec is nil")

View File

@ -1004,9 +1004,9 @@ func newTestWatchPlugin(t *testing.T, csiClient *fakecsi.Clientset) (*csiPlugin,
plugMgr := &volume.VolumePluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host)
plug, err := plugMgr.FindPluginByName(csiPluginName)
plug, err := plugMgr.FindPluginByName(CSIPluginName)
if err != nil {
t.Fatalf("can't find plugin %v", csiPluginName)
t.Fatalf("can't find plugin %v", CSIPluginName)
}
csiPlug, ok := plug.(*csiPlugin)

View File

@ -63,20 +63,20 @@ func (m *csiBlockMapper) GetGlobalMapPath(spec *volume.Spec) (string, error) {
// Example: plugins/kubernetes.io/csi/volumeDevices/staging/{pvname}
func (m *csiBlockMapper) getStagingPath() string {
sanitizedSpecVolID := utilstrings.EscapeQualifiedName(m.specName)
return path.Join(m.plugin.host.GetVolumeDevicePluginDir(csiPluginName), "staging", sanitizedSpecVolID)
return path.Join(m.plugin.host.GetVolumeDevicePluginDir(CSIPluginName), "staging", sanitizedSpecVolID)
}
// getPublishPath returns a publish path for a file (on the node) that should be used on NodePublishVolume/NodeUnpublishVolume
// Example: plugins/kubernetes.io/csi/volumeDevices/publish/{pvname}
func (m *csiBlockMapper) getPublishPath() string {
sanitizedSpecVolID := utilstrings.EscapeQualifiedName(m.specName)
return path.Join(m.plugin.host.GetVolumeDevicePluginDir(csiPluginName), "publish", sanitizedSpecVolID)
return path.Join(m.plugin.host.GetVolumeDevicePluginDir(CSIPluginName), "publish", sanitizedSpecVolID)
}
// GetPodDeviceMapPath returns pod's device file which will be mapped to a volume
// returns: pods/{podUid}/volumeDevices/kubernetes.io~csi, {pvname}
func (m *csiBlockMapper) GetPodDeviceMapPath() (string, string) {
path := m.plugin.host.GetPodVolumeDeviceDir(m.podUID, utilstrings.EscapeQualifiedName(csiPluginName))
path := m.plugin.host.GetPodVolumeDeviceDir(m.podUID, utilstrings.EscapeQualifiedName(CSIPluginName))
specName := m.specName
klog.V(4).Infof(log("blockMapper.GetPodDeviceMapPath [path=%s; name=%s]", path, specName))
return path, specName

View File

@ -80,7 +80,7 @@ func (c *csiMountMgr) GetPath() string {
func getTargetPath(uid types.UID, specVolumeID string, host volume.VolumeHost) string {
specVolID := utilstrings.EscapeQualifiedName(specVolumeID)
return host.GetPodVolumeDir(uid, utilstrings.EscapeQualifiedName(csiPluginName), specVolID)
return host.GetPodVolumeDir(uid, utilstrings.EscapeQualifiedName(CSIPluginName), specVolID)
}
// volume.Mounter methods

View File

@ -197,7 +197,7 @@ func MounterSetUpTests(t *testing.T, podInfoEnabled bool) {
},
Spec: storage.VolumeAttachmentSpec{
NodeName: "test-node",
Attacher: csiPluginName,
Attacher: CSIPluginName,
Source: storage.VolumeAttachmentSource{
PersistentVolumeName: &pvName,
},

View File

@ -46,7 +46,8 @@ import (
)
const (
csiPluginName = "kubernetes.io/csi"
// CSIPluginName is the name of the in-tree CSI Plugin
CSIPluginName = "kubernetes.io/csi"
// TODO (vladimirvivien) implement a more dynamic way to discover
// the unix domain socket path for each installed csi driver.
@ -230,7 +231,7 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error {
}
func (p *csiPlugin) GetPluginName() string {
return csiPluginName
return CSIPluginName
}
// GetvolumeName returns a concatenated string of CSIVolumeSource.Driver<volNameSe>CSIVolumeSource.VolumeHandle

View File

@ -61,9 +61,9 @@ func newTestPlugin(t *testing.T, client *fakeclient.Clientset, csiClient *fakecs
plugMgr := &volume.VolumePluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host)
plug, err := plugMgr.FindPluginByName(csiPluginName)
plug, err := plugMgr.FindPluginByName(CSIPluginName)
if err != nil {
t.Fatalf("can't find plugin %v", csiPluginName)
t.Fatalf("can't find plugin %v", CSIPluginName)
}
csiPlug, ok := plug.(*csiPlugin)

View File

@ -109,7 +109,7 @@ func getReadOnlyFromSpec(spec *volume.Spec) (bool, error) {
// log prepends log string with `kubernetes.io/csi`
func log(msg string, parts ...interface{}) string {
return fmt.Sprintf(fmt.Sprintf("%s: %s", csiPluginName, msg), parts...)
return fmt.Sprintf(fmt.Sprintf("%s: %s", CSIPluginName, msg), parts...)
}
// getVolumeDevicePluginDir returns the path where the CSI plugin keeps the
@ -117,7 +117,7 @@ func log(msg string, parts ...interface{}) string {
// path: plugins/kubernetes.io/csi/volumeDevices/{specVolumeID}/dev
func getVolumeDevicePluginDir(specVolID string, host volume.VolumeHost) string {
sanitizedSpecVolID := utilstrings.EscapeQualifiedName(specVolID)
return path.Join(host.GetVolumeDevicePluginDir(csiPluginName), sanitizedSpecVolID, "dev")
return path.Join(host.GetVolumeDevicePluginDir(CSIPluginName), sanitizedSpecVolID, "dev")
}
// getVolumeDeviceDataDir returns the path where the CSI plugin keeps the
@ -125,7 +125,7 @@ func getVolumeDevicePluginDir(specVolID string, host volume.VolumeHost) string {
// path: plugins/kubernetes.io/csi/volumeDevices/{specVolumeID}/data
func getVolumeDeviceDataDir(specVolID string, host volume.VolumeHost) string {
sanitizedSpecVolID := utilstrings.EscapeQualifiedName(specVolID)
return path.Join(host.GetVolumeDevicePluginDir(csiPluginName), sanitizedSpecVolID, "data")
return path.Join(host.GetVolumeDevicePluginDir(CSIPluginName), sanitizedSpecVolID, "data")
}
// hasReadWriteOnce returns true if modes contains v1.ReadWriteOnce

View File

@ -19,6 +19,7 @@ go_library(
"//pkg/kubelet/events:go_default_library",
"//pkg/util/mount:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/csi:go_default_library",
"//pkg/volume/util:go_default_library",
"//pkg/volume/util/nestedpendingoperations:go_default_library",
"//pkg/volume/util/types:go_default_library",

View File

@ -36,6 +36,7 @@ import (
kevents "k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/csi"
"k8s.io/kubernetes/pkg/volume/util"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
"k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
@ -288,6 +289,9 @@ func (og *operationGenerator) GenerateBulkVolumeVerifyFunc(
func (og *operationGenerator) GenerateAttachVolumeFunc(
volumeToAttach VolumeToAttach,
actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
var err error
var attachableVolumePlugin volume.AttachableVolumePlugin
// Get attacher plugin
eventRecorderFunc := func(err *error) {
if *err != nil {
@ -297,11 +301,29 @@ func (og *operationGenerator) GenerateAttachVolumeFunc(
}
}
attachableVolumePlugin, err :=
og.volumePluginMgr.FindAttachablePluginBySpec(volumeToAttach.VolumeSpec)
if err != nil || attachableVolumePlugin == nil {
eventRecorderFunc(&err)
return volumetypes.GeneratedOperations{}, volumeToAttach.GenerateErrorDetailed("AttachVolume.FindAttachablePluginBySpec failed", err)
originalSpec := volumeToAttach.VolumeSpec
// useCSIPlugin will check both CSIMigration and the plugin specific feature gate
if useCSIPlugin(og.volumePluginMgr, volumeToAttach.VolumeSpec) {
// The volume represented by this spec is CSI and thus should be migrated
attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(csi.CSIPluginName)
if err != nil || attachableVolumePlugin == nil {
eventRecorderFunc(&err)
return volumetypes.GeneratedOperations{}, volumeToAttach.GenerateErrorDetailed("AttachVolume.FindAttachablePluginByName failed", err)
}
csiSpec, err := translateSpec(volumeToAttach.VolumeSpec)
if err != nil {
return volumetypes.GeneratedOperations{}, volumeToAttach.GenerateErrorDetailed("AttachVolume.TranslateSpec failed", err)
}
volumeToAttach.VolumeSpec = csiSpec
} else {
attachableVolumePlugin, err =
og.volumePluginMgr.FindAttachablePluginBySpec(volumeToAttach.VolumeSpec)
if err != nil || attachableVolumePlugin == nil {
eventRecorderFunc(&err)
return volumetypes.GeneratedOperations{}, volumeToAttach.GenerateErrorDetailed("AttachVolume.FindAttachablePluginBySpec failed", err)
}
}
volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher()
@ -319,7 +341,7 @@ func (og *operationGenerator) GenerateAttachVolumeFunc(
if derr, ok := attachErr.(*util.DanglingAttachError); ok {
addErr := actualStateOfWorld.MarkVolumeAsAttached(
v1.UniqueVolumeName(""),
volumeToAttach.VolumeSpec,
originalSpec,
derr.CurrentNode,
derr.DevicePath)
@ -329,7 +351,7 @@ func (og *operationGenerator) GenerateAttachVolumeFunc(
} else {
addErr := actualStateOfWorld.MarkVolumeAsUncertain(
v1.UniqueVolumeName(""), volumeToAttach.VolumeSpec, volumeToAttach.NodeName)
v1.UniqueVolumeName(""), originalSpec, volumeToAttach.NodeName)
if addErr != nil {
klog.Errorf("AttachVolume.MarkVolumeAsUncertain fail to add the volume %q to actual state with %s", volumeToAttach.VolumeName, addErr)
}
@ -347,7 +369,7 @@ func (og *operationGenerator) GenerateAttachVolumeFunc(
// Update actual state of world
addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached(
v1.UniqueVolumeName(""), volumeToAttach.VolumeSpec, volumeToAttach.NodeName, devicePath)
v1.UniqueVolumeName(""), originalSpec, volumeToAttach.NodeName, devicePath)
if addVolumeNodeErr != nil {
// On failure, return error. Caller will log and retry.
return volumeToAttach.GenerateError("AttachVolume.MarkVolumeAsAttached failed", addVolumeNodeErr)
@ -378,10 +400,26 @@ func (og *operationGenerator) GenerateDetachVolumeFunc(
if volumeToDetach.VolumeSpec != nil {
// Get attacher plugin
attachableVolumePlugin, err =
og.volumePluginMgr.FindAttachablePluginBySpec(volumeToDetach.VolumeSpec)
if err != nil || attachableVolumePlugin == nil {
return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginBySpec failed", err)
// useCSIPlugin will check both CSIMigration and the plugin specific feature gate
if useCSIPlugin(og.volumePluginMgr, volumeToDetach.VolumeSpec) {
// The volume represented by this spec is CSI and thus should be migrated
attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(csi.CSIPluginName)
if err != nil || attachableVolumePlugin == nil {
return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("AttachVolume.FindAttachablePluginBySpec failed", err)
}
csiSpec, err := translateSpec(volumeToDetach.VolumeSpec)
if err != nil {
return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("AttachVolume.TranslateSpec failed", err)
}
volumeToDetach.VolumeSpec = csiSpec
} else {
attachableVolumePlugin, err =
og.volumePluginMgr.FindAttachablePluginBySpec(volumeToDetach.VolumeSpec)
if err != nil || attachableVolumePlugin == nil {
return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginBySpec failed", err)
}
}
volumeName, err =
@ -397,10 +435,28 @@ func (og *operationGenerator) GenerateDetachVolumeFunc(
if err != nil {
return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.SplitUniqueName failed", err)
}
attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(pluginName)
if err != nil || attachableVolumePlugin == nil {
return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginByName failed", err)
// TODO(dyzz): This case can't distinguish between PV and In-line which is necessary because
// if it was PV it may have been migrated, but the same plugin with in-line may not have been.
// Suggestions welcome...
if csilib.IsMigratableByName(pluginName) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) {
// The volume represented by this spec is CSI and thus should be migrated
attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(csi.CSIPluginName)
if err != nil || attachableVolumePlugin == nil {
return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("AttachVolume.FindAttachablePluginBySpec failed", err)
}
// volumeToDetach.VolumeName here is always the in-tree volume name
// therefore a workaround is required. volumeToDetach.DevicePath
// is the attachID which happens to be what volumeName is needed for in Detach.
// Therefore we set volumeName to the attachID. And CSI Detach can detect and use that.
volumeName = volumeToDetach.DevicePath
} else {
attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(pluginName)
if err != nil || attachableVolumePlugin == nil {
return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginByName failed", err)
}
}
}
if pluginName == "" {