diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index 9ee77f5367..7d9d3bfdfd 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -223,6 +223,7 @@ func startAttachDetachController(ctx ControllerContext) (http.Handler, bool, err ctx.InformerFactory.Core().V1().Nodes(), ctx.InformerFactory.Core().V1().PersistentVolumeClaims(), ctx.InformerFactory.Core().V1().PersistentVolumes(), + ctx.InformerFactory.Storage().V1beta1().CSINodes(), ctx.Cloud, ProbeAttachableVolumePlugins(), GetDynamicPluginProber(ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration), diff --git a/pkg/apis/core/annotation_key_constants.go b/pkg/apis/core/annotation_key_constants.go index bef73c0db0..688287611e 100644 --- a/pkg/apis/core/annotation_key_constants.go +++ b/pkg/apis/core/annotation_key_constants.go @@ -101,4 +101,10 @@ const ( // This annotation will be used to compute the in-cluster network programming latency SLI, see // https://github.com/kubernetes/community/blob/master/sig-scalability/slos/network_programming_latency.md EndpointsLastChangeTriggerTime = "endpoints.kubernetes.io/last-change-trigger-time" + + // MigratedPluginsAnnotationKey is the annotation key, set for CSINode objects, that is a comma-separated + // list of in-tree plugins that will be serviced by the CSI backend on the Node represented by CSINode. + // This annotation is used by the Attach Detach Controller to determine whether to use the in-tree or + // CSI Backend for a volume plugin on a specific node. + MigratedPluginsAnnotationKey = "storage.alpha.kubernetes.io/migrated-plugins" ) diff --git a/pkg/controller/volume/attachdetach/BUILD b/pkg/controller/volume/attachdetach/BUILD index d513330fef..cc1c9ce6c6 100644 --- a/pkg/controller/volume/attachdetach/BUILD +++ b/pkg/controller/volume/attachdetach/BUILD @@ -18,6 +18,7 @@ go_library( "//pkg/controller/volume/attachdetach/reconciler:go_default_library", "//pkg/controller/volume/attachdetach/statusupdater:go_default_library", "//pkg/controller/volume/attachdetach/util:go_default_library", + "//pkg/features:go_default_library", "//pkg/util/mount:go_default_library", "//pkg/volume:go_default_library", "//pkg/volume/util:go_default_library", @@ -31,11 +32,14 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", + "//staging/src/k8s.io/client-go/informers/storage/v1beta1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", + "//staging/src/k8s.io/client-go/listers/storage/v1beta1:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller.go b/pkg/controller/volume/attachdetach/attach_detach_controller.go index c67dd88f25..77ffbc7fb3 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller.go @@ -30,11 +30,14 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" coreinformers "k8s.io/client-go/informers/core/v1" + storageinformers "k8s.io/client-go/informers/storage/v1beta1" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" v1core "k8s.io/client-go/kubernetes/typed/core/v1" corelisters "k8s.io/client-go/listers/core/v1" + storagelisters "k8s.io/client-go/listers/storage/v1beta1" kcache "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" @@ -47,6 +50,7 @@ import ( "k8s.io/kubernetes/pkg/controller/volume/attachdetach/reconciler" "k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater" "k8s.io/kubernetes/pkg/controller/volume/attachdetach/util" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/volume" volumeutil "k8s.io/kubernetes/pkg/volume/util" @@ -101,6 +105,7 @@ func NewAttachDetachController( nodeInformer coreinformers.NodeInformer, pvcInformer coreinformers.PersistentVolumeClaimInformer, pvInformer coreinformers.PersistentVolumeInformer, + csiNodeInformer storageinformers.CSINodeInformer, cloud cloudprovider.Interface, plugins []volume.VolumePlugin, prober volume.DynamicPluginProber, @@ -136,6 +141,12 @@ func NewAttachDetachController( pvcQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvcs"), } + if utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && + utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) { + adc.csiNodeLister = csiNodeInformer.Lister() + adc.csiNodeSynced = csiNodeInformer.Informer().HasSynced + } + if err := adc.volumePluginMgr.InitPlugins(plugins, prober, adc); err != nil { return nil, fmt.Errorf("Could not initialize volume plugins for Attach/Detach Controller: %+v", err) } @@ -257,6 +268,9 @@ type attachDetachController struct { nodeLister corelisters.NodeLister nodesSynced kcache.InformerSynced + csiNodeLister storagelisters.CSINodeLister + csiNodeSynced kcache.InformerSynced + // cloud provider used by volume host cloud cloudprovider.Interface @@ -309,7 +323,12 @@ func (adc *attachDetachController) Run(stopCh <-chan struct{}) { klog.Infof("Starting attach detach controller") defer klog.Infof("Shutting down attach detach controller") - if !controller.WaitForCacheSync("attach detach", stopCh, adc.podsSynced, adc.nodesSynced, adc.pvcsSynced, adc.pvsSynced) { + synced := []kcache.InformerSynced{adc.podsSynced, adc.nodesSynced, adc.pvcsSynced, adc.pvsSynced} + if adc.csiNodeSynced != nil { + synced = append(synced, adc.csiNodeSynced) + } + + if !controller.WaitForCacheSync("attach detach", stopCh, synced...) { return } @@ -643,6 +662,17 @@ func (adc *attachDetachController) processVolumesInUse( } } +var _ volume.VolumeHost = &attachDetachController{} +var _ volume.AttachDetachVolumeHost = &attachDetachController{} + +func (adc *attachDetachController) CSINodeLister() storagelisters.CSINodeLister { + return adc.csiNodeLister +} + +func (adc *attachDetachController) IsAttachDetachController() bool { + return true +} + // VolumeHost implementation // This is an unfortunate requirement of the current factoring of volume plugin // initializing code. It requires kubelet specific methods used by the mounting diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller_test.go b/pkg/controller/volume/attachdetach/attach_detach_controller_test.go index 2c5c2bc06b..028d868db4 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller_test.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller_test.go @@ -44,6 +44,7 @@ func Test_NewAttachDetachController_Positive(t *testing.T) { informerFactory.Core().V1().Nodes(), informerFactory.Core().V1().PersistentVolumeClaims(), informerFactory.Core().V1().PersistentVolumes(), + informerFactory.Storage().V1beta1().CSINodes(), nil, /* cloud */ nil, /* plugins */ nil, /* prober */ @@ -218,6 +219,7 @@ func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2 informerFactory.Core().V1().Nodes(), informerFactory.Core().V1().PersistentVolumeClaims(), informerFactory.Core().V1().PersistentVolumes(), + informerFactory.Storage().V1beta1().CSINodes(), nil, /* cloud */ plugins, prober, diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index edee172ef1..4a93449732 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -776,6 +776,9 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, tokenManager := token.NewManager(kubeDeps.KubeClient) + // NewInitializedVolumePluginMgr intializes some storageErrors on the Kubelet runtimeState (in csi_plugin.go init) + // which affects node ready status. This function must be called before Kubelet is initialized so that the Node + // ReadyState is accurate with the storage state. klet.volumePluginMgr, err = NewInitializedVolumePluginMgr(klet, secretManager, configMapManager, tokenManager, kubeDeps.VolumePlugins, kubeDeps.DynamicPluginProber) if err != nil { diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go index 87e39a0b6d..cb13ec735f 100644 --- a/pkg/kubelet/kubelet_node_status.go +++ b/pkg/kubelet/kubelet_node_status.go @@ -544,7 +544,7 @@ func (kl *Kubelet) defaultNodeStatusFuncs() []func(*v1.Node) error { nodestatus.MemoryPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderMemoryPressure, kl.recordNodeStatusEvent), nodestatus.DiskPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderDiskPressure, kl.recordNodeStatusEvent), nodestatus.PIDPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderPIDPressure, kl.recordNodeStatusEvent), - nodestatus.ReadyCondition(kl.clock.Now, kl.runtimeState.runtimeErrors, kl.runtimeState.networkErrors, validateHostFunc, kl.containerManager.Status, kl.recordNodeStatusEvent), + nodestatus.ReadyCondition(kl.clock.Now, kl.runtimeState.runtimeErrors, kl.runtimeState.networkErrors, kl.runtimeState.storageErrors, validateHostFunc, kl.containerManager.Status, kl.recordNodeStatusEvent), nodestatus.VolumesInUse(kl.volumeManager.ReconcilerStatesHasBeenSynced, kl.volumeManager.GetVolumesInUse), nodestatus.RemoveOutOfDiskCondition(), // TODO(mtaufen): I decided not to move this setter for now, since all it does is send an event diff --git a/pkg/kubelet/nodestatus/setters.go b/pkg/kubelet/nodestatus/setters.go index 71bd5e4a58..becdb0f12d 100644 --- a/pkg/kubelet/nodestatus/setters.go +++ b/pkg/kubelet/nodestatus/setters.go @@ -440,6 +440,7 @@ func ReadyCondition( nowFunc func() time.Time, // typically Kubelet.clock.Now runtimeErrorsFunc func() error, // typically Kubelet.runtimeState.runtimeErrors networkErrorsFunc func() error, // typically Kubelet.runtimeState.networkErrors + storageErrorsFunc func() error, // typically Kubelet.runtimeState.storageErrors appArmorValidateHostFunc func() error, // typically Kubelet.appArmorValidator.ValidateHost, might be nil depending on whether there was an appArmorValidator cmStatusFunc func() cm.Status, // typically Kubelet.containerManager.Status recordEventFunc func(eventType, event string), // typically Kubelet.recordNodeStatusEvent @@ -456,7 +457,7 @@ func ReadyCondition( Message: "kubelet is posting ready status", LastHeartbeatTime: currentTime, } - errs := []error{runtimeErrorsFunc(), networkErrorsFunc()} + errs := []error{runtimeErrorsFunc(), networkErrorsFunc(), storageErrorsFunc()} requiredCapacities := []v1.ResourceName{v1.ResourceCPU, v1.ResourceMemory, v1.ResourcePods} if utilfeature.DefaultFeatureGate.Enabled(features.LocalStorageCapacityIsolation) { requiredCapacities = append(requiredCapacities, v1.ResourceEphemeralStorage) diff --git a/pkg/kubelet/nodestatus/setters_test.go b/pkg/kubelet/nodestatus/setters_test.go index c0267a686f..6a3e73cd9d 100644 --- a/pkg/kubelet/nodestatus/setters_test.go +++ b/pkg/kubelet/nodestatus/setters_test.go @@ -895,6 +895,7 @@ func TestReadyCondition(t *testing.T) { node *v1.Node runtimeErrors error networkErrors error + storageErrors error appArmorValidateHostFunc func() error cmStatus cm.Status expectConditions []v1.NodeCondition @@ -929,6 +930,12 @@ func TestReadyCondition(t *testing.T) { }, expectConditions: []v1.NodeCondition{*makeReadyCondition(true, "kubelet is posting ready status. WARNING: foo", now, now)}, }, + { + desc: "new, not ready: storage errors", + node: withCapacity.DeepCopy(), + storageErrors: errors.New("some storage error"), + expectConditions: []v1.NodeCondition{*makeReadyCondition(false, "some storage error", now, now)}, + }, { desc: "new, not ready: runtime and network errors", node: withCapacity.DeepCopy(), @@ -1003,6 +1010,9 @@ func TestReadyCondition(t *testing.T) { networkErrorsFunc := func() error { return tc.networkErrors } + storageErrorsFunc := func() error { + return tc.storageErrors + } cmStatusFunc := func() cm.Status { return tc.cmStatus } @@ -1014,7 +1024,7 @@ func TestReadyCondition(t *testing.T) { }) } // construct setter - setter := ReadyCondition(nowFunc, runtimeErrorsFunc, networkErrorsFunc, tc.appArmorValidateHostFunc, cmStatusFunc, recordEventFunc) + setter := ReadyCondition(nowFunc, runtimeErrorsFunc, networkErrorsFunc, storageErrorsFunc, tc.appArmorValidateHostFunc, cmStatusFunc, recordEventFunc) // call setter on node if err := setter(tc.node); err != nil { t.Fatalf("unexpected error: %v", err) diff --git a/pkg/kubelet/runtime.go b/pkg/kubelet/runtime.go index 53c85a2887..d51b3b5b29 100644 --- a/pkg/kubelet/runtime.go +++ b/pkg/kubelet/runtime.go @@ -30,6 +30,7 @@ type runtimeState struct { lastBaseRuntimeSync time.Time baseRuntimeSyncThreshold time.Duration networkError error + storageError error cidr string healthChecks []*healthCheck } @@ -61,6 +62,12 @@ func (s *runtimeState) setNetworkState(err error) { s.networkError = err } +func (s *runtimeState) setStorageState(err error) { + s.Lock() + defer s.Unlock() + s.storageError = err +} + func (s *runtimeState) setPodCIDR(cidr string) { s.Lock() defer s.Unlock() @@ -101,6 +108,16 @@ func (s *runtimeState) networkErrors() error { return utilerrors.NewAggregate(errs) } +func (s *runtimeState) storageErrors() error { + s.RLock() + defer s.RUnlock() + errs := []error{} + if s.storageError != nil { + errs = append(errs, s.storageError) + } + return utilerrors.NewAggregate(errs) +} + func newRuntimeState( runtimeSyncThreshold time.Duration, ) *runtimeState { diff --git a/pkg/kubelet/volume_host.go b/pkg/kubelet/volume_host.go index e269c023b5..5b2077f348 100644 --- a/pkg/kubelet/volume_host.go +++ b/pkg/kubelet/volume_host.go @@ -80,6 +80,7 @@ func NewInitializedVolumePluginMgr( // Compile-time check to ensure kubeletVolumeHost implements the VolumeHost interface var _ volume.VolumeHost = &kubeletVolumeHost{} +var _ volume.KubeletVolumeHost = &kubeletVolumeHost{} func (kvh *kubeletVolumeHost) GetPluginDir(pluginName string) string { return kvh.kubelet.getPluginDir(pluginName) @@ -94,6 +95,10 @@ type kubeletVolumeHost struct { mountPodManager mountpod.Manager } +func (kvh *kubeletVolumeHost) SetKubeletError(err error) { + kvh.kubelet.runtimeState.setStorageState(err) +} + func (kvh *kubeletVolumeHost) GetVolumeDevicePluginDir(pluginName string) string { return kvh.kubelet.getVolumeDevicePluginDir(pluginName) } diff --git a/pkg/volume/BUILD b/pkg/volume/BUILD index 7d5c5d4658..dc4a16cbd6 100644 --- a/pkg/volume/BUILD +++ b/pkg/volume/BUILD @@ -30,6 +30,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/validation:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", + "//staging/src/k8s.io/client-go/listers/storage/v1beta1:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", "//staging/src/k8s.io/cloud-provider:go_default_library", "//vendor/k8s.io/klog:go_default_library", diff --git a/pkg/volume/csi/BUILD b/pkg/volume/csi/BUILD index ffcf3f7251..a51d3cb8a7 100644 --- a/pkg/volume/csi/BUILD +++ b/pkg/volume/csi/BUILD @@ -24,6 +24,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/version:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", @@ -32,6 +33,7 @@ go_library( "//staging/src/k8s.io/client-go/informers/storage/v1beta1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/listers/storage/v1beta1:go_default_library", + "//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library", "//vendor/github.com/container-storage-interface/spec/lib/go/csi:go_default_library", "//vendor/google.golang.org/grpc:go_default_library", "//vendor/k8s.io/klog:go_default_library", diff --git a/pkg/volume/csi/csi_plugin.go b/pkg/volume/csi/csi_plugin.go index 54bdc20ac0..31cbdf7296 100644 --- a/pkg/volume/csi/csi_plugin.go +++ b/pkg/volume/csi/csi_plugin.go @@ -33,6 +33,7 @@ import ( apierrs "k8s.io/apimachinery/pkg/api/errors" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilversion "k8s.io/apimachinery/pkg/util/version" "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" @@ -40,6 +41,7 @@ import ( csiinformer "k8s.io/client-go/informers/storage/v1beta1" clientset "k8s.io/client-go/kubernetes" csilister "k8s.io/client-go/listers/storage/v1beta1" + csitranslationplugins "k8s.io/csi-translation-lib/plugins" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/csi/nodeinfomanager" @@ -216,15 +218,84 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error { go factory.Start(wait.NeverStop) } + var migratedPlugins = map[string](func() bool){ + csitranslationplugins.GCEPDInTreePluginName: func() bool { + return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationGCE) + }, + csitranslationplugins.AWSEBSInTreePluginName: func() bool { + return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAWS) + }, + csitranslationplugins.CinderInTreePluginName: func() bool { + return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationOpenStack) + }, + } + // Initializing the label management channels - nim = nodeinfomanager.NewNodeInfoManager(host.GetNodeName(), host) + nim = nodeinfomanager.NewNodeInfoManager(host.GetNodeName(), host, migratedPlugins) - // TODO(#70514) Init CSINodeInfo object if the CRD exists and create Driver - // objects for migrated drivers. + if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) && + utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) { + // This function prevents Kubelet from posting Ready status until CSINodeInfo + // is both installed and initialized + if err := initializeCSINode(host); err != nil { + return fmt.Errorf("failed to initialize CSINodeInfo: %v", err) + } + } return nil } +func initializeCSINode(host volume.VolumeHost) error { + kvh, ok := host.(volume.KubeletVolumeHost) + if !ok { + klog.V(4).Info("Cast from VolumeHost to KubeletVolumeHost failed. Skipping CSINodeInfo initialization, not running on kubelet") + return nil + } + kubeClient := host.GetKubeClient() + if kubeClient == nil { + // Kubelet running in standalone mode. Skip CSINodeInfo initialization + klog.Warning("Skipping CSINodeInfo initialization, kubelet running in standalone mode") + return nil + } + + kvh.SetKubeletError(errors.New("CSINodeInfo is not yet initialized")) + + go func() { + defer utilruntime.HandleCrash() + + // Backoff parameters tuned to retry over 140 seconds. Will fail and restart the Kubelet + // after max retry steps. + initBackoff := wait.Backoff{ + Steps: 6, + Duration: 15 * time.Millisecond, + Factor: 6.0, + Jitter: 0.1, + } + err := wait.ExponentialBackoff(initBackoff, func() (bool, error) { + klog.V(4).Infof("Initializing migrated drivers on CSINodeInfo") + err := nim.InitializeCSINodeWithAnnotation() + if err != nil { + kvh.SetKubeletError(fmt.Errorf("Failed to initialize CSINodeInfo: %v", err)) + klog.Errorf("Failed to initialize CSINodeInfo: %v", err) + return false, nil + } + + // Successfully initialized drivers, allow Kubelet to post Ready + kvh.SetKubeletError(nil) + return true, nil + }) + if err != nil { + // 2 releases after CSIMigration and all CSIMigrationX (where X is a volume plugin) + // are permanently enabled the apiserver/controllers can assume that the kubelet is + // using CSI for all Migrated volume plugins. Then all the CSINode initialization + // code can be dropped from Kubelet. + // Kill the Kubelet process and allow it to restart to retry initialization + klog.Fatalf("Failed to initialize CSINodeInfo after retrying") + } + }() + return nil +} + func (p *csiPlugin) GetPluginName() string { return CSIPluginName } diff --git a/pkg/volume/csi/nodeinfomanager/nodeinfomanager.go b/pkg/volume/csi/nodeinfomanager/nodeinfomanager.go index dfea45f51e..ba25b07e65 100644 --- a/pkg/volume/csi/nodeinfomanager/nodeinfomanager.go +++ b/pkg/volume/csi/nodeinfomanager/nodeinfomanager.go @@ -20,12 +20,14 @@ package nodeinfomanager // import "k8s.io/kubernetes/pkg/volume/csi/nodeinfomana import ( "encoding/json" + goerrors "errors" "fmt" + "strings" "time" "k8s.io/api/core/v1" - storage "k8s.io/api/storage/v1beta1" + storagev1beta1 "k8s.io/api/storage/v1beta1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -60,8 +62,9 @@ var ( // nodeInfoManager contains necessary common dependencies to update node info on both // the Node and CSINode objects. type nodeInfoManager struct { - nodeName types.NodeName - volumeHost volume.VolumeHost + nodeName types.NodeName + volumeHost volume.VolumeHost + migratedPlugins map[string](func() bool) } // If no updates is needed, the function must return the same Node object as the input. @@ -69,7 +72,10 @@ type nodeUpdateFunc func(*v1.Node) (newNode *v1.Node, updated bool, err error) // Interface implements an interface for managing labels of a node type Interface interface { - CreateCSINode() (*storage.CSINode, error) + CreateCSINode() (*storagev1beta1.CSINode, error) + + // Updates or Creates the CSINode object with annotations for CSI Migration + InitializeCSINodeWithAnnotation() error // Record in the cluster the given node information from the CSI driver with the given name. // Concurrent calls to InstallCSIDriver() is allowed, but they should not be intertwined with calls @@ -85,10 +91,12 @@ type Interface interface { // NewNodeInfoManager initializes nodeInfoManager func NewNodeInfoManager( nodeName types.NodeName, - volumeHost volume.VolumeHost) Interface { + volumeHost volume.VolumeHost, + migratedPlugins map[string](func() bool)) Interface { return &nodeInfoManager{ - nodeName: nodeName, - volumeHost: volumeHost, + nodeName: nodeName, + volumeHost: volumeHost, + migratedPlugins: migratedPlugins, } } @@ -384,7 +392,46 @@ func (nim *nodeInfoManager) tryUpdateCSINode( return nim.installDriverToCSINode(nodeInfo, driverName, driverNodeID, topology) } -func (nim *nodeInfoManager) CreateCSINode() (*storage.CSINode, error) { +func (nim *nodeInfoManager) InitializeCSINodeWithAnnotation() error { + csiKubeClient := nim.volumeHost.GetKubeClient() + if csiKubeClient == nil { + return goerrors.New("error getting CSI client") + } + + var updateErrs []error + err := wait.ExponentialBackoff(updateBackoff, func() (bool, error) { + if err := nim.tryInitializeCSINodeWithAnnotation(csiKubeClient); err != nil { + updateErrs = append(updateErrs, err) + return false, nil + } + return true, nil + }) + if err != nil { + return fmt.Errorf("error updating CSINode annotation: %v; caused by: %v", err, utilerrors.NewAggregate(updateErrs)) + } + + return nil +} + +func (nim *nodeInfoManager) tryInitializeCSINodeWithAnnotation(csiKubeClient clientset.Interface) error { + nodeInfo, err := csiKubeClient.StorageV1beta1().CSINodes().Get(string(nim.nodeName), metav1.GetOptions{}) + if nodeInfo == nil || errors.IsNotFound(err) { + // CreateCSINode will set the annotation + _, err = nim.CreateCSINode() + return err + } + + annotationModified := setMigrationAnnotation(nim.migratedPlugins, nodeInfo) + + if annotationModified { + _, err := csiKubeClient.StorageV1beta1().CSINodes().Update(nodeInfo) + return err + } + return nil + +} + +func (nim *nodeInfoManager) CreateCSINode() (*storagev1beta1.CSINode, error) { kubeClient := nim.volumeHost.GetKubeClient() if kubeClient == nil { @@ -401,7 +448,7 @@ func (nim *nodeInfoManager) CreateCSINode() (*storage.CSINode, error) { return nil, err } - nodeInfo := &storage.CSINode{ + nodeInfo := &storagev1beta1.CSINode{ ObjectMeta: metav1.ObjectMeta{ Name: string(nim.nodeName), OwnerReferences: []metav1.OwnerReference{ @@ -413,16 +460,59 @@ func (nim *nodeInfoManager) CreateCSINode() (*storage.CSINode, error) { }, }, }, - Spec: storage.CSINodeSpec{ - Drivers: []storage.CSINodeDriver{}, + Spec: storagev1beta1.CSINodeSpec{ + Drivers: []storagev1beta1.CSINodeDriver{}, }, } + setMigrationAnnotation(nim.migratedPlugins, nodeInfo) + return csiKubeClient.StorageV1beta1().CSINodes().Create(nodeInfo) } +func setMigrationAnnotation(migratedPlugins map[string](func() bool), nodeInfo *storagev1beta1.CSINode) (modified bool) { + if migratedPlugins == nil { + return false + } + + nodeInfoAnnotations := nodeInfo.GetAnnotations() + if nodeInfoAnnotations == nil { + nodeInfoAnnotations = map[string]string{} + } + + var oldAnnotationSet sets.String + mpa := nodeInfoAnnotations[v1.MigratedPluginsAnnotationKey] + tok := strings.Split(mpa, ",") + if len(mpa) == 0 { + oldAnnotationSet = sets.NewString() + } else { + oldAnnotationSet = sets.NewString(tok...) + } + + newAnnotationSet := sets.NewString() + for pluginName, migratedFunc := range migratedPlugins { + if migratedFunc() { + newAnnotationSet.Insert(pluginName) + } + } + + if oldAnnotationSet.Equal(newAnnotationSet) { + return false + } + + nas := strings.Join(newAnnotationSet.List(), ",") + if len(nas) != 0 { + nodeInfoAnnotations[v1.MigratedPluginsAnnotationKey] = nas + } else { + delete(nodeInfoAnnotations, v1.MigratedPluginsAnnotationKey) + } + + nodeInfo.Annotations = nodeInfoAnnotations + return true +} + func (nim *nodeInfoManager) installDriverToCSINode( - nodeInfo *storage.CSINode, + nodeInfo *storagev1beta1.CSINode, driverName string, driverNodeID string, topology map[string]string) error { @@ -438,9 +528,8 @@ func (nim *nodeInfoManager) installDriverToCSINode( } specModified := true - statusModified := true // Clone driver list, omitting the driver that matches the given driverName - newDriverSpecs := []storage.CSINodeDriver{} + newDriverSpecs := []storagev1beta1.CSINodeDriver{} for _, driverInfoSpec := range nodeInfo.Spec.Drivers { if driverInfoSpec.Name == driverName { if driverInfoSpec.NodeID == driverNodeID && @@ -453,12 +542,14 @@ func (nim *nodeInfoManager) installDriverToCSINode( } } - if !specModified && !statusModified { + annotationModified := setMigrationAnnotation(nim.migratedPlugins, nodeInfo) + + if !specModified && !annotationModified { return nil } // Append new driver - driverSpec := storage.CSINodeDriver{ + driverSpec := storagev1beta1.CSINodeDriver{ Name: driverName, NodeID: driverNodeID, TopologyKeys: topologyKeys.List(), @@ -517,6 +608,7 @@ func (nim *nodeInfoManager) tryUninstallDriverFromCSINode( hasModified = true } } + if !hasModified { // No changes, don't update return nil diff --git a/pkg/volume/csi/nodeinfomanager/nodeinfomanager_test.go b/pkg/volume/csi/nodeinfomanager/nodeinfomanager_test.go index 6b5831e9f7..71946f8edb 100644 --- a/pkg/volume/csi/nodeinfomanager/nodeinfomanager_test.go +++ b/pkg/volume/csi/nodeinfomanager/nodeinfomanager_test.go @@ -19,6 +19,7 @@ package nodeinfomanager import ( "encoding/json" "fmt" + "reflect" "testing" "github.com/stretchr/testify/assert" @@ -545,6 +546,156 @@ func TestUninstallCSIDriverCSINodeInfoDisabled(t *testing.T) { test(t, false /* addNodeInfo */, false /* csiNodeInfoEnabled */, testcases) } +func TestSetMigrationAnnotation(t *testing.T) { + testcases := []struct { + name string + migratedPlugins map[string](func() bool) + existingNode *storage.CSINode + expectedNode *storage.CSINode + expectModified bool + }{ + { + name: "nil migrated plugins", + existingNode: &storage.CSINode{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + }, + }, + expectedNode: &storage.CSINode{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + }, + }, + }, + { + name: "one modified plugin", + migratedPlugins: map[string](func() bool){ + "test": func() bool { return true }, + }, + existingNode: &storage.CSINode{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + }, + }, + expectedNode: &storage.CSINode{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{v1.MigratedPluginsAnnotationKey: "test"}, + }, + }, + expectModified: true, + }, + { + name: "existing plugin", + migratedPlugins: map[string](func() bool){ + "test": func() bool { return true }, + }, + existingNode: &storage.CSINode{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{v1.MigratedPluginsAnnotationKey: "test"}, + }, + }, + expectedNode: &storage.CSINode{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{v1.MigratedPluginsAnnotationKey: "test"}, + }, + }, + expectModified: false, + }, + { + name: "remove plugin", + migratedPlugins: map[string](func() bool){}, + existingNode: &storage.CSINode{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{v1.MigratedPluginsAnnotationKey: "test"}, + }, + }, + expectedNode: &storage.CSINode{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{}, + }, + }, + expectModified: true, + }, + { + name: "one modified plugin, other annotations stable", + migratedPlugins: map[string](func() bool){ + "test": func() bool { return true }, + }, + existingNode: &storage.CSINode{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{"other": "annotation"}, + }, + }, + expectedNode: &storage.CSINode{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{v1.MigratedPluginsAnnotationKey: "test", "other": "annotation"}, + }, + }, + expectModified: true, + }, + { + name: "multiple plugins modified, other annotations stable", + migratedPlugins: map[string](func() bool){ + "test": func() bool { return true }, + "foo": func() bool { return false }, + }, + existingNode: &storage.CSINode{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{"other": "annotation", v1.MigratedPluginsAnnotationKey: "foo"}, + }, + }, + expectedNode: &storage.CSINode{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{v1.MigratedPluginsAnnotationKey: "test", "other": "annotation"}, + }, + }, + expectModified: true, + }, + { + name: "multiple plugins added, other annotations stable", + migratedPlugins: map[string](func() bool){ + "test": func() bool { return true }, + "foo": func() bool { return true }, + }, + existingNode: &storage.CSINode{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{"other": "annotation"}, + }, + }, + expectedNode: &storage.CSINode{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{v1.MigratedPluginsAnnotationKey: "foo,test", "other": "annotation"}, + }, + }, + expectModified: true, + }, + } + + for _, tc := range testcases { + t.Logf("test case: %s", tc.name) + + modified := setMigrationAnnotation(tc.migratedPlugins, tc.existingNode) + if modified != tc.expectModified { + t.Errorf("Expected modified to be %v but got %v instead", tc.expectModified, modified) + } + + if !reflect.DeepEqual(tc.expectedNode, tc.existingNode) { + t.Errorf("Expected CSINode: %v, but got: %v", tc.expectedNode, tc.existingNode) + } + } +} + func TestInstallCSIDriverExistingAnnotation(t *testing.T) { defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSINodeInfo, true)() @@ -591,7 +742,7 @@ func TestInstallCSIDriverExistingAnnotation(t *testing.T) { nodeName, ) - nim := NewNodeInfoManager(types.NodeName(nodeName), host) + nim := NewNodeInfoManager(types.NodeName(nodeName), host, nil) // Act _, err = nim.CreateCSINode() @@ -649,7 +800,7 @@ func test(t *testing.T, addNodeInfo bool, csiNodeInfoEnabled bool, testcases []t nil, nodeName, ) - nim := NewNodeInfoManager(types.NodeName(nodeName), host) + nim := NewNodeInfoManager(types.NodeName(nodeName), host, nil) //// Act nim.CreateCSINode() diff --git a/pkg/volume/plugins.go b/pkg/volume/plugins.go index 4228bc694e..76780a5f6f 100644 --- a/pkg/volume/plugins.go +++ b/pkg/volume/plugins.go @@ -30,6 +30,7 @@ import ( utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/validation" clientset "k8s.io/client-go/kubernetes" + storagelisters "k8s.io/client-go/listers/storage/v1beta1" "k8s.io/client-go/tools/record" cloudprovider "k8s.io/cloud-provider" "k8s.io/klog" @@ -278,6 +279,32 @@ type BlockVolumePlugin interface { ConstructBlockVolumeSpec(podUID types.UID, volumeName, volumePath string) (*Spec, error) } +// TODO(#14217) +// As part of the Volume Host refactor we are starting to create Volume Hosts +// for specific hosts. New methods for each specific host can be added here. +// Currently consumers will do type assertions to get the specific type of Volume +// Host; however, the end result should be that specific Volume Hosts are passed +// to the specific functions they are needed in (instead of using a catch-all +// VolumeHost interface) + +// KubeletVolumeHost is a Kubelet specific interface that plugins can use to access the kubelet. +type KubeletVolumeHost interface { + // SetKubeletError lets plugins set an error on the Kubelet runtime status + // that will cause the Kubelet to post NotReady status with the error message provided + SetKubeletError(err error) +} + +// AttachDetachVolumeHost is a AttachDetach Controller specific interface that plugins can use +// to access methods on the Attach Detach Controller. +type AttachDetachVolumeHost interface { + // CSINodeLister returns the informer lister for the CSINode API Object + CSINodeLister() storagelisters.CSINodeLister + + // IsAttachDetachController is an interface marker to strictly tie AttachDetachVolumeHost + // to the attachDetachController + IsAttachDetachController() bool +} + // VolumeHost is an interface that plugins can use to access the kubelet. type VolumeHost interface { // GetPluginDir returns the absolute path to a directory under which diff --git a/pkg/volume/util/operationexecutor/BUILD b/pkg/volume/util/operationexecutor/BUILD index d87b21c447..314dcaa8c3 100644 --- a/pkg/volume/util/operationexecutor/BUILD +++ b/pkg/volume/util/operationexecutor/BUILD @@ -28,6 +28,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index c35a060dae..d2ed9edc69 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -17,6 +17,7 @@ limitations under the License. package operationexecutor import ( + goerrors "errors" "fmt" "path" "strings" @@ -26,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" utilfeature "k8s.io/apiserver/pkg/util/feature" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" @@ -303,8 +305,14 @@ func (og *operationGenerator) GenerateAttachVolumeFunc( } originalSpec := volumeToAttach.VolumeSpec + nu, err := nodeUsingCSIPlugin(og, volumeToAttach.VolumeSpec, volumeToAttach.NodeName) + if err != nil { + eventRecorderFunc(&err) + return volumetypes.GeneratedOperations{}, volumeToAttach.GenerateErrorDetailed("AttachVolume.NodeUsingCSIPlugin failed", err) + } + // useCSIPlugin will check both CSIMigration and the plugin specific feature gate - if useCSIPlugin(og.volumePluginMgr, volumeToAttach.VolumeSpec) { + if useCSIPlugin(og.volumePluginMgr, volumeToAttach.VolumeSpec) && nu { // The volume represented by this spec is CSI and thus should be migrated attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(csi.CSIPluginName) if err != nil || attachableVolumePlugin == nil { @@ -401,17 +409,22 @@ func (og *operationGenerator) GenerateDetachVolumeFunc( if volumeToDetach.VolumeSpec != nil { // Get attacher plugin + nu, err := nodeUsingCSIPlugin(og, volumeToDetach.VolumeSpec, volumeToDetach.NodeName) + if err != nil { + return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.NodeUsingCSIPlugin failed", err) + } + // useCSIPlugin will check both CSIMigration and the plugin specific feature gate - if useCSIPlugin(og.volumePluginMgr, volumeToDetach.VolumeSpec) { + if useCSIPlugin(og.volumePluginMgr, volumeToDetach.VolumeSpec) && nu { // The volume represented by this spec is CSI and thus should be migrated attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(csi.CSIPluginName) if err != nil || attachableVolumePlugin == nil { - return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("AttachVolume.FindAttachablePluginBySpec failed", err) + return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginBySpec failed", err) } csiSpec, err := translateSpec(volumeToDetach.VolumeSpec) if err != nil { - return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("AttachVolume.TranslateSpec failed", err) + return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.TranslateSpec failed", err) } volumeToDetach.VolumeSpec = csiSpec @@ -861,7 +874,7 @@ func (og *operationGenerator) GenerateUnmountDeviceFunc( if deviceOpened { return deviceToDetach.GenerateError( "UnmountDevice failed", - fmt.Errorf("the device is in use when it was no longer expected to be in use")) + goerrors.New("the device is in use when it was no longer expected to be in use")) } klog.Infof(deviceToDetach.GenerateMsg("UnmountDevice succeeded", "")) @@ -968,7 +981,7 @@ func (og *operationGenerator) GenerateMapVolumeFunc( devicePath = pluginDevicePath } if len(devicePath) == 0 { - return volumeToMount.GenerateError("MapVolume failed", fmt.Errorf("Device path of the volume is empty")) + return volumeToMount.GenerateError("MapVolume failed", goerrors.New("Device path of the volume is empty")) } // When kubelet is containerized, devicePath may be a symlink at a place unavailable to @@ -1533,8 +1546,11 @@ func isDeviceOpened(deviceToDetach AttachedVolume, mounter mount.Interface) (boo return deviceOpened, nil } -// TODO(dyzz): need to also add logic to check CSINodeInfo for Kubelet migration status func useCSIPlugin(vpm *volume.VolumePluginMgr, spec *volume.Spec) bool { + // TODO(#75146) Check whether the driver is installed as well so that + // we can throw a better error when the driver is not installed. + // The error should be of the approximate form: + // fmt.Errorf("in-tree plugin %s is migrated on node %s but driver %s is not installed", pluginName, string(nodeName), driverName) if !utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) { return false } @@ -1547,6 +1563,93 @@ func useCSIPlugin(vpm *volume.VolumePluginMgr, spec *volume.Spec) bool { return false } +func nodeUsingCSIPlugin(og *operationGenerator, spec *volume.Spec, nodeName types.NodeName) (bool, error) { + migratable, err := og.volumePluginMgr.IsPluginMigratableBySpec(spec) + if err != nil { + return false, err + } + if !utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) || + !utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) || + !migratable { + return false, nil + } + + if len(nodeName) == 0 { + return false, goerrors.New("nodeName is empty") + } + + kubeClient := og.volumePluginMgr.Host.GetKubeClient() + if kubeClient == nil { + // Don't handle the controller/kubelet version skew check and fallback + // to just checking the feature gates. This can happen if + // we are in a standalone (headless) Kubelet + return true, nil + } + + adcHost, ok := og.volumePluginMgr.Host.(volume.AttachDetachVolumeHost) + if !ok { + // Don't handle the controller/kubelet version skew check and fallback + // to just checking the feature gates. This can happen if + // "enableControllerAttachDetach" is set to true on kubelet + return true, nil + } + + if adcHost.CSINodeLister() == nil { + return false, goerrors.New("could not find CSINodeLister in attachDetachController") + } + + csiNode, err := adcHost.CSINodeLister().Get(string(nodeName)) + if err != nil { + return false, err + } + + ann := csiNode.GetAnnotations() + if ann == nil { + return false, nil + } + + var mpaSet sets.String + mpa := ann[v1.MigratedPluginsAnnotationKey] + tok := strings.Split(mpa, ",") + if len(mpa) == 0 { + mpaSet = sets.NewString() + } else { + mpaSet = sets.NewString(tok...) + } + + pluginName, err := csilib.GetInTreePluginNameFromSpec(spec.PersistentVolume, spec.Volume) + if err != nil { + return false, err + } + + if len(pluginName) == 0 { + // Could not find a plugin name from translation directory, assume not translated + return false, nil + } + + isMigratedOnNode := mpaSet.Has(pluginName) + + if isMigratedOnNode { + installed := false + driverName, err := csilib.GetCSINameFromInTreeName(pluginName) + if err != nil { + return isMigratedOnNode, err + } + for _, driver := range csiNode.Spec.Drivers { + if driver.Name == driverName { + installed = true + break + } + } + if !installed { + return true, fmt.Errorf("in-tree plugin %s is migrated on node %s but driver %s is not installed", pluginName, string(nodeName), driverName) + } + } + + return isMigratedOnNode, nil + +} + func translateSpec(spec *volume.Spec) (*volume.Spec, error) { if spec.PersistentVolume != nil { // TranslateInTreePVToCSI will create a new PV @@ -1559,8 +1662,8 @@ func translateSpec(spec *volume.Spec) (*volume.Spec, error) { ReadOnly: spec.ReadOnly, }, nil } else if spec.Volume != nil { - return &volume.Spec{}, fmt.Errorf("translation is not supported for in-line volumes yet") + return &volume.Spec{}, goerrors.New("translation is not supported for in-line volumes yet") } else { - return &volume.Spec{}, fmt.Errorf("not a valid volume spec") + return &volume.Spec{}, goerrors.New("not a valid volume spec") } } diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go index c409ae8fd9..d1e83dd99c 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go @@ -76,6 +76,9 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding) if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) { role.Rules = append(role.Rules, rbacv1helpers.NewRule("get", "watch", "list").Groups("storage.k8s.io").Resources("csidrivers").RuleOrDie()) } + if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) { + role.Rules = append(role.Rules, rbacv1helpers.NewRule("get", "watch", "list").Groups("storage.k8s.io").Resources("csinodes").RuleOrDie()) + } } return role diff --git a/staging/src/k8s.io/api/core/v1/annotation_key_constants.go b/staging/src/k8s.io/api/core/v1/annotation_key_constants.go index 2c72ec2df2..edc9b4d600 100644 --- a/staging/src/k8s.io/api/core/v1/annotation_key_constants.go +++ b/staging/src/k8s.io/api/core/v1/annotation_key_constants.go @@ -97,4 +97,10 @@ const ( // This annotation will be used to compute the in-cluster network programming latency SLI, see // https://github.com/kubernetes/community/blob/master/sig-scalability/slos/network_programming_latency.md EndpointsLastChangeTriggerTime = "endpoints.kubernetes.io/last-change-trigger-time" + + // MigratedPluginsAnnotationKey is the annotation key, set for CSINode objects, that is a comma-separated + // list of in-tree plugins that will be serviced by the CSI backend on the Node represented by CSINode. + // This annotation is used by the Attach Detach Controller to determine whether to use the in-tree or + // CSI Backend for a volume plugin on a specific node. + MigratedPluginsAnnotationKey = "storage.alpha.kubernetes.io/migrated-plugins" ) diff --git a/staging/src/k8s.io/csi-translation-lib/translate.go b/staging/src/k8s.io/csi-translation-lib/translate.go index 7f84a4f7cb..ac1a4dd3d8 100644 --- a/staging/src/k8s.io/csi-translation-lib/translate.go +++ b/staging/src/k8s.io/csi-translation-lib/translate.go @@ -17,6 +17,7 @@ limitations under the License. package csitranslation import ( + "errors" "fmt" "k8s.io/api/core/v1" @@ -48,7 +49,7 @@ func TranslateInTreeStorageClassParametersToCSI(inTreePluginName string, scParam // be modified func TranslateInTreePVToCSI(pv *v1.PersistentVolume) (*v1.PersistentVolume, error) { if pv == nil { - return nil, fmt.Errorf("persistent volume was nil") + return nil, errors.New("persistent volume was nil") } copiedPV := pv.DeepCopy() for _, curPlugin := range inTreePlugins { @@ -64,7 +65,7 @@ func TranslateInTreePVToCSI(pv *v1.PersistentVolume) (*v1.PersistentVolume, erro // by the `Driver` field in the CSI Source. The input PV object will not be modified. func TranslateCSIPVToInTree(pv *v1.PersistentVolume) (*v1.PersistentVolume, error) { if pv == nil || pv.Spec.CSI == nil { - return nil, fmt.Errorf("CSI persistent volume was nil") + return nil, errors.New("CSI persistent volume was nil") } copiedPV := pv.DeepCopy() for driverName, curPlugin := range inTreePlugins { @@ -95,6 +96,23 @@ func IsMigratedCSIDriverByName(csiPluginName string) bool { return false } +// GetInTreePluginNameFromSpec returns the plugin name +func GetInTreePluginNameFromSpec(pv *v1.PersistentVolume, vol *v1.Volume) (string, error) { + if pv != nil { + for _, curPlugin := range inTreePlugins { + if curPlugin.CanSupport(pv) { + return curPlugin.GetInTreePluginName(), nil + } + } + return "", fmt.Errorf("could not find in-tree plugin name from persistent volume %v", pv) + } else if vol != nil { + // TODO(dyzz): Implement inline volume migration support + return "", errors.New("inline volume migration not yet supported") + } else { + return "", errors.New("both persistent volume and volume are nil") + } +} + // GetCSINameFromInTreeName returns the name of a CSI driver that supersedes the // in-tree plugin with the given name func GetCSINameFromInTreeName(pluginName string) (string, error) { @@ -103,7 +121,7 @@ func GetCSINameFromInTreeName(pluginName string) (string, error) { return csiDriverName, nil } } - return "", fmt.Errorf("Could not find CSI Driver name for plugin %v", pluginName) + return "", fmt.Errorf("could not find CSI Driver name for plugin %v", pluginName) } // GetInTreeNameFromCSIName returns the name of the in-tree plugin superseded by diff --git a/test/integration/volume/attach_detach_test.go b/test/integration/volume/attach_detach_test.go index c6ac38141c..39722abc4d 100644 --- a/test/integration/volume/attach_detach_test.go +++ b/test/integration/volume/attach_detach_test.go @@ -415,6 +415,7 @@ func createAdClients(ns *v1.Namespace, t *testing.T, server *httptest.Server, sy informers.Core().V1().Nodes(), informers.Core().V1().PersistentVolumeClaims(), informers.Core().V1().PersistentVolumes(), + informers.Storage().V1beta1().CSINodes(), cloud, plugins, nil, /* prober */