From 7d2f4e97b8f9d86f6c435191e4a9cb53ba9c495d Mon Sep 17 00:00:00 2001 From: David Zhu Date: Tue, 5 Mar 2019 10:57:18 -0800 Subject: [PATCH] Add ADC Fallback if Node doesn't have driver installed --- cmd/kube-controller-manager/app/core.go | 1 + pkg/apis/core/annotation_key_constants.go | 3 + pkg/controller/volume/attachdetach/BUILD | 2 + .../attachdetach/attach_detach_controller.go | 43 +++++-- .../attach_detach_controller_test.go | 2 + .../volume/persistentvolume/pv_controller.go | 5 +- pkg/volume/BUILD | 1 + .../csi/nodeinfomanager/nodeinfomanager.go | 30 +++-- .../nodeinfomanager/nodeinfomanager_test.go | 16 +-- pkg/volume/plugins.go | 20 ++++ pkg/volume/util/operationexecutor/BUILD | 2 + .../operationexecutor/operation_generator.go | 109 +++++++++++++++++- .../rbac/bootstrappolicy/controller_policy.go | 3 + .../testdata/controller-roles.yaml | 8 ++ .../api/core/v1/annotation_key_constants.go | 3 + .../k8s.io/csi-translation-lib/translate.go | 20 +++- test/integration/volume/attach_detach_test.go | 1 + 17 files changed, 225 insertions(+), 44 deletions(-) diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index 9ee77f5367..7d9d3bfdfd 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -223,6 +223,7 @@ func startAttachDetachController(ctx ControllerContext) (http.Handler, bool, err ctx.InformerFactory.Core().V1().Nodes(), ctx.InformerFactory.Core().V1().PersistentVolumeClaims(), ctx.InformerFactory.Core().V1().PersistentVolumes(), + ctx.InformerFactory.Storage().V1beta1().CSINodes(), ctx.Cloud, ProbeAttachableVolumePlugins(), GetDynamicPluginProber(ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration), diff --git a/pkg/apis/core/annotation_key_constants.go b/pkg/apis/core/annotation_key_constants.go index bef73c0db0..85b7a59042 100644 --- a/pkg/apis/core/annotation_key_constants.go +++ b/pkg/apis/core/annotation_key_constants.go @@ -101,4 +101,7 @@ const ( // This annotation will be used to compute the in-cluster network programming latency SLI, see // https://github.com/kubernetes/community/blob/master/sig-scalability/slos/network_programming_latency.md EndpointsLastChangeTriggerTime = "endpoints.kubernetes.io/last-change-trigger-time" + + // TODO(dyzz) Comment + MigratedPluginsAnnotationKey = "storage.alpha.kubernetes.io/migrated-plugins" ) diff --git a/pkg/controller/volume/attachdetach/BUILD b/pkg/controller/volume/attachdetach/BUILD index d513330fef..ec124c66ec 100644 --- a/pkg/controller/volume/attachdetach/BUILD +++ b/pkg/controller/volume/attachdetach/BUILD @@ -32,10 +32,12 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", + "//staging/src/k8s.io/client-go/informers/storage/v1beta1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", + "//staging/src/k8s.io/client-go/listers/storage/v1beta1:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller.go b/pkg/controller/volume/attachdetach/attach_detach_controller.go index c67dd88f25..27992b6e5e 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller.go @@ -31,10 +31,12 @@ import ( "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" coreinformers "k8s.io/client-go/informers/core/v1" + storageinformers "k8s.io/client-go/informers/storage/v1beta1" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" v1core "k8s.io/client-go/kubernetes/typed/core/v1" corelisters "k8s.io/client-go/listers/core/v1" + storagelisters "k8s.io/client-go/listers/storage/v1beta1" kcache "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" @@ -101,6 +103,7 @@ func NewAttachDetachController( nodeInformer coreinformers.NodeInformer, pvcInformer coreinformers.PersistentVolumeClaimInformer, pvInformer coreinformers.PersistentVolumeInformer, + csiNodeInformer storageinformers.CSINodeInformer, cloud cloudprovider.Interface, plugins []volume.VolumePlugin, prober volume.DynamicPluginProber, @@ -122,18 +125,20 @@ func NewAttachDetachController( // dropped pods so they are continuously processed until it is accepted or // deleted (probably can't do this with sharedInformer), etc. adc := &attachDetachController{ - kubeClient: kubeClient, - pvcLister: pvcInformer.Lister(), - pvcsSynced: pvcInformer.Informer().HasSynced, - pvLister: pvInformer.Lister(), - pvsSynced: pvInformer.Informer().HasSynced, - podLister: podInformer.Lister(), - podsSynced: podInformer.Informer().HasSynced, - podIndexer: podInformer.Informer().GetIndexer(), - nodeLister: nodeInformer.Lister(), - nodesSynced: nodeInformer.Informer().HasSynced, - cloud: cloud, - pvcQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvcs"), + kubeClient: kubeClient, + pvcLister: pvcInformer.Lister(), + pvcsSynced: pvcInformer.Informer().HasSynced, + pvLister: pvInformer.Lister(), + pvsSynced: pvInformer.Informer().HasSynced, + podLister: podInformer.Lister(), + podsSynced: podInformer.Informer().HasSynced, + podIndexer: podInformer.Informer().GetIndexer(), + nodeLister: nodeInformer.Lister(), + nodesSynced: nodeInformer.Informer().HasSynced, + csiNodeLister: csiNodeInformer.Lister(), + csiNodeSynced: csiNodeInformer.Informer().HasSynced, + cloud: cloud, + pvcQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvcs"), } if err := adc.volumePluginMgr.InitPlugins(plugins, prober, adc); err != nil { @@ -257,6 +262,9 @@ type attachDetachController struct { nodeLister corelisters.NodeLister nodesSynced kcache.InformerSynced + csiNodeLister storagelisters.CSINodeLister + csiNodeSynced kcache.InformerSynced + // cloud provider used by volume host cloud cloudprovider.Interface @@ -643,6 +651,17 @@ func (adc *attachDetachController) processVolumesInUse( } } +var _ volume.VolumeHost = &attachDetachController{} +var _ volume.AttachDetachVolumeHost = &attachDetachController{} + +func (adc *attachDetachController) CSINodeLister() storagelisters.CSINodeLister { + return adc.csiNodeLister +} + +func (adc *attachDetachController) CSINodeSynced() bool { + return adc.csiNodeSynced() +} + // VolumeHost implementation // This is an unfortunate requirement of the current factoring of volume plugin // initializing code. It requires kubelet specific methods used by the mounting diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller_test.go b/pkg/controller/volume/attachdetach/attach_detach_controller_test.go index 2c5c2bc06b..028d868db4 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller_test.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller_test.go @@ -44,6 +44,7 @@ func Test_NewAttachDetachController_Positive(t *testing.T) { informerFactory.Core().V1().Nodes(), informerFactory.Core().V1().PersistentVolumeClaims(), informerFactory.Core().V1().PersistentVolumes(), + informerFactory.Storage().V1beta1().CSINodes(), nil, /* cloud */ nil, /* plugins */ nil, /* prober */ @@ -218,6 +219,7 @@ func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2 informerFactory.Core().V1().Nodes(), informerFactory.Core().V1().PersistentVolumeClaims(), informerFactory.Core().V1().PersistentVolumes(), + informerFactory.Storage().V1beta1().CSINodes(), nil, /* cloud */ plugins, prober, diff --git a/pkg/controller/volume/persistentvolume/pv_controller.go b/pkg/controller/volume/persistentvolume/pv_controller.go index e8f0e75f19..2db7d8b710 100644 --- a/pkg/controller/volume/persistentvolume/pv_controller.go +++ b/pkg/controller/volume/persistentvolume/pv_controller.go @@ -1376,7 +1376,8 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation(claim *v1.Persis var pluginName string provisionerName := storageClass.Provisioner if plugin != nil { - if plugin.IsMigratedToCSI() { + // TODO(dyzz) Just temporary to test without dynamic provisioning + if plugin.IsMigratedToCSI() && false { // pluginName is not set here to align with existing behavior // of not setting pluginName for external provisioners (including CSI) // Set provisionerName to CSI plugin name for setClaimProvisioner @@ -1401,7 +1402,7 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation(claim *v1.Persis } claim = newClaim - if plugin == nil || plugin.IsMigratedToCSI() { + if plugin == nil { // findProvisionablePlugin returned no error nor plugin. // This means that an unknown provisioner is requested. Report an event // and wait for the external provisioner diff --git a/pkg/volume/BUILD b/pkg/volume/BUILD index 7d5c5d4658..dc4a16cbd6 100644 --- a/pkg/volume/BUILD +++ b/pkg/volume/BUILD @@ -30,6 +30,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/validation:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", + "//staging/src/k8s.io/client-go/listers/storage/v1beta1:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", "//staging/src/k8s.io/cloud-provider:go_default_library", "//vendor/k8s.io/klog:go_default_library", diff --git a/pkg/volume/csi/nodeinfomanager/nodeinfomanager.go b/pkg/volume/csi/nodeinfomanager/nodeinfomanager.go index 99a610d5cb..be84560f9f 100644 --- a/pkg/volume/csi/nodeinfomanager/nodeinfomanager.go +++ b/pkg/volume/csi/nodeinfomanager/nodeinfomanager.go @@ -26,7 +26,7 @@ import ( "time" "k8s.io/api/core/v1" - storage "k8s.io/api/storage/v1beta1" + storagev1beta1 "k8s.io/api/storage/v1beta1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -71,7 +71,7 @@ type nodeUpdateFunc func(*v1.Node) (newNode *v1.Node, updated bool, err error) // Interface implements an interface for managing labels of a node type Interface interface { - CreateCSINode() (*storage.CSINode, error) + CreateCSINode() (*storagev1beta1.CSINode, error) // Record in the cluster the given node information from the CSI driver with the given name. // Concurrent calls to InstallCSIDriver() is allowed, but they should not be intertwined with calls @@ -388,7 +388,7 @@ func (nim *nodeInfoManager) tryUpdateCSINode( return nim.installDriverToCSINode(nodeInfo, driverName, driverNodeID, topology) } -func (nim *nodeInfoManager) CreateCSINode() (*storage.CSINode, error) { +func (nim *nodeInfoManager) CreateCSINode() (*storagev1beta1.CSINode, error) { kubeClient := nim.volumeHost.GetKubeClient() if kubeClient == nil { @@ -405,7 +405,7 @@ func (nim *nodeInfoManager) CreateCSINode() (*storage.CSINode, error) { return nil, err } - nodeInfo := &storage.CSINode{ + nodeInfo := &storagev1beta1.CSINode{ ObjectMeta: metav1.ObjectMeta{ Name: string(nim.nodeName), OwnerReferences: []metav1.OwnerReference{ @@ -417,8 +417,8 @@ func (nim *nodeInfoManager) CreateCSINode() (*storage.CSINode, error) { }, }, }, - Spec: storage.CSINodeSpec{ - Drivers: []storage.CSINodeDriver{}, + Spec: storagev1beta1.CSINodeSpec{ + Drivers: []storagev1beta1.CSINodeDriver{}, }, } @@ -427,11 +427,7 @@ func (nim *nodeInfoManager) CreateCSINode() (*storage.CSINode, error) { return csiKubeClient.StorageV1beta1().CSINodes().Create(nodeInfo) } -const ( - migratedPluginsAnnotationKey = "storage.alpha.kubernetes.io/migrated-plugins" -) - -func setMigrationAnnotation(migratedPlugins map[string](func() bool), nodeInfo *storage.CSINode) (modified bool) { +func setMigrationAnnotation(migratedPlugins map[string](func() bool), nodeInfo *storagev1beta1.CSINode) (modified bool) { if migratedPlugins == nil { return false } @@ -441,7 +437,7 @@ func setMigrationAnnotation(migratedPlugins map[string](func() bool), nodeInfo * nodeInfoAnnotations = map[string]string{} } - mpa := nodeInfoAnnotations[migratedPluginsAnnotationKey] + mpa := nodeInfoAnnotations[v1.MigratedPluginsAnnotationKey] tok := strings.Split(mpa, ",") oldAnnotationSet := sets.NewString(tok...) @@ -458,9 +454,9 @@ func setMigrationAnnotation(migratedPlugins map[string](func() bool), nodeInfo * nas := strings.Join(newAnnotationSet.List(), ",") if len(nas) != 0 { - nodeInfoAnnotations[migratedPluginsAnnotationKey] = nas + nodeInfoAnnotations[v1.MigratedPluginsAnnotationKey] = nas } else { - delete(nodeInfoAnnotations, migratedPluginsAnnotationKey) + delete(nodeInfoAnnotations, v1.MigratedPluginsAnnotationKey) } nodeInfo.Annotations = nodeInfoAnnotations @@ -468,7 +464,7 @@ func setMigrationAnnotation(migratedPlugins map[string](func() bool), nodeInfo * } func (nim *nodeInfoManager) installDriverToCSINode( - nodeInfo *storage.CSINode, + nodeInfo *storagev1beta1.CSINode, driverName string, driverNodeID string, topology map[string]string) error { @@ -486,7 +482,7 @@ func (nim *nodeInfoManager) installDriverToCSINode( specModified := true statusModified := true // Clone driver list, omitting the driver that matches the given driverName - newDriverSpecs := []storage.CSINodeDriver{} + newDriverSpecs := []storagev1beta1.CSINodeDriver{} for _, driverInfoSpec := range nodeInfo.Spec.Drivers { if driverInfoSpec.Name == driverName { if driverInfoSpec.NodeID == driverNodeID && @@ -506,7 +502,7 @@ func (nim *nodeInfoManager) installDriverToCSINode( } // Append new driver - driverSpec := storage.CSINodeDriver{ + driverSpec := storagev1beta1.CSINodeDriver{ Name: driverName, NodeID: driverNodeID, TopologyKeys: topologyKeys.List(), diff --git a/pkg/volume/csi/nodeinfomanager/nodeinfomanager_test.go b/pkg/volume/csi/nodeinfomanager/nodeinfomanager_test.go index 7146a391e8..71946f8edb 100644 --- a/pkg/volume/csi/nodeinfomanager/nodeinfomanager_test.go +++ b/pkg/volume/csi/nodeinfomanager/nodeinfomanager_test.go @@ -580,7 +580,7 @@ func TestSetMigrationAnnotation(t *testing.T) { expectedNode: &storage.CSINode{ ObjectMeta: metav1.ObjectMeta{ Name: "node1", - Annotations: map[string]string{migratedPluginsAnnotationKey: "test"}, + Annotations: map[string]string{v1.MigratedPluginsAnnotationKey: "test"}, }, }, expectModified: true, @@ -593,13 +593,13 @@ func TestSetMigrationAnnotation(t *testing.T) { existingNode: &storage.CSINode{ ObjectMeta: metav1.ObjectMeta{ Name: "node1", - Annotations: map[string]string{migratedPluginsAnnotationKey: "test"}, + Annotations: map[string]string{v1.MigratedPluginsAnnotationKey: "test"}, }, }, expectedNode: &storage.CSINode{ ObjectMeta: metav1.ObjectMeta{ Name: "node1", - Annotations: map[string]string{migratedPluginsAnnotationKey: "test"}, + Annotations: map[string]string{v1.MigratedPluginsAnnotationKey: "test"}, }, }, expectModified: false, @@ -610,7 +610,7 @@ func TestSetMigrationAnnotation(t *testing.T) { existingNode: &storage.CSINode{ ObjectMeta: metav1.ObjectMeta{ Name: "node1", - Annotations: map[string]string{migratedPluginsAnnotationKey: "test"}, + Annotations: map[string]string{v1.MigratedPluginsAnnotationKey: "test"}, }, }, expectedNode: &storage.CSINode{ @@ -635,7 +635,7 @@ func TestSetMigrationAnnotation(t *testing.T) { expectedNode: &storage.CSINode{ ObjectMeta: metav1.ObjectMeta{ Name: "node1", - Annotations: map[string]string{migratedPluginsAnnotationKey: "test", "other": "annotation"}, + Annotations: map[string]string{v1.MigratedPluginsAnnotationKey: "test", "other": "annotation"}, }, }, expectModified: true, @@ -649,13 +649,13 @@ func TestSetMigrationAnnotation(t *testing.T) { existingNode: &storage.CSINode{ ObjectMeta: metav1.ObjectMeta{ Name: "node1", - Annotations: map[string]string{"other": "annotation", migratedPluginsAnnotationKey: "foo"}, + Annotations: map[string]string{"other": "annotation", v1.MigratedPluginsAnnotationKey: "foo"}, }, }, expectedNode: &storage.CSINode{ ObjectMeta: metav1.ObjectMeta{ Name: "node1", - Annotations: map[string]string{migratedPluginsAnnotationKey: "test", "other": "annotation"}, + Annotations: map[string]string{v1.MigratedPluginsAnnotationKey: "test", "other": "annotation"}, }, }, expectModified: true, @@ -675,7 +675,7 @@ func TestSetMigrationAnnotation(t *testing.T) { expectedNode: &storage.CSINode{ ObjectMeta: metav1.ObjectMeta{ Name: "node1", - Annotations: map[string]string{migratedPluginsAnnotationKey: "foo,test", "other": "annotation"}, + Annotations: map[string]string{v1.MigratedPluginsAnnotationKey: "foo,test", "other": "annotation"}, }, }, expectModified: true, diff --git a/pkg/volume/plugins.go b/pkg/volume/plugins.go index 1470156dfb..85f5223e0a 100644 --- a/pkg/volume/plugins.go +++ b/pkg/volume/plugins.go @@ -30,6 +30,7 @@ import ( utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/validation" clientset "k8s.io/client-go/kubernetes" + storagelisters "k8s.io/client-go/listers/storage/v1beta1" "k8s.io/client-go/tools/record" cloudprovider "k8s.io/cloud-provider" "k8s.io/klog" @@ -278,6 +279,14 @@ type BlockVolumePlugin interface { ConstructBlockVolumeSpec(podUID types.UID, volumeName, volumePath string) (*Spec, error) } +// TODO(#14217) +// As part of the Volume Host refactor we are starting to create Volume Hosts +// for specific hosts. New methods for each specific host can be added here. +// Currently consumers will do type assertions to get the specific type of Volume +// Host; however, the end result should be that specific Volume Hosts are passed +// to the specific functions they are needed in (instead of using a catch-all +// VolumeHost interface) + // KubeletVolumeHost is a Kubelet specific interface that plugins can use to access the kubelet. type KubeletVolumeHost interface { // SetKubeletError lets plugins set an error on the Kubelet runtime status @@ -285,6 +294,17 @@ type KubeletVolumeHost interface { SetKubeletError(err error) } +// AttachDetachVolumeHost is a AttachDetach Controller specific interface that plugins can use +// to access methods on the Attach Detach Controller. +type AttachDetachVolumeHost interface { + // CSINodeLister returns the informer lister for the CSINode API Object + CSINodeLister() storagelisters.CSINodeLister + + // CSINodeSynced returns a boolean representing whether the CSINode API Object + // informer has been synced + CSINodeSynced() bool +} + // VolumeHost is an interface that plugins can use to access the kubelet. type VolumeHost interface { // GetPluginDir returns the absolute path to a directory under which diff --git a/pkg/volume/util/operationexecutor/BUILD b/pkg/volume/util/operationexecutor/BUILD index d87b21c447..f7e40f612e 100644 --- a/pkg/volume/util/operationexecutor/BUILD +++ b/pkg/volume/util/operationexecutor/BUILD @@ -25,9 +25,11 @@ go_library( "//pkg/volume/util/types:go_default_library", "//pkg/volume/util/volumepathhandler:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/api/storage/v1beta1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index c35a060dae..e49d10ccb0 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -23,9 +23,11 @@ import ( "time" "k8s.io/api/core/v1" + storagev1beta1 "k8s.io/api/storage/v1beta1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" utilfeature "k8s.io/apiserver/pkg/util/feature" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" @@ -84,6 +86,7 @@ func NewOperationGenerator(kubeClient clientset.Interface, checkNodeCapabilitiesBeforeMount: checkNodeCapabilitiesBeforeMount, blkUtil: blkUtil, } + // TODO(dyzz) look at default resync time } // OperationGenerator interface that extracts out the functions from operation_executor to make it dependency injectable @@ -303,8 +306,14 @@ func (og *operationGenerator) GenerateAttachVolumeFunc( } originalSpec := volumeToAttach.VolumeSpec + nu, err := nodeUsingCSIPlugin(og, volumeToAttach.VolumeSpec, volumeToAttach.NodeName) + if err != nil { + eventRecorderFunc(&err) + return volumetypes.GeneratedOperations{}, volumeToAttach.GenerateErrorDetailed("AttachVolume.NodeUsingCSIPlugin failed", err) + } + // useCSIPlugin will check both CSIMigration and the plugin specific feature gate - if useCSIPlugin(og.volumePluginMgr, volumeToAttach.VolumeSpec) { + if useCSIPlugin(og.volumePluginMgr, volumeToAttach.VolumeSpec) && nu { // The volume represented by this spec is CSI and thus should be migrated attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(csi.CSIPluginName) if err != nil || attachableVolumePlugin == nil { @@ -401,17 +410,22 @@ func (og *operationGenerator) GenerateDetachVolumeFunc( if volumeToDetach.VolumeSpec != nil { // Get attacher plugin + nu, err := nodeUsingCSIPlugin(og, volumeToDetach.VolumeSpec, volumeToDetach.NodeName) + if err != nil { + return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.NodeUsingCSIPlugin failed", err) + } + // useCSIPlugin will check both CSIMigration and the plugin specific feature gate - if useCSIPlugin(og.volumePluginMgr, volumeToDetach.VolumeSpec) { + if useCSIPlugin(og.volumePluginMgr, volumeToDetach.VolumeSpec) && nu { // The volume represented by this spec is CSI and thus should be migrated attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(csi.CSIPluginName) if err != nil || attachableVolumePlugin == nil { - return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("AttachVolume.FindAttachablePluginBySpec failed", err) + return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginBySpec failed", err) } csiSpec, err := translateSpec(volumeToDetach.VolumeSpec) if err != nil { - return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("AttachVolume.TranslateSpec failed", err) + return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.TranslateSpec failed", err) } volumeToDetach.VolumeSpec = csiSpec @@ -1547,6 +1561,93 @@ func useCSIPlugin(vpm *volume.VolumePluginMgr, spec *volume.Spec) bool { return false } +func nodeUsingCSIPlugin(og *operationGenerator, spec *volume.Spec, nodeName types.NodeName) (bool, error) { + var err error + + migratable, err := og.volumePluginMgr.IsPluginMigratableBySpec(spec) + if err != nil { + return false, err + } + if !utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) || + !utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) || + !migratable { + return false, nil + } + + if len(nodeName) == 0 { + return false, fmt.Errorf("nodeName is empty") + } + + vpm := og.volumePluginMgr + + kubeClient := vpm.Host.GetKubeClient() + if kubeClient == nil { + // TODO(dyzz) check this error case, what should we do in standalone kubelet mode? + return false, fmt.Errorf("failed to get kube client from volume host") + } + + adcHost, ok := og.volumePluginMgr.Host.(volume.AttachDetachVolumeHost) + if !ok { + // This function is running not on the AttachDetachController + // We assume that Kubelet is servicing this function and therefore is + // trivially "using CSI Plugin" + return true, nil + } + var csiNode *storagev1beta1.CSINode + if adcHost.CSINodeSynced() { + csiNode, err = adcHost.CSINodeLister().Get(string(nodeName)) + if err != nil { + return false, err + } + } else { + // Fallback to GET + klog.Warningf("CSINode informer not synced, falling back to GET directly from API Server") + csiNode, err = kubeClient.StorageV1beta1().CSINodes().Get(string(nodeName), metav1.GetOptions{}) + if err != nil { + return false, err + } + } + + ann := csiNode.GetAnnotations() + if ann == nil { + return false, nil + } + + mpaSet := sets.NewString(strings.Split(ann[v1.MigratedPluginsAnnotationKey], ",")...) + + pluginName, err := csilib.GetInTreePluginNameFromSpec(spec.PersistentVolume, spec.Volume) + if err != nil { + return false, err + } + + if len(pluginName) == 0 { + // Could not find a plugin name from translation directory + return false, nil + } + + isMigratedOnNode := mpaSet.Has(pluginName) + + if isMigratedOnNode { + installed := false + driverName, err := csilib.GetCSINameFromIntreeName(pluginName) + if err != nil { + return isMigratedOnNode, err + } + for _, driver := range csiNode.Spec.Drivers { + if driver.Name == driverName { + installed = true + break + } + } + if !installed { + return true, fmt.Errorf("in-tree plugin %s is migrated on node %s but driver %s is not installed.", pluginName, string(nodeName), driverName) + } + } + + return isMigratedOnNode, nil + +} + func translateSpec(spec *volume.Spec) (*volume.Spec, error) { if spec.PersistentVolume != nil { // TranslateInTreePVToCSI will create a new PV diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go index c409ae8fd9..6bbe8d23d8 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go @@ -76,6 +76,9 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding) if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) { role.Rules = append(role.Rules, rbacv1helpers.NewRule("get", "watch", "list").Groups("storage.k8s.io").Resources("csidrivers").RuleOrDie()) } + if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) { + role.Rules = append(role.Rules, rbacv1helpers.NewRule("get", "watch", "list").Groups("storage.k8s.io").Resources("csinodes").RuleOrDie()) + } } return role diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml index 2c64f63a96..bad6b830b4 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml @@ -66,6 +66,14 @@ items: - get - list - watch + - apiGroups: + - storage.k8s.io + resources: + - csinodes + verbs: + - get + - list + - watch - apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: diff --git a/staging/src/k8s.io/api/core/v1/annotation_key_constants.go b/staging/src/k8s.io/api/core/v1/annotation_key_constants.go index 2c72ec2df2..442245ff8a 100644 --- a/staging/src/k8s.io/api/core/v1/annotation_key_constants.go +++ b/staging/src/k8s.io/api/core/v1/annotation_key_constants.go @@ -97,4 +97,7 @@ const ( // This annotation will be used to compute the in-cluster network programming latency SLI, see // https://github.com/kubernetes/community/blob/master/sig-scalability/slos/network_programming_latency.md EndpointsLastChangeTriggerTime = "endpoints.kubernetes.io/last-change-trigger-time" + + // TODO(dyzz) Comment + MigratedPluginsAnnotationKey = "storage.alpha.kubernetes.io/migrated-plugins" ) diff --git a/staging/src/k8s.io/csi-translation-lib/translate.go b/staging/src/k8s.io/csi-translation-lib/translate.go index 7f84a4f7cb..477fb680f2 100644 --- a/staging/src/k8s.io/csi-translation-lib/translate.go +++ b/staging/src/k8s.io/csi-translation-lib/translate.go @@ -95,15 +95,33 @@ func IsMigratedCSIDriverByName(csiPluginName string) bool { return false } +// GetInTreePluginNameFromSpec returns the plugin name +func GetInTreePluginNameFromSpec(pv *v1.PersistentVolume, vol *v1.Volume) (string, error) { + if pv != nil { + for _, curPlugin := range inTreePlugins { + if curPlugin.CanSupport(pv) { + return curPlugin.GetInTreePluginName(), nil + } + } + return "", fmt.Errorf("could not find in-tree plugin name from persistent volume %v", pv) + } else if vol != nil { + // TODO(dyzz): Implement inline volume migration support + return "", fmt.Errorf("inline volume migration not yet supported") + } else { + return "", fmt.Errorf("both persistent volume and volume are nil") + } +} + // GetCSINameFromInTreeName returns the name of a CSI driver that supersedes the // in-tree plugin with the given name func GetCSINameFromInTreeName(pluginName string) (string, error) { + for csiDriverName, curPlugin := range inTreePlugins { if curPlugin.GetInTreePluginName() == pluginName { return csiDriverName, nil } } - return "", fmt.Errorf("Could not find CSI Driver name for plugin %v", pluginName) + return "", fmt.Errorf("could not find CSI Driver name for plugin %v", pluginName) } // GetInTreeNameFromCSIName returns the name of the in-tree plugin superseded by diff --git a/test/integration/volume/attach_detach_test.go b/test/integration/volume/attach_detach_test.go index c6ac38141c..39722abc4d 100644 --- a/test/integration/volume/attach_detach_test.go +++ b/test/integration/volume/attach_detach_test.go @@ -415,6 +415,7 @@ func createAdClients(ns *v1.Namespace, t *testing.T, server *httptest.Server, sy informers.Core().V1().Nodes(), informers.Core().V1().PersistentVolumeClaims(), informers.Core().V1().PersistentVolumes(), + informers.Storage().V1beta1().CSINodes(), cloud, plugins, nil, /* prober */