Merge pull request #78878 from msau42/automated-cherry-pick-of-#75129-upstream-release-1.14

Automated cherry pick of #75129: Move CSIDriver Lister to the controller
k3s-v1.14.4
Kubernetes Prow Robot 2019-06-20 14:31:03 -07:00 committed by GitHub
commit 2ff91b0885
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 236 additions and 40 deletions

View File

@ -224,6 +224,7 @@ func startAttachDetachController(ctx ControllerContext) (http.Handler, bool, err
ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
ctx.InformerFactory.Core().V1().PersistentVolumes(),
ctx.InformerFactory.Storage().V1beta1().CSINodes(),
ctx.InformerFactory.Storage().V1beta1().CSIDrivers(),
ctx.Cloud,
ProbeAttachableVolumePlugins(),
GetDynamicPluginProber(ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration),

View File

@ -106,6 +106,7 @@ func NewAttachDetachController(
pvcInformer coreinformers.PersistentVolumeClaimInformer,
pvInformer coreinformers.PersistentVolumeInformer,
csiNodeInformer storageinformers.CSINodeInformer,
csiDriverInformer storageinformers.CSIDriverInformer,
cloud cloudprovider.Interface,
plugins []volume.VolumePlugin,
prober volume.DynamicPluginProber,
@ -147,6 +148,11 @@ func NewAttachDetachController(
adc.csiNodeSynced = csiNodeInformer.Informer().HasSynced
}
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
adc.csiDriverLister = csiDriverInformer.Lister()
adc.csiDriversSynced = csiDriverInformer.Informer().HasSynced
}
if err := adc.volumePluginMgr.InitPlugins(plugins, prober, adc); err != nil {
return nil, fmt.Errorf("Could not initialize volume plugins for Attach/Detach Controller: %+v", err)
}
@ -271,6 +277,12 @@ type attachDetachController struct {
csiNodeLister storagelisters.CSINodeLister
csiNodeSynced kcache.InformerSynced
// csiDriverLister is the shared CSIDriver lister used to fetch and store
// CSIDriver objects from the API server. It is shared with other controllers
// and therefore the CSIDriver objects in its store should be treated as immutable.
csiDriverLister storagelisters.CSIDriverLister
csiDriversSynced kcache.InformerSynced
// cloud provider used by volume host
cloud cloudprovider.Interface
@ -327,6 +339,9 @@ func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
if adc.csiNodeSynced != nil {
synced = append(synced, adc.csiNodeSynced)
}
if adc.csiDriversSynced != nil {
synced = append(synced, adc.csiDriversSynced)
}
if !controller.WaitForCacheSync("attach detach", stopCh, synced...) {
return
@ -669,6 +684,10 @@ func (adc *attachDetachController) CSINodeLister() storagelisters.CSINodeLister
return adc.csiNodeLister
}
func (adc *attachDetachController) CSIDriverLister() storagelisters.CSIDriverLister {
return adc.csiDriverLister
}
func (adc *attachDetachController) IsAttachDetachController() bool {
return true
}
@ -793,3 +812,7 @@ func (adc *attachDetachController) GetSubpather() subpath.Interface {
// Subpaths not needed in attachdetach controller
return nil
}
func (adc *attachDetachController) GetCSIDriverLister() storagelisters.CSIDriverLister {
return adc.csiDriverLister
}

View File

@ -45,6 +45,7 @@ func Test_NewAttachDetachController_Positive(t *testing.T) {
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Storage().V1beta1().CSINodes(),
informerFactory.Storage().V1beta1().CSIDrivers(),
nil, /* cloud */
nil, /* plugins */
nil, /* prober */
@ -220,6 +221,7 @@ func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Storage().V1beta1().CSINodes(),
informerFactory.Storage().V1beta1().CSIDrivers(),
nil, /* cloud */
plugins,
prober,

View File

@ -132,9 +132,11 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/validation:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/client-go/util/certificate:go_default_library",

View File

@ -26,8 +26,12 @@ import (
authenticationv1 "k8s.io/api/authentication/v1"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
storagelisters "k8s.io/client-go/listers/storage/v1beta1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/kubernetes/pkg/features"
@ -56,6 +60,24 @@ func NewInitializedVolumePluginMgr(
plugins []volume.VolumePlugin,
prober volume.DynamicPluginProber) (*volume.VolumePluginMgr, error) {
// Initialize csiDriverLister before calling InitPlugins
var informerFactory informers.SharedInformerFactory
var csiDriverLister storagelisters.CSIDriverLister
var csiDriversSynced cache.InformerSynced
const resyncPeriod = 0
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
// Don't initialize if kubeClient is nil
if kubelet.kubeClient != nil {
informerFactory = informers.NewSharedInformerFactory(kubelet.kubeClient, resyncPeriod)
csiDriverInformer := informerFactory.Storage().V1beta1().CSIDrivers()
csiDriverLister = csiDriverInformer.Lister()
csiDriversSynced = csiDriverInformer.Informer().HasSynced
} else {
klog.Warning("kubeClient is nil. Skip initialization of CSIDriverLister")
}
}
mountPodManager, err := mountpod.NewManager(kubelet.getRootDir(), kubelet.podManager)
if err != nil {
return nil, err
@ -67,6 +89,9 @@ func NewInitializedVolumePluginMgr(
configMapManager: configMapManager,
tokenManager: tokenManager,
mountPodManager: mountPodManager,
informerFactory: informerFactory,
csiDriverLister: csiDriverLister,
csiDriversSynced: csiDriversSynced,
}
if err := kvh.volumePluginMgr.InitPlugins(plugins, prober, kvh); err != nil {
@ -93,6 +118,9 @@ type kubeletVolumeHost struct {
tokenManager *token.Manager
configMapManager configmap.Manager
mountPodManager mountpod.Manager
informerFactory informers.SharedInformerFactory
csiDriverLister storagelisters.CSIDriverLister
csiDriversSynced cache.InformerSynced
}
func (kvh *kubeletVolumeHost) SetKubeletError(err error) {
@ -131,6 +159,34 @@ func (kvh *kubeletVolumeHost) GetSubpather() subpath.Interface {
return kvh.kubelet.subpather
}
func (kvh *kubeletVolumeHost) GetInformerFactory() informers.SharedInformerFactory {
return kvh.informerFactory
}
func (kvh *kubeletVolumeHost) CSIDriverLister() storagelisters.CSIDriverLister {
return kvh.csiDriverLister
}
func (kvh *kubeletVolumeHost) CSIDriversSynced() cache.InformerSynced {
return kvh.csiDriversSynced
}
// WaitForCacheSync is a helper function that waits for cache sync for CSIDriverLister
func (kvh *kubeletVolumeHost) WaitForCacheSync() error {
if kvh.csiDriversSynced == nil {
klog.Error("csiDriversSynced not found on KubeletVolumeHost")
return fmt.Errorf("csiDriversSynced not found on KubeletVolumeHost")
}
synced := []cache.InformerSynced{kvh.csiDriversSynced}
if !cache.WaitForCacheSync(wait.NeverStop, synced...) {
klog.Warning("failed to wait for cache sync for CSIDriverLister")
return fmt.Errorf("failed to wait for cache sync for CSIDriverLister")
}
return nil
}
func (kvh *kubeletVolumeHost) NewWrapperMounter(
volName string,
spec volume.Spec,

View File

@ -250,6 +250,11 @@ func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan str
metrics.Register(vm.actualStateOfWorld, vm.desiredStateOfWorld, vm.volumePluginMgr)
if vm.kubeClient != nil {
// start informer for CSIDriver
vm.volumePluginMgr.Run(stopCh)
}
<-stopCh
klog.Infof("Shutting down Kubelet Volume Manager")
}

View File

@ -18,6 +18,7 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/volume",
visibility = ["//visibility:public"],
deps = [
"//pkg/features:go_default_library",
"//pkg/util/mount:go_default_library",
"//pkg/volume/util/fs:go_default_library",
"//pkg/volume/util/recyclerclient:go_default_library",
@ -29,8 +30,11 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/validation:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/listers/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//vendor/k8s.io/klog:go_default_library",

View File

@ -31,8 +31,6 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/informers/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/listers/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library",
@ -73,6 +71,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/testing:go_default_library",

View File

@ -33,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/watch"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
fakeclient "k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
@ -1435,12 +1436,20 @@ func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset) (*csiPlu
fakeClient = fakeclient.NewSimpleClientset()
}
fakeWatcher := watch.NewRaceFreeFake()
fakeClient.Fake.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatcher, nil))
fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil))
// Start informer for CSIDrivers.
factory := informers.NewSharedInformerFactory(fakeClient, csiResyncPeriod)
csiDriverInformer := factory.Storage().V1beta1().CSIDrivers()
csiDriverLister := csiDriverInformer.Lister()
go factory.Start(wait.NeverStop)
host := volumetest.NewFakeVolumeHostWithCSINodeName(
tmpDir,
fakeClient,
nil,
"node",
csiDriverLister,
)
plugMgr := &volume.VolumePluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host)
@ -1458,7 +1467,7 @@ func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset) (*csiPlu
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
// Wait until the informer in CSI volume plugin has all CSIDrivers.
wait.PollImmediate(testInformerSyncPeriod, testInformerSyncTimeout, func() (bool, error) {
return csiPlug.csiDriverInformer.Informer().HasSynced(), nil
return csiDriverInformer.Informer().HasSynced(), nil
})
}

View File

@ -216,6 +216,7 @@ func TestBlockMapperSetupDevice(t *testing.T) {
fakeClient,
nil,
"fakeNode",
nil,
)
plug.host = host
@ -282,6 +283,7 @@ func TestBlockMapperMapDevice(t *testing.T) {
fakeClient,
nil,
"fakeNode",
nil,
)
plug.host = host
@ -364,6 +366,7 @@ func TestBlockMapperTearDownDevice(t *testing.T) {
fakeClient,
nil,
"fakeNode",
nil,
)
plug.host = host

View File

@ -19,7 +19,6 @@ package csi
import (
"context"
"crypto/sha256"
"errors"
"fmt"
"os"
"path"
@ -292,8 +291,14 @@ func (c *csiMountMgr) podAttributes() (map[string]string, error) {
if !utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
return nil, nil
}
kletHost, ok := c.plugin.host.(volume.KubeletVolumeHost)
if ok {
kletHost.WaitForCacheSync()
}
if c.plugin.csiDriverLister == nil {
return nil, errors.New("CSIDriver lister does not exist")
return nil, fmt.Errorf("CSIDriverLister not found")
}
csiDriver, err := c.plugin.csiDriverLister.Get(string(c.driverName))

View File

@ -33,7 +33,6 @@ import (
storagev1beta1 "k8s.io/api/storage/v1beta1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
fakeclient "k8s.io/client-go/kubernetes/fake"
@ -155,13 +154,6 @@ func MounterSetUpTests(t *testing.T, podInfoEnabled bool) {
plug, tmpDir := newTestPlugin(t, fakeClient)
defer os.RemoveAll(tmpDir)
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
// Wait until the informer in CSI volume plugin has all CSIDrivers.
wait.PollImmediate(testInformerSyncPeriod, testInformerSyncTimeout, func() (bool, error) {
return plug.csiDriverInformer.Informer().HasSynced(), nil
})
}
registerFakePlugin(test.driver, "endpoint", []string{"1.0.0"}, t)
pv := makeTestPV("test-pv", 10, test.driver, testVol)
pv.Spec.CSI.VolumeAttributes = test.volumeContext
@ -391,6 +383,7 @@ func TestMounterSetUpSimple(t *testing.T) {
})
}
}
func TestMounterSetUpWithInline(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIInlineVolume, true)()
@ -527,6 +520,7 @@ func TestMounterSetUpWithInline(t *testing.T) {
})
}
}
func TestMounterSetUpWithFSGroup(t *testing.T) {
fakeClient := fakeclient.NewSimpleClientset()
plug, tmpDir := newTestPlugin(t, fakeClient)

View File

@ -37,10 +37,8 @@ import (
utilversion "k8s.io/apimachinery/pkg/util/version"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
csiapiinformer "k8s.io/client-go/informers"
csiinformer "k8s.io/client-go/informers/storage/v1beta1"
clientset "k8s.io/client-go/kubernetes"
csilister "k8s.io/client-go/listers/storage/v1beta1"
storagelisters "k8s.io/client-go/listers/storage/v1beta1"
csitranslationplugins "k8s.io/csi-translation-lib/plugins"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
@ -68,10 +66,9 @@ const (
var deprecatedSocketDirVersions = []string{"0.1.0", "0.2.0", "0.3.0", "0.4.0"}
type csiPlugin struct {
host volume.VolumeHost
blockEnabled bool
csiDriverLister csilister.CSIDriverLister
csiDriverInformer csiinformer.CSIDriverInformer
host volume.VolumeHost
blockEnabled bool
csiDriverLister storagelisters.CSIDriverLister
}
//TODO (vladimirvivien) add this type to storage api
@ -217,11 +214,21 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error {
if csiClient == nil {
klog.Warning(log("kubeclient not set, assuming standalone kubelet"))
} else {
// Start informer for CSIDrivers.
factory := csiapiinformer.NewSharedInformerFactory(csiClient, csiResyncPeriod)
p.csiDriverInformer = factory.Storage().V1beta1().CSIDrivers()
p.csiDriverLister = p.csiDriverInformer.Lister()
go factory.Start(wait.NeverStop)
// set CSIDriverLister
adcHost, ok := host.(volume.AttachDetachVolumeHost)
if ok {
p.csiDriverLister = adcHost.CSIDriverLister()
if p.csiDriverLister == nil {
klog.Error(log("CSIDriverLister not found on AttachDetachVolumeHost"))
}
}
kletHost, ok := host.(volume.KubeletVolumeHost)
if ok {
p.csiDriverLister = kletHost.CSIDriverLister()
if p.csiDriverLister == nil {
klog.Error(log("CSIDriverLister not found on KubeletVolumeHost"))
}
}
}
}
@ -752,6 +759,12 @@ func (p *csiPlugin) skipAttach(driver string) (bool, error) {
if !utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
return false, nil
}
kletHost, ok := p.host.(volume.KubeletVolumeHost)
if ok {
kletHost.WaitForCacheSync()
}
if p.csiDriverLister == nil {
return false, errors.New("CSIDriver lister does not exist")
}

View File

@ -31,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
"k8s.io/client-go/informers"
fakeclient "k8s.io/client-go/kubernetes/fake"
utiltesting "k8s.io/client-go/util/testing"
"k8s.io/kubernetes/pkg/features"
@ -48,11 +49,19 @@ func newTestPlugin(t *testing.T, client *fakeclient.Clientset) (*csiPlugin, stri
if client == nil {
client = fakeclient.NewSimpleClientset()
}
// Start informer for CSIDrivers.
factory := informers.NewSharedInformerFactory(client, csiResyncPeriod)
csiDriverInformer := factory.Storage().V1beta1().CSIDrivers()
csiDriverLister := csiDriverInformer.Lister()
go factory.Start(wait.NeverStop)
host := volumetest.NewFakeVolumeHostWithCSINodeName(
tmpDir,
client,
nil,
"fakeNode",
csiDriverLister,
)
plugMgr := &volume.VolumePluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host)
@ -70,7 +79,7 @@ func newTestPlugin(t *testing.T, client *fakeclient.Clientset) (*csiPlugin, stri
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
// Wait until the informer in CSI volume plugin has all CSIDrivers.
wait.PollImmediate(testInformerSyncPeriod, testInformerSyncTimeout, func() (bool, error) {
return csiPlug.csiDriverInformer.Informer().HasSynced(), nil
return csiDriverInformer.Informer().HasSynced(), nil
})
}

View File

@ -740,6 +740,7 @@ func TestInstallCSIDriverExistingAnnotation(t *testing.T) {
client,
nil,
nodeName,
nil,
)
nim := NewNodeInfoManager(types.NodeName(nodeName), host, nil)
@ -799,6 +800,7 @@ func test(t *testing.T, addNodeInfo bool, csiNodeInfoEnabled bool, testcases []t
client,
nil,
nodeName,
nil,
)
nim := NewNodeInfoManager(types.NodeName(nodeName), host, nil)

View File

@ -29,11 +29,15 @@ import (
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/validation"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
storagelisters "k8s.io/client-go/listers/storage/v1beta1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume/util/recyclerclient"
"k8s.io/kubernetes/pkg/volume/util/subpath"
@ -319,6 +323,15 @@ type KubeletVolumeHost interface {
// SetKubeletError lets plugins set an error on the Kubelet runtime status
// that will cause the Kubelet to post NotReady status with the error message provided
SetKubeletError(err error)
// GetInformerFactory returns the informer factory for CSIDriverLister
GetInformerFactory() informers.SharedInformerFactory
// CSIDriverLister returns the informer lister for the CSIDriver API Object
CSIDriverLister() storagelisters.CSIDriverLister
// CSIDriverSynced returns the informer synced for the CSIDriver API Object
CSIDriversSynced() cache.InformerSynced
// WaitForCacheSync is a helper function that waits for cache sync for CSIDriverLister
WaitForCacheSync() error
}
// AttachDetachVolumeHost is a AttachDetach Controller specific interface that plugins can use
@ -327,6 +340,9 @@ type AttachDetachVolumeHost interface {
// CSINodeLister returns the informer lister for the CSINode API Object
CSINodeLister() storagelisters.CSINodeLister
// CSIDriverLister returns the informer lister for the CSIDriver API Object
CSIDriverLister() storagelisters.CSIDriverLister
// IsAttachDetachController is an interface marker to strictly tie AttachDetachVolumeHost
// to the attachDetachController
IsAttachDetachController() bool
@ -1004,6 +1020,17 @@ func (pm *VolumePluginMgr) FindNodeExpandablePluginByName(name string) (NodeExpa
return nil, nil
}
func (pm *VolumePluginMgr) Run(stopCh <-chan struct{}) {
kletHost, ok := pm.Host.(KubeletVolumeHost)
if ok {
// start informer for CSIDriver
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
informerFactory := kletHost.GetInformerFactory()
go informerFactory.Start(stopCh)
}
}
}
// NewPersistentVolumeRecyclerPodTemplate creates a template for a recycler
// pod. By default, a recycler pod simply runs "rm -rf" on a volume and tests
// for emptiness. Most attributes of the template will be correct for most

View File

@ -26,6 +26,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/listers/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/client-go/util/testing:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",

View File

@ -34,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
clientset "k8s.io/client-go/kubernetes"
storagelisters "k8s.io/client-go/listers/storage/v1beta1"
"k8s.io/client-go/tools/record"
utiltesting "k8s.io/client-go/util/testing"
cloudprovider "k8s.io/cloud-provider"
@ -62,17 +63,21 @@ const (
// fakeVolumeHost is useful for testing volume plugins.
type fakeVolumeHost struct {
rootDir string
kubeClient clientset.Interface
pluginMgr VolumePluginMgr
cloud cloudprovider.Interface
mounter mount.Interface
exec mount.Exec
nodeLabels map[string]string
nodeName string
subpather subpath.Interface
rootDir string
kubeClient clientset.Interface
pluginMgr VolumePluginMgr
cloud cloudprovider.Interface
mounter mount.Interface
exec mount.Exec
nodeLabels map[string]string
nodeName string
subpather subpath.Interface
csiDriverLister storagelisters.CSIDriverLister
}
var _ VolumeHost = &fakeVolumeHost{}
var _ AttachDetachVolumeHost = &fakeVolumeHost{}
func NewFakeVolumeHost(rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin) *fakeVolumeHost {
return newFakeVolumeHost(rootDir, kubeClient, plugins, nil, nil)
}
@ -87,9 +92,12 @@ func NewFakeVolumeHostWithNodeLabels(rootDir string, kubeClient clientset.Interf
return volHost
}
func NewFakeVolumeHostWithCSINodeName(rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, nodeName string) *fakeVolumeHost {
func NewFakeVolumeHostWithCSINodeName(rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, nodeName string, driverLister storagelisters.CSIDriverLister) *fakeVolumeHost {
volHost := newFakeVolumeHost(rootDir, kubeClient, plugins, nil, nil)
volHost.nodeName = nodeName
if driverLister != nil {
volHost.csiDriverLister = driverLister
}
return volHost
}
@ -1469,3 +1477,16 @@ func ContainsAccessMode(modes []v1.PersistentVolumeAccessMode, mode v1.Persisten
}
return false
}
func (f *fakeVolumeHost) CSIDriverLister() storagelisters.CSIDriverLister {
return f.csiDriverLister
}
func (f *fakeVolumeHost) CSINodeLister() storagelisters.CSINodeLister {
// not needed for testing
return nil
}
func (f *fakeVolumeHost) IsAttachDetachController() bool {
return true
}

View File

@ -21,6 +21,7 @@ go_test(
"//pkg/controller/volume/attachdetach/cache:go_default_library",
"//pkg/controller/volume/persistentvolume:go_default_library",
"//pkg/controller/volume/persistentvolume/options:go_default_library",
"//pkg/features:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/testing:go_default_library",
"//pkg/volume/util:go_default_library",
@ -31,6 +32,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",

View File

@ -27,7 +27,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientgoinformers "k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
@ -36,6 +37,7 @@ import (
volumecache "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
"k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
persistentvolumeoptions "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/options"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
"k8s.io/kubernetes/pkg/volume/util"
@ -179,6 +181,7 @@ func TestPodDeletionWithDswp(t *testing.T) {
stopCh := make(chan struct{})
go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh)
go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh)
initCSIObjects(stopCh, informers)
go ctrl.Run(stopCh)
waitToObservePods(t, podInformer, 1)
@ -207,6 +210,16 @@ func TestPodDeletionWithDswp(t *testing.T) {
close(stopCh)
}
func initCSIObjects(stopCh chan struct{}, informers clientgoinformers.SharedInformerFactory) {
if utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) &&
utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
go informers.Storage().V1beta1().CSINodes().Informer().Run(stopCh)
}
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
go informers.Storage().V1beta1().CSIDrivers().Informer().Run(stopCh)
}
}
func TestPodUpdateWithWithADC(t *testing.T) {
_, server, closeFn := framework.RunAMaster(framework.NewIntegrationTestMasterConfig())
defer closeFn()
@ -246,6 +259,7 @@ func TestPodUpdateWithWithADC(t *testing.T) {
stopCh := make(chan struct{})
go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh)
go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh)
initCSIObjects(stopCh, informers)
go ctrl.Run(stopCh)
waitToObservePods(t, podInformer, 1)
@ -314,6 +328,7 @@ func TestPodUpdateWithKeepTerminatedPodVolumes(t *testing.T) {
stopCh := make(chan struct{})
go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh)
go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh)
initCSIObjects(stopCh, informers)
go ctrl.Run(stopCh)
waitToObservePods(t, podInformer, 1)
@ -383,7 +398,7 @@ func waitForPodFuncInDSWP(t *testing.T, dswp volumecache.DesiredStateOfWorld, ch
}
}
func createAdClients(ns *v1.Namespace, t *testing.T, server *httptest.Server, syncPeriod time.Duration, timers attachdetach.TimerConfig) (*clientset.Clientset, attachdetach.AttachDetachController, *persistentvolume.PersistentVolumeController, informers.SharedInformerFactory) {
func createAdClients(ns *v1.Namespace, t *testing.T, server *httptest.Server, syncPeriod time.Duration, timers attachdetach.TimerConfig) (*clientset.Clientset, attachdetach.AttachDetachController, *persistentvolume.PersistentVolumeController, clientgoinformers.SharedInformerFactory) {
config := restclient.Config{
Host: server.URL,
ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}},
@ -408,7 +423,7 @@ func createAdClients(ns *v1.Namespace, t *testing.T, server *httptest.Server, sy
}
plugins := []volume.VolumePlugin{plugin}
cloud := &fakecloud.FakeCloud{}
informers := informers.NewSharedInformerFactory(testClient, resyncPeriod)
informers := clientgoinformers.NewSharedInformerFactory(testClient, resyncPeriod)
ctrl, err := attachdetach.NewAttachDetachController(
testClient,
informers.Core().V1().Pods(),
@ -416,6 +431,7 @@ func createAdClients(ns *v1.Namespace, t *testing.T, server *httptest.Server, sy
informers.Core().V1().PersistentVolumeClaims(),
informers.Core().V1().PersistentVolumes(),
informers.Storage().V1beta1().CSINodes(),
informers.Storage().V1beta1().CSIDrivers(),
cloud,
plugins,
nil, /* prober */
@ -491,6 +507,7 @@ func TestPodAddedByDswp(t *testing.T) {
stopCh := make(chan struct{})
go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh)
go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh)
initCSIObjects(stopCh, informers)
go ctrl.Run(stopCh)
waitToObservePods(t, podInformer, 1)
@ -576,6 +593,7 @@ func TestPVCBoundWithADC(t *testing.T) {
stopCh := make(chan struct{})
informers.Start(stopCh)
informers.WaitForCacheSync(stopCh)
initCSIObjects(stopCh, informers)
go ctrl.Run(stopCh)
go pvCtrl.Run(stopCh)