From 4e65976da4f5586bc916d31a93d92dfb47d4140a Mon Sep 17 00:00:00 2001 From: Xing Yang Date: Mon, 4 Mar 2019 13:03:55 -0800 Subject: [PATCH] Move CSIDriver Lister to the controller --- cmd/kube-controller-manager/app/core.go | 1 + .../attachdetach/attach_detach_controller.go | 23 ++++++++ .../attach_detach_controller_test.go | 2 + pkg/kubelet/BUILD | 2 + pkg/kubelet/volume_host.go | 56 +++++++++++++++++++ pkg/kubelet/volumemanager/volume_manager.go | 5 ++ pkg/volume/BUILD | 4 ++ pkg/volume/csi/BUILD | 3 +- pkg/volume/csi/csi_attacher_test.go | 13 ++++- pkg/volume/csi/csi_block_test.go | 3 + pkg/volume/csi/csi_mounter.go | 9 ++- pkg/volume/csi/csi_mounter_test.go | 10 +--- pkg/volume/csi/csi_plugin.go | 37 ++++++++---- pkg/volume/csi/csi_plugin_test.go | 11 +++- .../nodeinfomanager/nodeinfomanager_test.go | 2 + pkg/volume/plugins.go | 27 +++++++++ pkg/volume/testing/BUILD | 1 + pkg/volume/testing/testing.go | 41 ++++++++++---- test/integration/volume/BUILD | 2 + test/integration/volume/attach_detach_test.go | 24 +++++++- 20 files changed, 236 insertions(+), 40 deletions(-) diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index 7d9d3bfdfd..aa4be25a0f 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -224,6 +224,7 @@ func startAttachDetachController(ctx ControllerContext) (http.Handler, bool, err ctx.InformerFactory.Core().V1().PersistentVolumeClaims(), ctx.InformerFactory.Core().V1().PersistentVolumes(), ctx.InformerFactory.Storage().V1beta1().CSINodes(), + ctx.InformerFactory.Storage().V1beta1().CSIDrivers(), ctx.Cloud, ProbeAttachableVolumePlugins(), GetDynamicPluginProber(ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration), diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller.go b/pkg/controller/volume/attachdetach/attach_detach_controller.go index 77ffbc7fb3..fa7086fdc1 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller.go @@ -106,6 +106,7 @@ func NewAttachDetachController( pvcInformer coreinformers.PersistentVolumeClaimInformer, pvInformer coreinformers.PersistentVolumeInformer, csiNodeInformer storageinformers.CSINodeInformer, + csiDriverInformer storageinformers.CSIDriverInformer, cloud cloudprovider.Interface, plugins []volume.VolumePlugin, prober volume.DynamicPluginProber, @@ -147,6 +148,11 @@ func NewAttachDetachController( adc.csiNodeSynced = csiNodeInformer.Informer().HasSynced } + if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) { + adc.csiDriverLister = csiDriverInformer.Lister() + adc.csiDriversSynced = csiDriverInformer.Informer().HasSynced + } + if err := adc.volumePluginMgr.InitPlugins(plugins, prober, adc); err != nil { return nil, fmt.Errorf("Could not initialize volume plugins for Attach/Detach Controller: %+v", err) } @@ -271,6 +277,12 @@ type attachDetachController struct { csiNodeLister storagelisters.CSINodeLister csiNodeSynced kcache.InformerSynced + // csiDriverLister is the shared CSIDriver lister used to fetch and store + // CSIDriver objects from the API server. It is shared with other controllers + // and therefore the CSIDriver objects in its store should be treated as immutable. + csiDriverLister storagelisters.CSIDriverLister + csiDriversSynced kcache.InformerSynced + // cloud provider used by volume host cloud cloudprovider.Interface @@ -327,6 +339,9 @@ func (adc *attachDetachController) Run(stopCh <-chan struct{}) { if adc.csiNodeSynced != nil { synced = append(synced, adc.csiNodeSynced) } + if adc.csiDriversSynced != nil { + synced = append(synced, adc.csiDriversSynced) + } if !controller.WaitForCacheSync("attach detach", stopCh, synced...) { return @@ -669,6 +684,10 @@ func (adc *attachDetachController) CSINodeLister() storagelisters.CSINodeLister return adc.csiNodeLister } +func (adc *attachDetachController) CSIDriverLister() storagelisters.CSIDriverLister { + return adc.csiDriverLister +} + func (adc *attachDetachController) IsAttachDetachController() bool { return true } @@ -793,3 +812,7 @@ func (adc *attachDetachController) GetSubpather() subpath.Interface { // Subpaths not needed in attachdetach controller return nil } + +func (adc *attachDetachController) GetCSIDriverLister() storagelisters.CSIDriverLister { + return adc.csiDriverLister +} diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller_test.go b/pkg/controller/volume/attachdetach/attach_detach_controller_test.go index 028d868db4..80518e6f57 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller_test.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller_test.go @@ -45,6 +45,7 @@ func Test_NewAttachDetachController_Positive(t *testing.T) { informerFactory.Core().V1().PersistentVolumeClaims(), informerFactory.Core().V1().PersistentVolumes(), informerFactory.Storage().V1beta1().CSINodes(), + informerFactory.Storage().V1beta1().CSIDrivers(), nil, /* cloud */ nil, /* plugins */ nil, /* prober */ @@ -220,6 +221,7 @@ func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2 informerFactory.Core().V1().PersistentVolumeClaims(), informerFactory.Core().V1().PersistentVolumes(), informerFactory.Storage().V1beta1().CSINodes(), + informerFactory.Storage().V1beta1().CSIDrivers(), nil, /* cloud */ plugins, prober, diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD index fb1e04671b..4673a16fed 100644 --- a/pkg/kubelet/BUILD +++ b/pkg/kubelet/BUILD @@ -132,9 +132,11 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/validation:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", + "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", + "//staging/src/k8s.io/client-go/listers/storage/v1beta1:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", "//staging/src/k8s.io/client-go/util/certificate:go_default_library", diff --git a/pkg/kubelet/volume_host.go b/pkg/kubelet/volume_host.go index 5b2077f348..4b2e51844c 100644 --- a/pkg/kubelet/volume_host.go +++ b/pkg/kubelet/volume_host.go @@ -26,8 +26,12 @@ import ( authenticationv1 "k8s.io/api/authentication/v1" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" + storagelisters "k8s.io/client-go/listers/storage/v1beta1" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" cloudprovider "k8s.io/cloud-provider" "k8s.io/kubernetes/pkg/features" @@ -56,6 +60,24 @@ func NewInitializedVolumePluginMgr( plugins []volume.VolumePlugin, prober volume.DynamicPluginProber) (*volume.VolumePluginMgr, error) { + // Initialize csiDriverLister before calling InitPlugins + var informerFactory informers.SharedInformerFactory + var csiDriverLister storagelisters.CSIDriverLister + var csiDriversSynced cache.InformerSynced + const resyncPeriod = 0 + if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) { + // Don't initialize if kubeClient is nil + if kubelet.kubeClient != nil { + informerFactory = informers.NewSharedInformerFactory(kubelet.kubeClient, resyncPeriod) + csiDriverInformer := informerFactory.Storage().V1beta1().CSIDrivers() + csiDriverLister = csiDriverInformer.Lister() + csiDriversSynced = csiDriverInformer.Informer().HasSynced + + } else { + klog.Warning("kubeClient is nil. Skip initialization of CSIDriverLister") + } + } + mountPodManager, err := mountpod.NewManager(kubelet.getRootDir(), kubelet.podManager) if err != nil { return nil, err @@ -67,6 +89,9 @@ func NewInitializedVolumePluginMgr( configMapManager: configMapManager, tokenManager: tokenManager, mountPodManager: mountPodManager, + informerFactory: informerFactory, + csiDriverLister: csiDriverLister, + csiDriversSynced: csiDriversSynced, } if err := kvh.volumePluginMgr.InitPlugins(plugins, prober, kvh); err != nil { @@ -93,6 +118,9 @@ type kubeletVolumeHost struct { tokenManager *token.Manager configMapManager configmap.Manager mountPodManager mountpod.Manager + informerFactory informers.SharedInformerFactory + csiDriverLister storagelisters.CSIDriverLister + csiDriversSynced cache.InformerSynced } func (kvh *kubeletVolumeHost) SetKubeletError(err error) { @@ -131,6 +159,34 @@ func (kvh *kubeletVolumeHost) GetSubpather() subpath.Interface { return kvh.kubelet.subpather } +func (kvh *kubeletVolumeHost) GetInformerFactory() informers.SharedInformerFactory { + return kvh.informerFactory +} + +func (kvh *kubeletVolumeHost) CSIDriverLister() storagelisters.CSIDriverLister { + return kvh.csiDriverLister +} + +func (kvh *kubeletVolumeHost) CSIDriversSynced() cache.InformerSynced { + return kvh.csiDriversSynced +} + +// WaitForCacheSync is a helper function that waits for cache sync for CSIDriverLister +func (kvh *kubeletVolumeHost) WaitForCacheSync() error { + if kvh.csiDriversSynced == nil { + klog.Error("csiDriversSynced not found on KubeletVolumeHost") + return fmt.Errorf("csiDriversSynced not found on KubeletVolumeHost") + } + + synced := []cache.InformerSynced{kvh.csiDriversSynced} + if !cache.WaitForCacheSync(wait.NeverStop, synced...) { + klog.Warning("failed to wait for cache sync for CSIDriverLister") + return fmt.Errorf("failed to wait for cache sync for CSIDriverLister") + } + + return nil +} + func (kvh *kubeletVolumeHost) NewWrapperMounter( volName string, spec volume.Spec, diff --git a/pkg/kubelet/volumemanager/volume_manager.go b/pkg/kubelet/volumemanager/volume_manager.go index 561733d592..652e67a69f 100644 --- a/pkg/kubelet/volumemanager/volume_manager.go +++ b/pkg/kubelet/volumemanager/volume_manager.go @@ -250,6 +250,11 @@ func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan str metrics.Register(vm.actualStateOfWorld, vm.desiredStateOfWorld, vm.volumePluginMgr) + if vm.kubeClient != nil { + // start informer for CSIDriver + vm.volumePluginMgr.Run(stopCh) + } + <-stopCh klog.Infof("Shutting down Kubelet Volume Manager") } diff --git a/pkg/volume/BUILD b/pkg/volume/BUILD index dc4a16cbd6..fd001aee03 100644 --- a/pkg/volume/BUILD +++ b/pkg/volume/BUILD @@ -18,6 +18,7 @@ go_library( importpath = "k8s.io/kubernetes/pkg/volume", visibility = ["//visibility:public"], deps = [ + "//pkg/features:go_default_library", "//pkg/util/mount:go_default_library", "//pkg/volume/util/fs:go_default_library", "//pkg/volume/util/recyclerclient:go_default_library", @@ -29,8 +30,11 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/validation:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", + "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/listers/storage/v1beta1:go_default_library", + "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", "//staging/src/k8s.io/cloud-provider:go_default_library", "//vendor/k8s.io/klog:go_default_library", diff --git a/pkg/volume/csi/BUILD b/pkg/volume/csi/BUILD index 71e07796af..93de7cbc1d 100644 --- a/pkg/volume/csi/BUILD +++ b/pkg/volume/csi/BUILD @@ -31,8 +31,6 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", - "//staging/src/k8s.io/client-go/informers:go_default_library", - "//staging/src/k8s.io/client-go/informers/storage/v1beta1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/listers/storage/v1beta1:go_default_library", "//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library", @@ -73,6 +71,7 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library", + "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", "//staging/src/k8s.io/client-go/testing:go_default_library", diff --git a/pkg/volume/csi/csi_attacher_test.go b/pkg/volume/csi/csi_attacher_test.go index 1de09af066..688963ec77 100644 --- a/pkg/volume/csi/csi_attacher_test.go +++ b/pkg/volume/csi/csi_attacher_test.go @@ -33,6 +33,7 @@ import ( "k8s.io/apimachinery/pkg/watch" utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing" + "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" fakeclient "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" @@ -1435,12 +1436,20 @@ func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset) (*csiPlu fakeClient = fakeclient.NewSimpleClientset() } fakeWatcher := watch.NewRaceFreeFake() - fakeClient.Fake.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatcher, nil)) + fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil)) + + // Start informer for CSIDrivers. + factory := informers.NewSharedInformerFactory(fakeClient, csiResyncPeriod) + csiDriverInformer := factory.Storage().V1beta1().CSIDrivers() + csiDriverLister := csiDriverInformer.Lister() + go factory.Start(wait.NeverStop) + host := volumetest.NewFakeVolumeHostWithCSINodeName( tmpDir, fakeClient, nil, "node", + csiDriverLister, ) plugMgr := &volume.VolumePluginMgr{} plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host) @@ -1458,7 +1467,7 @@ func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset) (*csiPlu if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) { // Wait until the informer in CSI volume plugin has all CSIDrivers. wait.PollImmediate(testInformerSyncPeriod, testInformerSyncTimeout, func() (bool, error) { - return csiPlug.csiDriverInformer.Informer().HasSynced(), nil + return csiDriverInformer.Informer().HasSynced(), nil }) } diff --git a/pkg/volume/csi/csi_block_test.go b/pkg/volume/csi/csi_block_test.go index 9bc2b8c63f..f8090310e5 100644 --- a/pkg/volume/csi/csi_block_test.go +++ b/pkg/volume/csi/csi_block_test.go @@ -216,6 +216,7 @@ func TestBlockMapperSetupDevice(t *testing.T) { fakeClient, nil, "fakeNode", + nil, ) plug.host = host @@ -282,6 +283,7 @@ func TestBlockMapperMapDevice(t *testing.T) { fakeClient, nil, "fakeNode", + nil, ) plug.host = host @@ -364,6 +366,7 @@ func TestBlockMapperTearDownDevice(t *testing.T) { fakeClient, nil, "fakeNode", + nil, ) plug.host = host diff --git a/pkg/volume/csi/csi_mounter.go b/pkg/volume/csi/csi_mounter.go index ccdce2ace2..c64c91baa1 100644 --- a/pkg/volume/csi/csi_mounter.go +++ b/pkg/volume/csi/csi_mounter.go @@ -19,7 +19,6 @@ package csi import ( "context" "crypto/sha256" - "errors" "fmt" "os" "path" @@ -292,8 +291,14 @@ func (c *csiMountMgr) podAttributes() (map[string]string, error) { if !utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) { return nil, nil } + + kletHost, ok := c.plugin.host.(volume.KubeletVolumeHost) + if ok { + kletHost.WaitForCacheSync() + } + if c.plugin.csiDriverLister == nil { - return nil, errors.New("CSIDriver lister does not exist") + return nil, fmt.Errorf("CSIDriverLister not found") } csiDriver, err := c.plugin.csiDriverLister.Get(string(c.driverName)) diff --git a/pkg/volume/csi/csi_mounter_test.go b/pkg/volume/csi/csi_mounter_test.go index e7f3edc741..3e1ecd8f12 100644 --- a/pkg/volume/csi/csi_mounter_test.go +++ b/pkg/volume/csi/csi_mounter_test.go @@ -33,7 +33,6 @@ import ( storagev1beta1 "k8s.io/api/storage/v1beta1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing" fakeclient "k8s.io/client-go/kubernetes/fake" @@ -155,13 +154,6 @@ func MounterSetUpTests(t *testing.T, podInfoEnabled bool) { plug, tmpDir := newTestPlugin(t, fakeClient) defer os.RemoveAll(tmpDir) - if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) { - // Wait until the informer in CSI volume plugin has all CSIDrivers. - wait.PollImmediate(testInformerSyncPeriod, testInformerSyncTimeout, func() (bool, error) { - return plug.csiDriverInformer.Informer().HasSynced(), nil - }) - } - registerFakePlugin(test.driver, "endpoint", []string{"1.0.0"}, t) pv := makeTestPV("test-pv", 10, test.driver, testVol) pv.Spec.CSI.VolumeAttributes = test.volumeContext @@ -391,6 +383,7 @@ func TestMounterSetUpSimple(t *testing.T) { }) } } + func TestMounterSetUpWithInline(t *testing.T) { defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIInlineVolume, true)() @@ -527,6 +520,7 @@ func TestMounterSetUpWithInline(t *testing.T) { }) } } + func TestMounterSetUpWithFSGroup(t *testing.T) { fakeClient := fakeclient.NewSimpleClientset() plug, tmpDir := newTestPlugin(t, fakeClient) diff --git a/pkg/volume/csi/csi_plugin.go b/pkg/volume/csi/csi_plugin.go index 3590b5726b..c43ec006e0 100644 --- a/pkg/volume/csi/csi_plugin.go +++ b/pkg/volume/csi/csi_plugin.go @@ -37,10 +37,8 @@ import ( utilversion "k8s.io/apimachinery/pkg/util/version" "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" - csiapiinformer "k8s.io/client-go/informers" - csiinformer "k8s.io/client-go/informers/storage/v1beta1" clientset "k8s.io/client-go/kubernetes" - csilister "k8s.io/client-go/listers/storage/v1beta1" + storagelisters "k8s.io/client-go/listers/storage/v1beta1" csitranslationplugins "k8s.io/csi-translation-lib/plugins" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/volume" @@ -68,10 +66,9 @@ const ( var deprecatedSocketDirVersions = []string{"0.1.0", "0.2.0", "0.3.0", "0.4.0"} type csiPlugin struct { - host volume.VolumeHost - blockEnabled bool - csiDriverLister csilister.CSIDriverLister - csiDriverInformer csiinformer.CSIDriverInformer + host volume.VolumeHost + blockEnabled bool + csiDriverLister storagelisters.CSIDriverLister } //TODO (vladimirvivien) add this type to storage api @@ -217,11 +214,21 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error { if csiClient == nil { klog.Warning(log("kubeclient not set, assuming standalone kubelet")) } else { - // Start informer for CSIDrivers. - factory := csiapiinformer.NewSharedInformerFactory(csiClient, csiResyncPeriod) - p.csiDriverInformer = factory.Storage().V1beta1().CSIDrivers() - p.csiDriverLister = p.csiDriverInformer.Lister() - go factory.Start(wait.NeverStop) + // set CSIDriverLister + adcHost, ok := host.(volume.AttachDetachVolumeHost) + if ok { + p.csiDriverLister = adcHost.CSIDriverLister() + if p.csiDriverLister == nil { + klog.Error(log("CSIDriverLister not found on AttachDetachVolumeHost")) + } + } + kletHost, ok := host.(volume.KubeletVolumeHost) + if ok { + p.csiDriverLister = kletHost.CSIDriverLister() + if p.csiDriverLister == nil { + klog.Error(log("CSIDriverLister not found on KubeletVolumeHost")) + } + } } } @@ -752,6 +759,12 @@ func (p *csiPlugin) skipAttach(driver string) (bool, error) { if !utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) { return false, nil } + + kletHost, ok := p.host.(volume.KubeletVolumeHost) + if ok { + kletHost.WaitForCacheSync() + } + if p.csiDriverLister == nil { return false, errors.New("CSIDriver lister does not exist") } diff --git a/pkg/volume/csi/csi_plugin_test.go b/pkg/volume/csi/csi_plugin_test.go index 4747ba4db3..4043178e01 100644 --- a/pkg/volume/csi/csi_plugin_test.go +++ b/pkg/volume/csi/csi_plugin_test.go @@ -31,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing" + "k8s.io/client-go/informers" fakeclient "k8s.io/client-go/kubernetes/fake" utiltesting "k8s.io/client-go/util/testing" "k8s.io/kubernetes/pkg/features" @@ -48,11 +49,19 @@ func newTestPlugin(t *testing.T, client *fakeclient.Clientset) (*csiPlugin, stri if client == nil { client = fakeclient.NewSimpleClientset() } + + // Start informer for CSIDrivers. + factory := informers.NewSharedInformerFactory(client, csiResyncPeriod) + csiDriverInformer := factory.Storage().V1beta1().CSIDrivers() + csiDriverLister := csiDriverInformer.Lister() + go factory.Start(wait.NeverStop) + host := volumetest.NewFakeVolumeHostWithCSINodeName( tmpDir, client, nil, "fakeNode", + csiDriverLister, ) plugMgr := &volume.VolumePluginMgr{} plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host) @@ -70,7 +79,7 @@ func newTestPlugin(t *testing.T, client *fakeclient.Clientset) (*csiPlugin, stri if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) { // Wait until the informer in CSI volume plugin has all CSIDrivers. wait.PollImmediate(testInformerSyncPeriod, testInformerSyncTimeout, func() (bool, error) { - return csiPlug.csiDriverInformer.Informer().HasSynced(), nil + return csiDriverInformer.Informer().HasSynced(), nil }) } diff --git a/pkg/volume/csi/nodeinfomanager/nodeinfomanager_test.go b/pkg/volume/csi/nodeinfomanager/nodeinfomanager_test.go index 71946f8edb..7429448e3c 100644 --- a/pkg/volume/csi/nodeinfomanager/nodeinfomanager_test.go +++ b/pkg/volume/csi/nodeinfomanager/nodeinfomanager_test.go @@ -740,6 +740,7 @@ func TestInstallCSIDriverExistingAnnotation(t *testing.T) { client, nil, nodeName, + nil, ) nim := NewNodeInfoManager(types.NodeName(nodeName), host, nil) @@ -799,6 +800,7 @@ func test(t *testing.T, addNodeInfo bool, csiNodeInfoEnabled bool, testcases []t client, nil, nodeName, + nil, ) nim := NewNodeInfoManager(types.NodeName(nodeName), host, nil) diff --git a/pkg/volume/plugins.go b/pkg/volume/plugins.go index c26bc9a4f2..9a77ae070e 100644 --- a/pkg/volume/plugins.go +++ b/pkg/volume/plugins.go @@ -29,11 +29,15 @@ import ( "k8s.io/apimachinery/pkg/types" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/validation" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" storagelisters "k8s.io/client-go/listers/storage/v1beta1" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" cloudprovider "k8s.io/cloud-provider" "k8s.io/klog" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/volume/util/recyclerclient" "k8s.io/kubernetes/pkg/volume/util/subpath" @@ -319,6 +323,15 @@ type KubeletVolumeHost interface { // SetKubeletError lets plugins set an error on the Kubelet runtime status // that will cause the Kubelet to post NotReady status with the error message provided SetKubeletError(err error) + + // GetInformerFactory returns the informer factory for CSIDriverLister + GetInformerFactory() informers.SharedInformerFactory + // CSIDriverLister returns the informer lister for the CSIDriver API Object + CSIDriverLister() storagelisters.CSIDriverLister + // CSIDriverSynced returns the informer synced for the CSIDriver API Object + CSIDriversSynced() cache.InformerSynced + // WaitForCacheSync is a helper function that waits for cache sync for CSIDriverLister + WaitForCacheSync() error } // AttachDetachVolumeHost is a AttachDetach Controller specific interface that plugins can use @@ -327,6 +340,9 @@ type AttachDetachVolumeHost interface { // CSINodeLister returns the informer lister for the CSINode API Object CSINodeLister() storagelisters.CSINodeLister + // CSIDriverLister returns the informer lister for the CSIDriver API Object + CSIDriverLister() storagelisters.CSIDriverLister + // IsAttachDetachController is an interface marker to strictly tie AttachDetachVolumeHost // to the attachDetachController IsAttachDetachController() bool @@ -1004,6 +1020,17 @@ func (pm *VolumePluginMgr) FindNodeExpandablePluginByName(name string) (NodeExpa return nil, nil } +func (pm *VolumePluginMgr) Run(stopCh <-chan struct{}) { + kletHost, ok := pm.Host.(KubeletVolumeHost) + if ok { + // start informer for CSIDriver + if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) { + informerFactory := kletHost.GetInformerFactory() + go informerFactory.Start(stopCh) + } + } +} + // NewPersistentVolumeRecyclerPodTemplate creates a template for a recycler // pod. By default, a recycler pod simply runs "rm -rf" on a volume and tests // for emptiness. Most attributes of the template will be correct for most diff --git a/pkg/volume/testing/BUILD b/pkg/volume/testing/BUILD index b454ee05f5..0994e3df21 100644 --- a/pkg/volume/testing/BUILD +++ b/pkg/volume/testing/BUILD @@ -26,6 +26,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", + "//staging/src/k8s.io/client-go/listers/storage/v1beta1:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", "//staging/src/k8s.io/client-go/util/testing:go_default_library", "//staging/src/k8s.io/cloud-provider:go_default_library", diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go index b7b5396d2e..2201b9b681 100644 --- a/pkg/volume/testing/testing.go +++ b/pkg/volume/testing/testing.go @@ -34,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/uuid" clientset "k8s.io/client-go/kubernetes" + storagelisters "k8s.io/client-go/listers/storage/v1beta1" "k8s.io/client-go/tools/record" utiltesting "k8s.io/client-go/util/testing" cloudprovider "k8s.io/cloud-provider" @@ -62,17 +63,21 @@ const ( // fakeVolumeHost is useful for testing volume plugins. type fakeVolumeHost struct { - rootDir string - kubeClient clientset.Interface - pluginMgr VolumePluginMgr - cloud cloudprovider.Interface - mounter mount.Interface - exec mount.Exec - nodeLabels map[string]string - nodeName string - subpather subpath.Interface + rootDir string + kubeClient clientset.Interface + pluginMgr VolumePluginMgr + cloud cloudprovider.Interface + mounter mount.Interface + exec mount.Exec + nodeLabels map[string]string + nodeName string + subpather subpath.Interface + csiDriverLister storagelisters.CSIDriverLister } +var _ VolumeHost = &fakeVolumeHost{} +var _ AttachDetachVolumeHost = &fakeVolumeHost{} + func NewFakeVolumeHost(rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin) *fakeVolumeHost { return newFakeVolumeHost(rootDir, kubeClient, plugins, nil, nil) } @@ -87,9 +92,12 @@ func NewFakeVolumeHostWithNodeLabels(rootDir string, kubeClient clientset.Interf return volHost } -func NewFakeVolumeHostWithCSINodeName(rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, nodeName string) *fakeVolumeHost { +func NewFakeVolumeHostWithCSINodeName(rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, nodeName string, driverLister storagelisters.CSIDriverLister) *fakeVolumeHost { volHost := newFakeVolumeHost(rootDir, kubeClient, plugins, nil, nil) volHost.nodeName = nodeName + if driverLister != nil { + volHost.csiDriverLister = driverLister + } return volHost } @@ -1469,3 +1477,16 @@ func ContainsAccessMode(modes []v1.PersistentVolumeAccessMode, mode v1.Persisten } return false } + +func (f *fakeVolumeHost) CSIDriverLister() storagelisters.CSIDriverLister { + return f.csiDriverLister +} + +func (f *fakeVolumeHost) CSINodeLister() storagelisters.CSINodeLister { + // not needed for testing + return nil +} + +func (f *fakeVolumeHost) IsAttachDetachController() bool { + return true +} diff --git a/test/integration/volume/BUILD b/test/integration/volume/BUILD index 2778539315..4325654c41 100644 --- a/test/integration/volume/BUILD +++ b/test/integration/volume/BUILD @@ -21,6 +21,7 @@ go_test( "//pkg/controller/volume/attachdetach/cache:go_default_library", "//pkg/controller/volume/persistentvolume:go_default_library", "//pkg/controller/volume/persistentvolume/options:go_default_library", + "//pkg/features:go_default_library", "//pkg/volume:go_default_library", "//pkg/volume/testing:go_default_library", "//pkg/volume/util:go_default_library", @@ -31,6 +32,7 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", diff --git a/test/integration/volume/attach_detach_test.go b/test/integration/volume/attach_detach_test.go index 39722abc4d..6f1bb9c47c 100644 --- a/test/integration/volume/attach_detach_test.go +++ b/test/integration/volume/attach_detach_test.go @@ -27,7 +27,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/informers" + utilfeature "k8s.io/apiserver/pkg/util/feature" + clientgoinformers "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" @@ -36,6 +37,7 @@ import ( volumecache "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache" "k8s.io/kubernetes/pkg/controller/volume/persistentvolume" persistentvolumeoptions "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/options" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/volume" volumetest "k8s.io/kubernetes/pkg/volume/testing" "k8s.io/kubernetes/pkg/volume/util" @@ -179,6 +181,7 @@ func TestPodDeletionWithDswp(t *testing.T) { stopCh := make(chan struct{}) go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh) go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh) + initCSIObjects(stopCh, informers) go ctrl.Run(stopCh) waitToObservePods(t, podInformer, 1) @@ -207,6 +210,16 @@ func TestPodDeletionWithDswp(t *testing.T) { close(stopCh) } +func initCSIObjects(stopCh chan struct{}, informers clientgoinformers.SharedInformerFactory) { + if utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && + utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) { + go informers.Storage().V1beta1().CSINodes().Informer().Run(stopCh) + } + if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) { + go informers.Storage().V1beta1().CSIDrivers().Informer().Run(stopCh) + } +} + func TestPodUpdateWithWithADC(t *testing.T) { _, server, closeFn := framework.RunAMaster(framework.NewIntegrationTestMasterConfig()) defer closeFn() @@ -246,6 +259,7 @@ func TestPodUpdateWithWithADC(t *testing.T) { stopCh := make(chan struct{}) go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh) go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh) + initCSIObjects(stopCh, informers) go ctrl.Run(stopCh) waitToObservePods(t, podInformer, 1) @@ -314,6 +328,7 @@ func TestPodUpdateWithKeepTerminatedPodVolumes(t *testing.T) { stopCh := make(chan struct{}) go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh) go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh) + initCSIObjects(stopCh, informers) go ctrl.Run(stopCh) waitToObservePods(t, podInformer, 1) @@ -383,7 +398,7 @@ func waitForPodFuncInDSWP(t *testing.T, dswp volumecache.DesiredStateOfWorld, ch } } -func createAdClients(ns *v1.Namespace, t *testing.T, server *httptest.Server, syncPeriod time.Duration, timers attachdetach.TimerConfig) (*clientset.Clientset, attachdetach.AttachDetachController, *persistentvolume.PersistentVolumeController, informers.SharedInformerFactory) { +func createAdClients(ns *v1.Namespace, t *testing.T, server *httptest.Server, syncPeriod time.Duration, timers attachdetach.TimerConfig) (*clientset.Clientset, attachdetach.AttachDetachController, *persistentvolume.PersistentVolumeController, clientgoinformers.SharedInformerFactory) { config := restclient.Config{ Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}, @@ -408,7 +423,7 @@ func createAdClients(ns *v1.Namespace, t *testing.T, server *httptest.Server, sy } plugins := []volume.VolumePlugin{plugin} cloud := &fakecloud.FakeCloud{} - informers := informers.NewSharedInformerFactory(testClient, resyncPeriod) + informers := clientgoinformers.NewSharedInformerFactory(testClient, resyncPeriod) ctrl, err := attachdetach.NewAttachDetachController( testClient, informers.Core().V1().Pods(), @@ -416,6 +431,7 @@ func createAdClients(ns *v1.Namespace, t *testing.T, server *httptest.Server, sy informers.Core().V1().PersistentVolumeClaims(), informers.Core().V1().PersistentVolumes(), informers.Storage().V1beta1().CSINodes(), + informers.Storage().V1beta1().CSIDrivers(), cloud, plugins, nil, /* prober */ @@ -491,6 +507,7 @@ func TestPodAddedByDswp(t *testing.T) { stopCh := make(chan struct{}) go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh) go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh) + initCSIObjects(stopCh, informers) go ctrl.Run(stopCh) waitToObservePods(t, podInformer, 1) @@ -576,6 +593,7 @@ func TestPVCBoundWithADC(t *testing.T) { stopCh := make(chan struct{}) informers.Start(stopCh) informers.WaitForCacheSync(stopCh) + initCSIObjects(stopCh, informers) go ctrl.Run(stopCh) go pvCtrl.Run(stopCh)