From 396c3c7c6fd008663d2d30369c8e33a58cde5ee2 Mon Sep 17 00:00:00 2001 From: Cheng Xing Date: Tue, 25 Jul 2017 17:48:26 -0700 Subject: [PATCH] Adding dynamic Flexvolume plugin discovery capability, using filesystem watch. --- cmd/kube-controller-manager/app/core.go | 3 +- cmd/kube-controller-manager/app/plugins.go | 14 +- cmd/kubelet/app/plugins.go | 12 +- cmd/kubelet/app/server.go | 32 +-- pkg/controller/volume/attachdetach/BUILD | 1 + .../attachdetach/attach_detach_controller.go | 3 +- .../attach_detach_controller_test.go | 7 +- .../volume/persistentvolume/framework_test.go | 2 +- .../persistentvolume/pv_controller_base.go | 3 +- pkg/kubelet/kubelet.go | 3 +- pkg/kubelet/kubelet_test.go | 3 +- pkg/kubelet/runonce_test.go | 2 +- pkg/kubelet/volume_host.go | 5 +- .../volumemanager/volume_manager_test.go | 3 +- pkg/volume/aws_ebs/aws_ebs_test.go | 10 +- pkg/volume/azure_dd/azure_dd_test.go | 2 +- pkg/volume/azure_file/azure_file_test.go | 10 +- pkg/volume/cephfs/cephfs_test.go | 6 +- pkg/volume/cinder/cinder_test.go | 4 +- pkg/volume/configmap/configmap_test.go | 10 +- pkg/volume/downwardapi/downwardapi_test.go | 4 +- pkg/volume/empty_dir/empty_dir_test.go | 2 +- pkg/volume/fc/fc_test.go | 10 +- pkg/volume/flexvolume/BUILD | 2 + pkg/volume/flexvolume/flexvolume_test.go | 4 +- pkg/volume/flexvolume/probe.go | 207 +++++++++++++++++- pkg/volume/flocker/flocker_test.go | 4 +- pkg/volume/flocker/flocker_volume_test.go | 2 +- pkg/volume/gce_pd/attacher_test.go | 3 +- pkg/volume/gce_pd/gce_pd_test.go | 8 +- pkg/volume/git_repo/git_repo_test.go | 4 +- pkg/volume/glusterfs/glusterfs_test.go | 8 +- pkg/volume/host_path/host_path_test.go | 17 +- pkg/volume/iscsi/iscsi_test.go | 8 +- pkg/volume/local/local_test.go | 6 +- pkg/volume/nfs/nfs_test.go | 10 +- pkg/volume/photon_pd/photon_pd_test.go | 8 +- pkg/volume/plugins.go | 114 +++++++++- pkg/volume/plugins_test.go | 3 +- pkg/volume/portworx/portworx_test.go | 6 +- pkg/volume/projected/projected_test.go | 10 +- pkg/volume/quobyte/quobyte_test.go | 8 +- pkg/volume/rbd/rbd_test.go | 6 +- pkg/volume/scaleio/sio_volume_test.go | 2 +- pkg/volume/secret/secret_test.go | 10 +- pkg/volume/storageos/storageos_test.go | 8 +- pkg/volume/storageos/storageos_util_test.go | 4 +- pkg/volume/testing/testing.go | 4 +- .../vsphere_volume/vsphere_volume_test.go | 4 +- test/integration/volume/attach_detach_test.go | 1 + 50 files changed, 464 insertions(+), 158 deletions(-) diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index 0d6f3dca1f..46c9309f51 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -175,7 +175,8 @@ func startAttachDetachController(ctx ControllerContext) (bool, error) { ctx.InformerFactory.Core().V1().PersistentVolumeClaims(), ctx.InformerFactory.Core().V1().PersistentVolumes(), ctx.Cloud, - ProbeAttachableVolumePlugins(ctx.Options.VolumeConfiguration), + ProbeAttachableVolumePlugins(), + GetDynamicPluginProber(ctx.Options.VolumeConfiguration), ctx.Options.DisableAttachDetachReconcilerSync, ctx.Options.ReconcilerSyncLoopPeriod.Duration, attachdetach.DefaultTimerConfig, diff --git a/cmd/kube-controller-manager/app/plugins.go b/cmd/kube-controller-manager/app/plugins.go index 2b036c8b7e..bdc62a2673 100644 --- a/cmd/kube-controller-manager/app/plugins.go +++ b/cmd/kube-controller-manager/app/plugins.go @@ -60,18 +60,15 @@ import ( ) // ProbeAttachableVolumePlugins collects all volume plugins for the attach/ -// detach controller. VolumeConfiguration is used ot get FlexVolumePluginDir -// which specifies the directory to search for additional third party volume -// plugins. +// detach controller. // The list of plugins is manually compiled. This code and the plugin // initialization code for kubelet really, really need a through refactor. -func ProbeAttachableVolumePlugins(config componentconfig.VolumeConfiguration) []volume.VolumePlugin { +func ProbeAttachableVolumePlugins() []volume.VolumePlugin { allPlugins := []volume.VolumePlugin{} allPlugins = append(allPlugins, aws_ebs.ProbeVolumePlugins()...) allPlugins = append(allPlugins, gce_pd.ProbeVolumePlugins()...) allPlugins = append(allPlugins, cinder.ProbeVolumePlugins()...) - allPlugins = append(allPlugins, flexvolume.ProbeVolumePlugins(config.FlexVolumePluginDir)...) allPlugins = append(allPlugins, portworx.ProbeVolumePlugins()...) allPlugins = append(allPlugins, vsphere_volume.ProbeVolumePlugins()...) allPlugins = append(allPlugins, azure_dd.ProbeVolumePlugins()...) @@ -82,6 +79,13 @@ func ProbeAttachableVolumePlugins(config componentconfig.VolumeConfiguration) [] return allPlugins } +// GetDynamicPluginProber gets the probers of dynamically discoverable plugins +// for the attach/detach controller. +// Currently only Flexvolume plugins are dynamically discoverable. +func GetDynamicPluginProber(config componentconfig.VolumeConfiguration) volume.DynamicPluginProber { + return flexvolume.GetDynamicPluginProber(config.FlexVolumePluginDir) +} + // ProbeControllerVolumePlugins collects all persistent volume plugins into an // easy to use list. Only volume plugins that implement any of // provisioner/recycler/deleter interface should be returned. diff --git a/cmd/kubelet/app/plugins.go b/cmd/kubelet/app/plugins.go index 852bd12bf0..13abcd9ff7 100644 --- a/cmd/kubelet/app/plugins.go +++ b/cmd/kubelet/app/plugins.go @@ -61,9 +61,7 @@ import ( ) // ProbeVolumePlugins collects all volume plugins into an easy to use list. -// PluginDir specifies the directory to search for additional third party -// volume plugins. -func ProbeVolumePlugins(pluginDir string) []volume.VolumePlugin { +func ProbeVolumePlugins() []volume.VolumePlugin { allPlugins := []volume.VolumePlugin{} // The list of plugins to probe is decided by the kubelet binary, not @@ -88,7 +86,6 @@ func ProbeVolumePlugins(pluginDir string) []volume.VolumePlugin { allPlugins = append(allPlugins, downwardapi.ProbeVolumePlugins()...) allPlugins = append(allPlugins, fc.ProbeVolumePlugins()...) allPlugins = append(allPlugins, flocker.ProbeVolumePlugins()...) - allPlugins = append(allPlugins, flexvolume.ProbeVolumePlugins(pluginDir)...) allPlugins = append(allPlugins, azure_file.ProbeVolumePlugins()...) allPlugins = append(allPlugins, configmap.ProbeVolumePlugins()...) allPlugins = append(allPlugins, vsphere_volume.ProbeVolumePlugins()...) @@ -102,6 +99,13 @@ func ProbeVolumePlugins(pluginDir string) []volume.VolumePlugin { return allPlugins } +// GetDynamicPluginProber gets the probers of dynamically discoverable plugins +// for kubelet. +// Currently only Flexvolume plugins are dynamically discoverable. +func GetDynamicPluginProber(pluginDir string) volume.DynamicPluginProber { + return flexvolume.GetDynamicPluginProber(pluginDir) +} + // ProbeNetworkPlugins collects all compiled-in plugins func ProbeNetworkPlugins(pluginDir, cniConfDir, cniBinDir string) []network.NetworkPlugin { allPlugins := []network.NetworkPlugin{} diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 85d37b75c6..ddf0a49797 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -151,22 +151,22 @@ func UnsecuredDependencies(s *options.KubeletServer) (*kubelet.Dependencies, err } return &kubelet.Dependencies{ - Auth: nil, // default does not enforce auth[nz] - CAdvisorInterface: nil, // cadvisor.New launches background processes (bg http.ListenAndServe, and some bg cleaners), not set here - Cloud: nil, // cloud provider might start background processes - ContainerManager: nil, - DockerClient: dockerClient, - KubeClient: nil, - ExternalKubeClient: nil, - EventClient: nil, - Mounter: mounter, - NetworkPlugins: ProbeNetworkPlugins(s.NetworkPluginDir, s.CNIConfDir, s.CNIBinDir), - OOMAdjuster: oom.NewOOMAdjuster(), - OSInterface: kubecontainer.RealOS{}, - Writer: writer, - VolumePlugins: ProbeVolumePlugins(s.VolumePluginDir), - TLSOptions: tlsOptions, - }, nil + Auth: nil, // default does not enforce auth[nz] + CAdvisorInterface: nil, // cadvisor.New launches background processes (bg http.ListenAndServe, and some bg cleaners), not set here + Cloud: nil, // cloud provider might start background processes + ContainerManager: nil, + DockerClient: dockerClient, + KubeClient: nil, + ExternalKubeClient: nil, + EventClient: nil, + Mounter: mounter, + NetworkPlugins: ProbeNetworkPlugins(s.NetworkPluginDir, s.CNIConfDir, s.CNIBinDir), + OOMAdjuster: oom.NewOOMAdjuster(), + OSInterface: kubecontainer.RealOS{}, + Writer: writer, + VolumePlugins: ProbeVolumePlugins(), + DynamicPluginProber: GetDynamicPluginProber(s.VolumePluginDir), + TLSOptions: tlsOptions}, nil } // Run runs the specified KubeletServer with the given Dependencies. This should never exit. diff --git a/pkg/controller/volume/attachdetach/BUILD b/pkg/controller/volume/attachdetach/BUILD index db1e5fcccb..b3d129b7ab 100644 --- a/pkg/controller/volume/attachdetach/BUILD +++ b/pkg/controller/volume/attachdetach/BUILD @@ -45,6 +45,7 @@ go_test( "//pkg/controller:go_default_library", "//pkg/controller/volume/attachdetach/cache:go_default_library", "//pkg/controller/volume/attachdetach/testing:go_default_library", + "//pkg/volume:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller.go b/pkg/controller/volume/attachdetach/attach_detach_controller.go index 151c900fd6..a608fbf94d 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller.go @@ -97,6 +97,7 @@ func NewAttachDetachController( pvInformer coreinformers.PersistentVolumeInformer, cloud cloudprovider.Interface, plugins []volume.VolumePlugin, + prober volume.DynamicPluginProber, disableReconciliationSync bool, reconcilerSyncDuration time.Duration, timerConfig TimerConfig) (AttachDetachController, error) { @@ -127,7 +128,7 @@ func NewAttachDetachController( cloud: cloud, } - if err := adc.volumePluginMgr.InitPlugins(plugins, adc); err != nil { + if err := adc.volumePluginMgr.InitPlugins(plugins, prober, adc); err != nil { return nil, fmt.Errorf("Could not initialize volume plugins for Attach/Detach Controller: %+v", err) } diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller_test.go b/pkg/controller/volume/attachdetach/attach_detach_controller_test.go index b63103bf2f..b0e12589b1 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller_test.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller_test.go @@ -29,6 +29,7 @@ import ( "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache" controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing" + "k8s.io/kubernetes/pkg/volume" ) func Test_NewAttachDetachController_Positive(t *testing.T) { @@ -45,6 +46,7 @@ func Test_NewAttachDetachController_Positive(t *testing.T) { informerFactory.Core().V1().PersistentVolumes(), nil, /* cloud */ nil, /* plugins */ + nil, /* prober */ false, 5*time.Second, DefaultTimerConfig) @@ -79,8 +81,9 @@ func Test_AttachDetachControllerStateOfWolrdPopulators_Positive(t *testing.T) { // Act plugins := controllervolumetesting.CreateTestPlugin() + var prober volume.DynamicPluginProber = nil // TODO (#51147) inject mock - if err := adc.volumePluginMgr.InitPlugins(plugins, adc); err != nil { + if err := adc.volumePluginMgr.InitPlugins(plugins, prober, adc); err != nil { t.Fatalf("Could not initialize volume plugins for Attach/Detach Controller: %+v", err) } @@ -141,6 +144,7 @@ func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2 informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, time.Second*1) //informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, time.Second*1) plugins := controllervolumetesting.CreateTestPlugin() + var prober volume.DynamicPluginProber = nil // TODO (#51147) inject mock nodeInformer := informerFactory.Core().V1().Nodes().Informer() podInformer := informerFactory.Core().V1().Pods().Informer() var podsNum, extraPodsNum, nodesNum, i int @@ -212,6 +216,7 @@ func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2 informerFactory.Core().V1().PersistentVolumes(), nil, /* cloud */ plugins, + prober, false, 1*time.Second, DefaultTimerConfig) diff --git a/pkg/controller/volume/persistentvolume/framework_test.go b/pkg/controller/volume/persistentvolume/framework_test.go index 64c75adb53..9c53ef3891 100644 --- a/pkg/controller/volume/persistentvolume/framework_test.go +++ b/pkg/controller/volume/persistentvolume/framework_test.go @@ -827,7 +827,7 @@ func wrapTestWithPluginCalls(expectedRecycleCalls, expectedDeleteCalls []error, deleteCalls: expectedDeleteCalls, provisionCalls: expectedProvisionCalls, } - ctrl.volumePluginMgr.InitPlugins([]vol.VolumePlugin{plugin}, ctrl) + ctrl.volumePluginMgr.InitPlugins([]vol.VolumePlugin{plugin}, nil /* prober */, ctrl) return toWrap(ctrl, reactor, test) } } diff --git a/pkg/controller/volume/persistentvolume/pv_controller_base.go b/pkg/controller/volume/persistentvolume/pv_controller_base.go index cfbe26401a..a4c6d918d1 100644 --- a/pkg/controller/volume/persistentvolume/pv_controller_base.go +++ b/pkg/controller/volume/persistentvolume/pv_controller_base.go @@ -90,7 +90,8 @@ func NewController(p ControllerParameters) (*PersistentVolumeController, error) resyncPeriod: p.SyncPeriod, } - if err := controller.volumePluginMgr.InitPlugins(p.VolumePlugins, controller); err != nil { + // Prober is nil because PV is not aware of Flexvolume. + if err := controller.volumePluginMgr.InitPlugins(p.VolumePlugins, nil /* prober */, controller); err != nil { return nil, fmt.Errorf("Could not initialize volume plugins for PersistentVolume Controller: %v", err) } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 80fd03f26a..952af0ac1b 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -243,6 +243,7 @@ type Dependencies struct { Recorder record.EventRecorder Writer kubeio.Writer VolumePlugins []volume.VolumePlugin + DynamicPluginProber volume.DynamicPluginProber TLSOptions *server.TLSOptions KubeletConfigController *kubeletconfig.Controller } @@ -736,7 +737,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps.Recorder) klet.volumePluginMgr, err = - NewInitializedVolumePluginMgr(klet, secretManager, configMapManager, kubeDeps.VolumePlugins) + NewInitializedVolumePluginMgr(klet, secretManager, configMapManager, kubeDeps.VolumePlugins, kubeDeps.DynamicPluginProber) if err != nil { return nil, err } diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index b5a1ba0924..57f0ef96e6 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -272,8 +272,9 @@ func newTestKubeletWithImageList( kubelet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(kubelet.getNodeAnyWay, lifecycle.NewAdmissionFailureHandlerStub())) plug := &volumetest.FakeVolumePlugin{PluginName: "fake", Host: nil} + var prober volume.DynamicPluginProber = nil // TODO (#51147) inject mock kubelet.volumePluginMgr, err = - NewInitializedVolumePluginMgr(kubelet, kubelet.secretManager, kubelet.configMapManager, []volume.VolumePlugin{plug}) + NewInitializedVolumePluginMgr(kubelet, kubelet.secretManager, kubelet.configMapManager, []volume.VolumePlugin{plug}, prober) require.NoError(t, err, "Failed to initialize VolumePluginMgr") kubelet.mounter = &mount.FakeMounter{} diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go index 7449c271cc..a43b6fb27d 100644 --- a/pkg/kubelet/runonce_test.go +++ b/pkg/kubelet/runonce_test.go @@ -92,7 +92,7 @@ func TestRunOnce(t *testing.T) { plug := &volumetest.FakeVolumePlugin{PluginName: "fake", Host: nil} kb.volumePluginMgr, err = - NewInitializedVolumePluginMgr(kb, fakeSecretManager, fakeConfigMapManager, []volume.VolumePlugin{plug}) + NewInitializedVolumePluginMgr(kb, fakeSecretManager, fakeConfigMapManager, []volume.VolumePlugin{plug}, nil /* prober */) if err != nil { t.Fatalf("failed to initialize VolumePluginMgr: %v", err) } diff --git a/pkg/kubelet/volume_host.go b/pkg/kubelet/volume_host.go index 3cedc81da8..26323b7d84 100644 --- a/pkg/kubelet/volume_host.go +++ b/pkg/kubelet/volume_host.go @@ -41,7 +41,8 @@ func NewInitializedVolumePluginMgr( kubelet *Kubelet, secretManager secret.Manager, configMapManager configmap.Manager, - plugins []volume.VolumePlugin) (*volume.VolumePluginMgr, error) { + plugins []volume.VolumePlugin, + prober volume.DynamicPluginProber) (*volume.VolumePluginMgr, error) { kvh := &kubeletVolumeHost{ kubelet: kubelet, volumePluginMgr: volume.VolumePluginMgr{}, @@ -49,7 +50,7 @@ func NewInitializedVolumePluginMgr( configMapManager: configMapManager, } - if err := kvh.volumePluginMgr.InitPlugins(plugins, kvh); err != nil { + if err := kvh.volumePluginMgr.InitPlugins(plugins, prober, kvh); err != nil { return nil, fmt.Errorf( "Could not initialize volume plugins for KubeletVolumePluginMgr: %v", err) diff --git a/pkg/kubelet/volumemanager/volume_manager_test.go b/pkg/kubelet/volumemanager/volume_manager_test.go index 2648e0f2b0..6eb81cb7f7 100644 --- a/pkg/kubelet/volumemanager/volume_manager_test.go +++ b/pkg/kubelet/volumemanager/volume_manager_test.go @@ -217,7 +217,8 @@ func newTestVolumeManager(tmpDir string, podManager pod.Manager, kubeClient clie plug := &volumetest.FakeVolumePlugin{PluginName: "fake", Host: nil} fakeRecorder := &record.FakeRecorder{} plugMgr := &volume.VolumePluginMgr{} - plugMgr.InitPlugins([]volume.VolumePlugin{plug}, volumetest.NewFakeVolumeHost(tmpDir, kubeClient, nil)) + // TODO (#51147) inject mock prober + plugMgr.InitPlugins([]volume.VolumePlugin{plug}, nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, kubeClient, nil)) statusManager := status.NewManager(kubeClient, podManager, &statustest.FakePodDeletionSafetyProvider{}) vm := NewVolumeManager( diff --git a/pkg/volume/aws_ebs/aws_ebs_test.go b/pkg/volume/aws_ebs/aws_ebs_test.go index 3766b21254..85538b3649 100644 --- a/pkg/volume/aws_ebs/aws_ebs_test.go +++ b/pkg/volume/aws_ebs/aws_ebs_test.go @@ -40,7 +40,7 @@ func TestCanSupport(t *testing.T) { } defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/aws-ebs") if err != nil { @@ -64,7 +64,7 @@ func TestGetAccessModes(t *testing.T) { } defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPersistentPluginByName("kubernetes.io/aws-ebs") if err != nil { @@ -113,7 +113,7 @@ func TestPlugin(t *testing.T) { } defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/aws-ebs") if err != nil { @@ -251,7 +251,7 @@ func TestPersistentClaimReadOnlyFlag(t *testing.T) { } defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, clientset, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, clientset, nil)) plug, _ := plugMgr.FindPluginByName(awsElasticBlockStorePluginName) // readOnly bool is supplied by persistent-claim volume source when its mounter creates other volumes @@ -271,7 +271,7 @@ func TestMounterAndUnmounterTypeAssert(t *testing.T) { } defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/aws-ebs") if err != nil { diff --git a/pkg/volume/azure_dd/azure_dd_test.go b/pkg/volume/azure_dd/azure_dd_test.go index 59becdeead..e1d24c9141 100644 --- a/pkg/volume/azure_dd/azure_dd_test.go +++ b/pkg/volume/azure_dd/azure_dd_test.go @@ -33,7 +33,7 @@ func TestCanSupport(t *testing.T) { } defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPluginByName(azureDataDiskPluginName) if err != nil { diff --git a/pkg/volume/azure_file/azure_file_test.go b/pkg/volume/azure_file/azure_file_test.go index 8991d5e231..d6fc125963 100644 --- a/pkg/volume/azure_file/azure_file_test.go +++ b/pkg/volume/azure_file/azure_file_test.go @@ -41,7 +41,7 @@ func TestCanSupport(t *testing.T) { } defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/azure-file") if err != nil { @@ -65,7 +65,7 @@ func TestGetAccessModes(t *testing.T) { } defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPersistentPluginByName("kubernetes.io/azure-file") if err != nil { @@ -131,7 +131,7 @@ func TestPluginWithOtherCloudProvider(t *testing.T) { func testPlugin(t *testing.T, tmpDir string, volumeHost volume.VolumeHost) { plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumeHost) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumeHost) plug, err := plugMgr.FindPluginByName("kubernetes.io/azure-file") if err != nil { @@ -228,7 +228,7 @@ func TestPersistentClaimReadOnlyFlag(t *testing.T) { client := fake.NewSimpleClientset(pv, claim) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost("/tmp/fake", client, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost("/tmp/fake", client, nil)) plug, _ := plugMgr.FindPluginByName(azureFilePluginName) // readOnly bool is supplied by persistent-claim volume source when its mounter creates other volumes @@ -260,7 +260,7 @@ func TestMounterAndUnmounterTypeAssert(t *testing.T) { } defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/azure-file") if err != nil { diff --git a/pkg/volume/cephfs/cephfs_test.go b/pkg/volume/cephfs/cephfs_test.go index f17e1902ea..da9d85f11e 100644 --- a/pkg/volume/cephfs/cephfs_test.go +++ b/pkg/volume/cephfs/cephfs_test.go @@ -36,7 +36,7 @@ func TestCanSupport(t *testing.T) { } defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/cephfs") if err != nil { t.Errorf("Can't find the plugin by name") @@ -59,7 +59,7 @@ func TestPlugin(t *testing.T) { } defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/cephfs") if err != nil { t.Errorf("Can't find the plugin by name") @@ -123,7 +123,7 @@ func TestConstructVolumeSpec(t *testing.T) { } defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/cephfs") if err != nil { t.Errorf("can't find cephfs plugin by name") diff --git a/pkg/volume/cinder/cinder_test.go b/pkg/volume/cinder/cinder_test.go index d4cf53cdc7..a5fc228814 100644 --- a/pkg/volume/cinder/cinder_test.go +++ b/pkg/volume/cinder/cinder_test.go @@ -38,7 +38,7 @@ func TestCanSupport(t *testing.T) { } defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/cinder") if err != nil { @@ -134,7 +134,7 @@ func TestPlugin(t *testing.T) { } defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/cinder") if err != nil { diff --git a/pkg/volume/configmap/configmap_test.go b/pkg/volume/configmap/configmap_test.go index 5ff5acf801..9b23885175 100644 --- a/pkg/volume/configmap/configmap_test.go +++ b/pkg/volume/configmap/configmap_test.go @@ -272,7 +272,7 @@ func TestCanSupport(t *testing.T) { pluginMgr := volume.VolumePluginMgr{} tempDir, host := newTestHost(t, nil) defer os.RemoveAll(tempDir) - pluginMgr.InitPlugins(ProbeVolumePlugins(), host) + pluginMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host) plugin, err := pluginMgr.FindPluginByName(configMapPluginName) if err != nil { @@ -304,7 +304,7 @@ func TestPlugin(t *testing.T) { ) defer os.RemoveAll(tempDir) - pluginMgr.InitPlugins(ProbeVolumePlugins(), host) + pluginMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host) plugin, err := pluginMgr.FindPluginByName(configMapPluginName) if err != nil { @@ -368,7 +368,7 @@ func TestPluginReboot(t *testing.T) { ) defer os.RemoveAll(rootDir) - pluginMgr.InitPlugins(ProbeVolumePlugins(), host) + pluginMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host) plugin, err := pluginMgr.FindPluginByName(configMapPluginName) if err != nil { @@ -424,7 +424,7 @@ func TestPluginOptional(t *testing.T) { volumeSpec.VolumeSource.ConfigMap.Optional = &trueVal defer os.RemoveAll(tempDir) - pluginMgr.InitPlugins(ProbeVolumePlugins(), host) + pluginMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host) plugin, err := pluginMgr.FindPluginByName(configMapPluginName) if err != nil { @@ -499,7 +499,7 @@ func TestPluginKeysOptional(t *testing.T) { volumeSpec.VolumeSource.ConfigMap.Optional = &trueVal defer os.RemoveAll(tempDir) - pluginMgr.InitPlugins(ProbeVolumePlugins(), host) + pluginMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host) plugin, err := pluginMgr.FindPluginByName(configMapPluginName) if err != nil { diff --git a/pkg/volume/downwardapi/downwardapi_test.go b/pkg/volume/downwardapi/downwardapi_test.go index b0edd400f7..4f68da8456 100644 --- a/pkg/volume/downwardapi/downwardapi_test.go +++ b/pkg/volume/downwardapi/downwardapi_test.go @@ -54,7 +54,7 @@ func TestCanSupport(t *testing.T) { pluginMgr := volume.VolumePluginMgr{} tmpDir, host := newTestHost(t, nil) defer os.RemoveAll(tmpDir) - pluginMgr.InitPlugins(ProbeVolumePlugins(), host) + pluginMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host) plugin, err := pluginMgr.FindPluginByName(downwardAPIPluginName) if err != nil { @@ -219,7 +219,7 @@ func newDownwardAPITest(t *testing.T, name string, volumeFiles, podLabels, podAn pluginMgr := volume.VolumePluginMgr{} rootDir, host := newTestHost(t, clientset) - pluginMgr.InitPlugins(ProbeVolumePlugins(), host) + pluginMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host) plugin, err := pluginMgr.FindPluginByName(downwardAPIPluginName) if err != nil { t.Errorf("Can't find the plugin by name") diff --git a/pkg/volume/empty_dir/empty_dir_test.go b/pkg/volume/empty_dir/empty_dir_test.go index 642c49422c..31cc7f5a31 100644 --- a/pkg/volume/empty_dir/empty_dir_test.go +++ b/pkg/volume/empty_dir/empty_dir_test.go @@ -36,7 +36,7 @@ import ( // Construct an instance of a plugin, by name. func makePluginUnderTest(t *testing.T, plugName, basePath string) volume.VolumePlugin { plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(basePath, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(basePath, nil, nil)) plug, err := plugMgr.FindPluginByName(plugName) if err != nil { diff --git a/pkg/volume/fc/fc_test.go b/pkg/volume/fc/fc_test.go index 5e6ab7f9fa..b800694281 100644 --- a/pkg/volume/fc/fc_test.go +++ b/pkg/volume/fc/fc_test.go @@ -39,7 +39,7 @@ func TestCanSupport(t *testing.T) { defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/fc") if err != nil { @@ -61,7 +61,7 @@ func TestGetAccessModes(t *testing.T) { defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPersistentPluginByName("kubernetes.io/fc") if err != nil { @@ -132,7 +132,7 @@ func doTestPlugin(t *testing.T, spec *volume.Spec) { defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/fc") if err != nil { @@ -202,7 +202,7 @@ func doTestPluginNilMounter(t *testing.T, spec *volume.Spec) { defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/fc") if err != nil { @@ -355,7 +355,7 @@ func TestPersistentClaimReadOnlyFlag(t *testing.T) { client := fake.NewSimpleClientset(pv, claim) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, client, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, client, nil)) plug, _ := plugMgr.FindPluginByName(fcPluginName) // readOnly bool is supplied by persistent-claim volume source when its mounter creates other volumes diff --git a/pkg/volume/flexvolume/BUILD b/pkg/volume/flexvolume/BUILD index b590ed98ac..02f3602940 100644 --- a/pkg/volume/flexvolume/BUILD +++ b/pkg/volume/flexvolume/BUILD @@ -29,9 +29,11 @@ go_library( "//pkg/util/strings:go_default_library", "//pkg/volume:go_default_library", "//pkg/volume/util:go_default_library", + "//vendor/github.com/fsnotify/fsnotify:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//vendor/k8s.io/utils/exec:go_default_library", ], ) diff --git a/pkg/volume/flexvolume/flexvolume_test.go b/pkg/volume/flexvolume/flexvolume_test.go index 76ade7b854..292beb5450 100644 --- a/pkg/volume/flexvolume/flexvolume_test.go +++ b/pkg/volume/flexvolume/flexvolume_test.go @@ -174,7 +174,7 @@ func TestCanSupport(t *testing.T) { plugMgr := volume.VolumePluginMgr{} installPluginUnderTest(t, "kubernetes.io", "fakeAttacher", tmpDir, execScriptTempl1, nil) - plugMgr.InitPlugins(ProbeVolumePlugins(tmpDir), volumetest.NewFakeVolumeHost("fake", nil, nil)) + plugMgr.InitPlugins(nil, GetDynamicPluginProber(tmpDir), volumetest.NewFakeVolumeHost("fake", nil, nil)) plugin, err := plugMgr.FindPluginByName("flexvolume-kubernetes.io/fakeAttacher") if err != nil { t.Fatalf("Can't find the plugin by name") @@ -202,7 +202,7 @@ func TestGetAccessModes(t *testing.T) { plugMgr := volume.VolumePluginMgr{} installPluginUnderTest(t, "kubernetes.io", "fakeAttacher", tmpDir, execScriptTempl1, nil) - plugMgr.InitPlugins(ProbeVolumePlugins(tmpDir), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(nil, GetDynamicPluginProber(tmpDir), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plugin, err := plugMgr.FindPersistentPluginByName("flexvolume-kubernetes.io/fakeAttacher") if err != nil { diff --git a/pkg/volume/flexvolume/probe.go b/pkg/volume/flexvolume/probe.go index 431c340a22..4acdb9f932 100644 --- a/pkg/volume/flexvolume/probe.go +++ b/pkg/volume/flexvolume/probe.go @@ -19,28 +19,217 @@ package flexvolume import ( "io/ioutil" + "github.com/golang/glog" "k8s.io/kubernetes/pkg/volume" + + "os" + + "fmt" + "path/filepath" + "sync" + "time" + + "github.com/fsnotify/fsnotify" + "k8s.io/apimachinery/pkg/util/errors" ) -// This is the primary entrypoint for volume plugins. -func ProbeVolumePlugins(pluginDir string) []volume.VolumePlugin { - plugins := []volume.VolumePlugin{} +type flexVolumeProber struct { + mutex sync.Mutex + pluginDir string // Flexvolume driver directory + watcher *fsnotify.Watcher + probeNeeded bool // Must only read and write this through testAndSetProbeNeeded. + lastUpdated time.Time // Last time probeNeeded was updated. + watchEventCount int +} - files, _ := ioutil.ReadDir(pluginDir) +const ( + // TODO (cxing) Tune these params based on test results. + // watchEventLimit is the max allowable number of processed watches within watchEventInterval. + watchEventInterval = 5 * time.Second + watchEventLimit = 20 +) + +func GetDynamicPluginProber(pluginDir string) volume.DynamicPluginProber { + return &flexVolumeProber{pluginDir: pluginDir} +} + +func (prober *flexVolumeProber) Init() error { + prober.testAndSetProbeNeeded(true) + prober.lastUpdated = time.Now() + + if err := prober.createPluginDir(); err != nil { + return err + } + if err := prober.initWatcher(); err != nil { + return err + } + + go func() { + defer prober.watcher.Close() + for { + select { + case event := <-prober.watcher.Events: + if err := prober.handleWatchEvent(event); err != nil { + glog.Errorf("Flexvolume prober watch: %s", err) + } + case err := <-prober.watcher.Errors: + glog.Errorf("Received an error from watcher: %s", err) + } + } + }() + + return nil +} + +// Probes for Flexvolume drivers. +// If a filesystem update has occurred since the last probe, updated = true +// and the list of probed plugins is returned. +// Otherwise, update = false and probedPlugins = nil. +// +// If an error occurs, updated and plugins are set arbitrarily. +func (prober *flexVolumeProber) Probe() (updated bool, plugins []volume.VolumePlugin, err error) { + probeNeeded := prober.testAndSetProbeNeeded(false) + + if !probeNeeded { + return false, nil, nil + } + + files, err := ioutil.ReadDir(prober.pluginDir) + if err != nil { + return false, nil, fmt.Errorf("Error reading the Flexvolume directory: %s", err) + } + + plugins = []volume.VolumePlugin{} + allErrs := []error{} for _, f := range files { - // only directories are counted as plugins + // only directories with names that do not begin with '.' are counted as plugins // and pluginDir/dirname/dirname should be an executable // unless dirname contains '~' for escaping namespace // e.g. dirname = vendor~cifs // then, executable will be pluginDir/dirname/cifs - if f.IsDir() { - plugin, err := NewFlexVolumePlugin(pluginDir, f.Name()) - if err != nil { + if f.IsDir() && filepath.Base(f.Name())[0] != '.' { + plugin, pluginErr := NewFlexVolumePlugin(prober.pluginDir, f.Name()) + if pluginErr != nil { + pluginErr = fmt.Errorf( + "Error creating Flexvolume plugin from directory %s, skipping. Error: %s", + f.Name(), pluginErr) + allErrs = append(allErrs, pluginErr) continue } plugins = append(plugins, plugin) } } - return plugins + + return true, plugins, errors.NewAggregate(allErrs) +} + +func (prober *flexVolumeProber) handleWatchEvent(event fsnotify.Event) error { + // event.Name is the watched path. + if filepath.Base(event.Name)[0] == '.' { + // Ignore files beginning with '.' + return nil + } + + eventPathAbs, err := filepath.Abs(event.Name) + if err != nil { + return err + } + + pluginDirAbs, err := filepath.Abs(prober.pluginDir) + if err != nil { + return err + } + + // If the Flexvolume plugin directory is removed, need to recreate it + // in order to keep it under watch. + if eventOpIs(event, fsnotify.Remove) && eventPathAbs == pluginDirAbs { + glog.Warningf("Flexvolume plugin directory at %s is removed. Recreating.", pluginDirAbs) + if err := prober.createPluginDir(); err != nil { + return err + } + if err := prober.addWatchRecursive(pluginDirAbs); err != nil { + return err + } + } else if eventOpIs(event, fsnotify.Create) { + if err := prober.addWatchRecursive(eventPathAbs); err != nil { + return err + } + } + + prober.updateProbeNeeded() + + return nil +} + +func (prober *flexVolumeProber) updateProbeNeeded() { + // Within 'watchEventInterval' seconds, a max of 'watchEventLimit' watch events is processed. + // The watch event will not be registered if the limit is reached. + // This prevents increased disk usage from Probe() being triggered too frequently (either + // accidentally or maliciously). + if time.Since(prober.lastUpdated) > watchEventInterval { + // Update, then reset the timer and watch count. + prober.testAndSetProbeNeeded(true) + prober.lastUpdated = time.Now() + prober.watchEventCount = 1 + } else if prober.watchEventCount < watchEventLimit { + prober.testAndSetProbeNeeded(true) + prober.watchEventCount++ + } +} + +// Recursively adds to watch all directories inside and including the file specified by the given filename. +// If the file is a symlink to a directory, it will watch the symlink but not any of the subdirectories. +// +// Each file or directory change triggers two events: one from the watch on itself, another from the watch +// on its parent directory. +func (prober *flexVolumeProber) addWatchRecursive(filename string) error { + addWatch := func(path string, info os.FileInfo, err error) error { + if info.IsDir() { + if err := prober.watcher.Add(path); err != nil { + glog.Errorf("Error recursively adding watch: %v", err) + } + } + return nil + } + + return filepath.Walk(filename, addWatch) +} + +// Creates a new filesystem watcher and adds watches for the plugin directory +// and all of its subdirectories. +func (prober *flexVolumeProber) initWatcher() error { + var err error + if prober.watcher, err = fsnotify.NewWatcher(); err != nil { + return fmt.Errorf("Error creating new watcher: %s", err) + } + + if err = prober.addWatchRecursive(prober.pluginDir); err != nil { + return fmt.Errorf("Error adding watch on Flexvolume directory: %s", err) + } + + return nil +} + +// Creates the plugin directory, if it doesn't already exist. +func (prober *flexVolumeProber) createPluginDir() error { + if _, err := os.Stat(prober.pluginDir); os.IsNotExist(err) { + err := os.MkdirAll(prober.pluginDir, 0755) + if err != nil { + return fmt.Errorf("Error (re-)creating driver directory: %s", err) + } + } + + return nil +} + +func (prober *flexVolumeProber) testAndSetProbeNeeded(newval bool) (oldval bool) { + prober.mutex.Lock() + defer prober.mutex.Unlock() + oldval, prober.probeNeeded = prober.probeNeeded, newval + return +} + +func eventOpIs(event fsnotify.Event, op fsnotify.Op) bool { + return event.Op&op == op } diff --git a/pkg/volume/flocker/flocker_test.go b/pkg/volume/flocker/flocker_test.go index 9ffba24bef..78930d8945 100644 --- a/pkg/volume/flocker/flocker_test.go +++ b/pkg/volume/flocker/flocker_test.go @@ -127,7 +127,7 @@ func newInitializedVolumePlugMgr(t *testing.T) (*volume.VolumePluginMgr, string) plugMgr := &volume.VolumePluginMgr{} dir, err := utiltesting.MkTmpdir("flocker") assert.NoError(t, err) - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(dir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(dir, nil, nil)) return plugMgr, dir } @@ -138,7 +138,7 @@ func TestPlugin(t *testing.T) { } defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/flocker") if err != nil { diff --git a/pkg/volume/flocker/flocker_volume_test.go b/pkg/volume/flocker/flocker_volume_test.go index 8abc62c82e..7f1a2e3071 100644 --- a/pkg/volume/flocker/flocker_volume_test.go +++ b/pkg/volume/flocker/flocker_volume_test.go @@ -35,7 +35,7 @@ func newTestableProvisioner(assert *assert.Assertions, options volume.VolumeOpti assert.NoError(err, fmt.Sprintf("can't make a temp dir: %v", err)) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPluginByName(pluginName) assert.NoError(err, "Can't find the plugin by name") diff --git a/pkg/volume/gce_pd/attacher_test.go b/pkg/volume/gce_pd/attacher_test.go index 84de7c55b4..cccb876376 100644 --- a/pkg/volume/gce_pd/attacher_test.go +++ b/pkg/volume/gce_pd/attacher_test.go @@ -202,7 +202,8 @@ func newPlugin() *gcePersistentDiskPlugin { host := volumetest.NewFakeVolumeHost( "/tmp", /* rootDir */ nil, /* kubeClient */ - nil /* plugins */) + nil, /* plugins */ + ) plugins := ProbeVolumePlugins() plugin := plugins[0] plugin.Init(host) diff --git a/pkg/volume/gce_pd/gce_pd_test.go b/pkg/volume/gce_pd/gce_pd_test.go index 4dc1761ec7..285e7b5faf 100644 --- a/pkg/volume/gce_pd/gce_pd_test.go +++ b/pkg/volume/gce_pd/gce_pd_test.go @@ -39,7 +39,7 @@ func TestCanSupport(t *testing.T) { } defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/gce-pd") if err != nil { @@ -63,7 +63,7 @@ func TestGetAccessModes(t *testing.T) { } defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPersistentPluginByName("kubernetes.io/gce-pd") if err != nil { @@ -106,7 +106,7 @@ func TestPlugin(t *testing.T) { } defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/gce-pd") if err != nil { @@ -244,7 +244,7 @@ func TestPersistentClaimReadOnlyFlag(t *testing.T) { } defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, client, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, client, nil)) plug, _ := plugMgr.FindPluginByName(gcePersistentDiskPluginName) // readOnly bool is supplied by persistent-claim volume source when its mounter creates other volumes diff --git a/pkg/volume/git_repo/git_repo_test.go b/pkg/volume/git_repo/git_repo_test.go index e851e43c47..1bee3ad218 100644 --- a/pkg/volume/git_repo/git_repo_test.go +++ b/pkg/volume/git_repo/git_repo_test.go @@ -47,7 +47,7 @@ func TestCanSupport(t *testing.T) { plugMgr := volume.VolumePluginMgr{} tempDir, host := newTestHost(t) defer os.RemoveAll(tempDir) - plugMgr.InitPlugins(ProbeVolumePlugins(), host) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host) plug, err := plugMgr.FindPluginByName("kubernetes.io/git-repo") if err != nil { @@ -225,7 +225,7 @@ func doTestPlugin(scenario struct { plugMgr := volume.VolumePluginMgr{} rootDir, host := newTestHost(t) defer os.RemoveAll(rootDir) - plugMgr.InitPlugins(ProbeVolumePlugins(), host) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host) plug, err := plugMgr.FindPluginByName("kubernetes.io/git-repo") if err != nil { diff --git a/pkg/volume/glusterfs/glusterfs_test.go b/pkg/volume/glusterfs/glusterfs_test.go index 2f4f66adff..ba8a2fb372 100644 --- a/pkg/volume/glusterfs/glusterfs_test.go +++ b/pkg/volume/glusterfs/glusterfs_test.go @@ -43,7 +43,7 @@ func TestCanSupport(t *testing.T) { defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/glusterfs") if err != nil { t.Errorf("Can't find the plugin by name") @@ -67,7 +67,7 @@ func TestGetAccessModes(t *testing.T) { defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPersistentPluginByName("kubernetes.io/glusterfs") if err != nil { @@ -95,7 +95,7 @@ func doTestPlugin(t *testing.T, spec *volume.Spec) { defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/glusterfs") if err != nil { t.Errorf("Can't find the plugin by name") @@ -213,7 +213,7 @@ func TestPersistentClaimReadOnlyFlag(t *testing.T) { client := fake.NewSimpleClientset(pv, claim, ep) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, client, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, client, nil)) plug, _ := plugMgr.FindPluginByName(glusterfsPluginName) // readOnly bool is supplied by persistent-claim volume source when its mounter creates other volumes diff --git a/pkg/volume/host_path/host_path_test.go b/pkg/volume/host_path/host_path_test.go index 8d78b99d0f..1184f5d35a 100644 --- a/pkg/volume/host_path/host_path_test.go +++ b/pkg/volume/host_path/host_path_test.go @@ -51,7 +51,7 @@ func newHostPathTypeList(pathType ...string) []*v1.HostPathType { func TestCanSupport(t *testing.T) { plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(volume.VolumeConfig{}), volumetest.NewFakeVolumeHost("fake", nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(volume.VolumeConfig{}), nil /* prober */, volumetest.NewFakeVolumeHost("fake", nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/host-path") if err != nil { @@ -73,7 +73,7 @@ func TestCanSupport(t *testing.T) { func TestGetAccessModes(t *testing.T) { plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(volume.VolumeConfig{}), volumetest.NewFakeVolumeHost("/tmp/fake", nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(volume.VolumeConfig{}), nil /* prober */, volumetest.NewFakeVolumeHost("/tmp/fake", nil, nil)) plug, err := plugMgr.FindPersistentPluginByName("kubernetes.io/host-path") if err != nil { @@ -87,7 +87,7 @@ func TestGetAccessModes(t *testing.T) { func TestRecycler(t *testing.T) { plugMgr := volume.VolumePluginMgr{} pluginHost := volumetest.NewFakeVolumeHost("/tmp/fake", nil, nil) - plugMgr.InitPlugins([]volume.VolumePlugin{&hostPathPlugin{nil, volume.VolumeConfig{}}}, pluginHost) + plugMgr.InitPlugins([]volume.VolumePlugin{&hostPathPlugin{nil, volume.VolumeConfig{}}}, nil, pluginHost) spec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{PersistentVolumeSource: v1.PersistentVolumeSource{HostPath: &v1.HostPathVolumeSource{Path: "/foo"}}}}} _, err := plugMgr.FindRecyclablePluginBySpec(spec) @@ -106,7 +106,7 @@ func TestDeleter(t *testing.T) { } plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(volume.VolumeConfig{}), volumetest.NewFakeVolumeHost("/tmp/fake", nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(volume.VolumeConfig{}), nil /* prober */, volumetest.NewFakeVolumeHost("/tmp/fake", nil, nil)) spec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{PersistentVolumeSource: v1.PersistentVolumeSource{HostPath: &v1.HostPathVolumeSource{Path: tempPath}}}}} plug, err := plugMgr.FindDeletablePluginBySpec(spec) @@ -140,7 +140,7 @@ func TestDeleterTempDir(t *testing.T) { for name, test := range tests { plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(volume.VolumeConfig{}), volumetest.NewFakeVolumeHost("/tmp/fake", nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(volume.VolumeConfig{}), nil /* prober */, volumetest.NewFakeVolumeHost("/tmp/fake", nil, nil)) spec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{PersistentVolumeSource: v1.PersistentVolumeSource{HostPath: &v1.HostPathVolumeSource{Path: test.path}}}}} plug, _ := plugMgr.FindDeletablePluginBySpec(spec) deleter, _ := plug.NewDeleter(spec) @@ -161,6 +161,7 @@ func TestProvisioner(t *testing.T) { plugMgr := volume.VolumePluginMgr{} plugMgr.InitPlugins(ProbeVolumePlugins(volume.VolumeConfig{ProvisioningEnabled: true}), + nil, volumetest.NewFakeVolumeHost("/tmp/fake", nil, nil)) spec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{PersistentVolumeSource: v1.PersistentVolumeSource{HostPath: &v1.HostPathVolumeSource{Path: tempPath}}}}} plug, err := plugMgr.FindCreatablePluginBySpec(spec) @@ -199,7 +200,7 @@ func TestProvisioner(t *testing.T) { func TestInvalidHostPath(t *testing.T) { plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(volume.VolumeConfig{}), volumetest.NewFakeVolumeHost("fake", nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(volume.VolumeConfig{}), nil /* prober */, volumetest.NewFakeVolumeHost("fake", nil, nil)) plug, err := plugMgr.FindPluginByName(hostPathPluginName) if err != nil { @@ -224,7 +225,7 @@ func TestInvalidHostPath(t *testing.T) { func TestPlugin(t *testing.T) { plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(volume.VolumeConfig{}), volumetest.NewFakeVolumeHost("fake", nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(volume.VolumeConfig{}), nil /* prober */, volumetest.NewFakeVolumeHost("fake", nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/host-path") if err != nil { @@ -300,7 +301,7 @@ func TestPersistentClaimReadOnlyFlag(t *testing.T) { client := fake.NewSimpleClientset(pv, claim) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(volume.VolumeConfig{}), volumetest.NewFakeVolumeHost("/tmp/fake", client, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(volume.VolumeConfig{}), nil /* prober */, volumetest.NewFakeVolumeHost("/tmp/fake", client, nil)) plug, _ := plugMgr.FindPluginByName(hostPathPluginName) // readOnly bool is supplied by persistent-claim volume source when its mounter creates other volumes diff --git a/pkg/volume/iscsi/iscsi_test.go b/pkg/volume/iscsi/iscsi_test.go index d9499bf532..ebdb67adc2 100644 --- a/pkg/volume/iscsi/iscsi_test.go +++ b/pkg/volume/iscsi/iscsi_test.go @@ -39,7 +39,7 @@ func TestCanSupport(t *testing.T) { defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/iscsi") if err != nil { @@ -61,7 +61,7 @@ func TestGetAccessModes(t *testing.T) { defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPersistentPluginByName("kubernetes.io/iscsi") if err != nil { @@ -132,7 +132,7 @@ func doTestPlugin(t *testing.T, spec *volume.Spec) { defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/iscsi") if err != nil { @@ -276,7 +276,7 @@ func TestPersistentClaimReadOnlyFlag(t *testing.T) { client := fake.NewSimpleClientset(pv, claim) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, client, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, client, nil)) plug, _ := plugMgr.FindPluginByName(iscsiPluginName) // readOnly bool is supplied by persistent-claim volume source when its mounter creates other volumes diff --git a/pkg/volume/local/local_test.go b/pkg/volume/local/local_test.go index 972da115ec..87fb829c49 100644 --- a/pkg/volume/local/local_test.go +++ b/pkg/volume/local/local_test.go @@ -44,7 +44,7 @@ func getPlugin(t *testing.T) (string, volume.VolumePlugin) { } plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPluginByName(localVolumePluginName) if err != nil { @@ -64,7 +64,7 @@ func getPersistentPlugin(t *testing.T) (string, volume.PersistentVolumePlugin) { } plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPersistentPluginByName(localVolumePluginName) if err != nil { @@ -337,7 +337,7 @@ func TestUnsupportedPlugins(t *testing.T) { defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) spec := getTestVolume(false, tmpDir) recyclePlug, err := plugMgr.FindRecyclablePluginBySpec(spec) diff --git a/pkg/volume/nfs/nfs_test.go b/pkg/volume/nfs/nfs_test.go index 3b086b44b6..2cb83235b5 100644 --- a/pkg/volume/nfs/nfs_test.go +++ b/pkg/volume/nfs/nfs_test.go @@ -39,7 +39,7 @@ func TestCanSupport(t *testing.T) { defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(volume.VolumeConfig{}), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(volume.VolumeConfig{}), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/nfs") if err != nil { t.Errorf("Can't find the plugin by name") @@ -67,7 +67,7 @@ func TestGetAccessModes(t *testing.T) { defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(volume.VolumeConfig{}), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(volume.VolumeConfig{}), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPersistentPluginByName("kubernetes.io/nfs") if err != nil { @@ -86,7 +86,7 @@ func TestRecycler(t *testing.T) { defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins([]volume.VolumePlugin{&nfsPlugin{nil, volume.VolumeConfig{}}}, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins([]volume.VolumePlugin{&nfsPlugin{nil, volume.VolumeConfig{}}}, nil, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) spec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{PersistentVolumeSource: v1.PersistentVolumeSource{NFS: &v1.NFSVolumeSource{Path: "/foo"}}}}} _, plugin_err := plugMgr.FindRecyclablePluginBySpec(spec) @@ -112,7 +112,7 @@ func doTestPlugin(t *testing.T, spec *volume.Spec) { defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(volume.VolumeConfig{}), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(volume.VolumeConfig{}), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/nfs") if err != nil { t.Errorf("Can't find the plugin by name") @@ -240,7 +240,7 @@ func TestPersistentClaimReadOnlyFlag(t *testing.T) { client := fake.NewSimpleClientset(pv, claim) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(volume.VolumeConfig{}), volumetest.NewFakeVolumeHost(tmpDir, client, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(volume.VolumeConfig{}), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, client, nil)) plug, _ := plugMgr.FindPluginByName(nfsPluginName) // readOnly bool is supplied by persistent-claim volume source when its mounter creates other volumes diff --git a/pkg/volume/photon_pd/photon_pd_test.go b/pkg/volume/photon_pd/photon_pd_test.go index b874c5b0da..085ca2408e 100644 --- a/pkg/volume/photon_pd/photon_pd_test.go +++ b/pkg/volume/photon_pd/photon_pd_test.go @@ -37,7 +37,7 @@ func TestCanSupport(t *testing.T) { } defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/photon-pd") if err != nil { @@ -61,7 +61,7 @@ func TestGetAccessModes(t *testing.T) { } defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPersistentPluginByName("kubernetes.io/photon-pd") if err != nil { @@ -106,7 +106,7 @@ func TestPlugin(t *testing.T) { } defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/photon-pd") if err != nil { @@ -211,7 +211,7 @@ func TestMounterAndUnmounterTypeAssert(t *testing.T) { } defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/photon-pd") if err != nil { diff --git a/pkg/volume/plugins.go b/pkg/volume/plugins.go index 7ba0d3ec67..dccb99fce7 100644 --- a/pkg/volume/plugins.go +++ b/pkg/volume/plugins.go @@ -70,6 +70,17 @@ type VolumeOptions struct { Containerized bool } +type DynamicPluginProber interface { + Init() error + + // If an update has occurred since the last probe, updated = true + // and the list of probed plugins is returned. + // Otherwise, update = false and probedPlugins = nil. + // + // If an error occurs, updated and probedPlugins are undefined. + Probe() (updated bool, probedPlugins []VolumePlugin, err error) +} + // VolumePlugin is an interface to volume plugins that can be used on a // kubernetes node (e.g. by kubelet) to instantiate and manage volumes. type VolumePlugin interface { @@ -255,9 +266,11 @@ type VolumeHost interface { // VolumePluginMgr tracks registered plugins. type VolumePluginMgr struct { - mutex sync.Mutex - plugins map[string]VolumePlugin - Host VolumeHost + mutex sync.Mutex + plugins map[string]VolumePlugin + prober DynamicPluginProber + probedPlugins []VolumePlugin + Host VolumeHost } // Spec is an internal representation of a volume. All API volume types translate to Spec. @@ -352,11 +365,24 @@ func NewSpecFromPersistentVolume(pv *v1.PersistentVolume, readOnly bool) *Spec { // InitPlugins initializes each plugin. All plugins must have unique names. // This must be called exactly once before any New* methods are called on any // plugins. -func (pm *VolumePluginMgr) InitPlugins(plugins []VolumePlugin, host VolumeHost) error { +func (pm *VolumePluginMgr) InitPlugins(plugins []VolumePlugin, prober DynamicPluginProber, host VolumeHost) error { pm.mutex.Lock() defer pm.mutex.Unlock() pm.Host = host + + if prober == nil { + // Use a dummy prober to prevent nil deference. + pm.prober = &dummyPluginProber{} + } else { + pm.prober = prober + } + if err := pm.prober.Init(); err != nil { + // Prober init failure should not affect the initialization of other plugins. + glog.Errorf("Error initializing dynamic plugin prober: %s", err) + pm.prober = &dummyPluginProber{} + } + if pm.plugins == nil { pm.plugins = map[string]VolumePlugin{} } @@ -385,6 +411,21 @@ func (pm *VolumePluginMgr) InitPlugins(plugins []VolumePlugin, host VolumeHost) return utilerrors.NewAggregate(allErrs) } +func (pm *VolumePluginMgr) initProbedPlugin(probedPlugin VolumePlugin) error { + name := probedPlugin.GetPluginName() + if errs := validation.IsQualifiedName(name); len(errs) != 0 { + return fmt.Errorf("volume plugin has invalid name: %q: %s", name, strings.Join(errs, ";")) + } + + err := probedPlugin.Init(pm.Host) + if err != nil { + return fmt.Errorf("Failed to load volume plugin %s, error: %s", name, err.Error()) + } + + glog.V(1).Infof("Loaded volume plugin %q", name) + return nil +} + // FindPluginBySpec looks for a plugin that can support a given volume // specification. If no plugins can support or more than one plugin can // support it, return error. @@ -396,19 +437,30 @@ func (pm *VolumePluginMgr) FindPluginBySpec(spec *Spec) (VolumePlugin, error) { return nil, fmt.Errorf("Could not find plugin because volume spec is nil") } - matches := []string{} + matchedPluginNames := []string{} + matches := []VolumePlugin{} for k, v := range pm.plugins { if v.CanSupport(spec) { - matches = append(matches, k) + matchedPluginNames = append(matchedPluginNames, k) + matches = append(matches, v) } } + + pm.refreshProbedPlugins() + for _, plugin := range pm.probedPlugins { + if plugin.CanSupport(spec) { + matchedPluginNames = append(matchedPluginNames, plugin.GetPluginName()) + matches = append(matches, plugin) + } + } + if len(matches) == 0 { return nil, fmt.Errorf("no volume plugin matched") } if len(matches) > 1 { - return nil, fmt.Errorf("multiple volume plugins matched: %s", strings.Join(matches, ",")) + return nil, fmt.Errorf("multiple volume plugins matched: %s", strings.Join(matchedPluginNames, ",")) } - return pm.plugins[matches[0]], nil + return matches[0], nil } // FindPluginByName fetches a plugin by name or by legacy name. If no plugin @@ -418,19 +470,52 @@ func (pm *VolumePluginMgr) FindPluginByName(name string) (VolumePlugin, error) { defer pm.mutex.Unlock() // Once we can get rid of legacy names we can reduce this to a map lookup. - matches := []string{} + matchedPluginNames := []string{} + matches := []VolumePlugin{} for k, v := range pm.plugins { if v.GetPluginName() == name { - matches = append(matches, k) + matchedPluginNames = append(matchedPluginNames, k) + matches = append(matches, v) } } + + pm.refreshProbedPlugins() + for _, plugin := range pm.probedPlugins { + if plugin.GetPluginName() == name { + matchedPluginNames = append(matchedPluginNames, plugin.GetPluginName()) + matches = append(matches, plugin) + } + } + if len(matches) == 0 { return nil, fmt.Errorf("no volume plugin matched") } if len(matches) > 1 { - return nil, fmt.Errorf("multiple volume plugins matched: %s", strings.Join(matches, ",")) + return nil, fmt.Errorf("multiple volume plugins matched: %s", strings.Join(matchedPluginNames, ",")) + } + return matches[0], nil +} + +// Check if probedPlugin cache update is required. +// If it is, initialize all probed plugins and replace the cache with them. +func (pm *VolumePluginMgr) refreshProbedPlugins() { + updated, plugins, err := pm.prober.Probe() + if err != nil { + glog.Errorf("Error dynamically probing plugins: %s", err) + return // Use cached plugins upon failure. + } + + if updated { + pm.probedPlugins = []VolumePlugin{} + for _, plugin := range plugins { + if err := pm.initProbedPlugin(plugin); err != nil { + glog.Errorf("Error initializing dynamically probed plugin %s; error: %s", + plugin.GetPluginName(), err) + continue + } + pm.probedPlugins = append(pm.probedPlugins, plugin) + } } - return pm.plugins[matches[0]], nil } // FindPersistentPluginBySpec looks for a persistent volume plugin that can @@ -618,3 +703,8 @@ func ValidateRecyclerPodTemplate(pod *v1.Pod) error { } return nil } + +type dummyPluginProber struct{} + +func (*dummyPluginProber) Init() error { return nil } +func (*dummyPluginProber) Probe() (bool, []VolumePlugin, error) { return false, nil, nil } diff --git a/pkg/volume/plugins_test.go b/pkg/volume/plugins_test.go index fc319e1027..b8d74a26fa 100644 --- a/pkg/volume/plugins_test.go +++ b/pkg/volume/plugins_test.go @@ -103,7 +103,8 @@ func newTestPlugin() []VolumePlugin { func TestVolumePluginMgrFunc(t *testing.T) { vpm := VolumePluginMgr{} - vpm.InitPlugins(newTestPlugin(), nil) + var prober DynamicPluginProber = nil // TODO (#51147) inject mock + vpm.InitPlugins(newTestPlugin(), prober, nil) plug, err := vpm.FindPluginByName("testPlugin") if err != nil { diff --git a/pkg/volume/portworx/portworx_test.go b/pkg/volume/portworx/portworx_test.go index 195e6c7847..e14ef82502 100644 --- a/pkg/volume/portworx/portworx_test.go +++ b/pkg/volume/portworx/portworx_test.go @@ -41,7 +41,7 @@ func TestCanSupport(t *testing.T) { } defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/portworx-volume") if err != nil { @@ -65,7 +65,7 @@ func TestGetAccessModes(t *testing.T) { } defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPersistentPluginByName("kubernetes.io/portworx-volume") if err != nil { @@ -135,7 +135,7 @@ func TestPlugin(t *testing.T) { } defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/portworx-volume") if err != nil { diff --git a/pkg/volume/projected/projected_test.go b/pkg/volume/projected/projected_test.go index 3c65ce1107..6b82586523 100644 --- a/pkg/volume/projected/projected_test.go +++ b/pkg/volume/projected/projected_test.go @@ -670,7 +670,7 @@ func TestCanSupport(t *testing.T) { pluginMgr := volume.VolumePluginMgr{} tempDir, host := newTestHost(t, nil) defer os.RemoveAll(tempDir) - pluginMgr.InitPlugins(ProbeVolumePlugins(), host) + pluginMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host) plugin, err := pluginMgr.FindPluginByName(projectedPluginName) if err != nil { @@ -701,7 +701,7 @@ func TestPlugin(t *testing.T) { rootDir, host = newTestHost(t, client) ) defer os.RemoveAll(rootDir) - pluginMgr.InitPlugins(ProbeVolumePlugins(), host) + pluginMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host) plugin, err := pluginMgr.FindPluginByName(projectedPluginName) if err != nil { @@ -765,7 +765,7 @@ func TestPluginReboot(t *testing.T) { rootDir, host = newTestHost(t, client) ) defer os.RemoveAll(rootDir) - pluginMgr.InitPlugins(ProbeVolumePlugins(), host) + pluginMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host) plugin, err := pluginMgr.FindPluginByName(projectedPluginName) if err != nil { @@ -819,7 +819,7 @@ func TestPluginOptional(t *testing.T) { ) volumeSpec.VolumeSource.Projected.Sources[0].Secret.Optional = &trueVal defer os.RemoveAll(rootDir) - pluginMgr.InitPlugins(ProbeVolumePlugins(), host) + pluginMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host) plugin, err := pluginMgr.FindPluginByName(projectedPluginName) if err != nil { @@ -896,7 +896,7 @@ func TestPluginOptionalKeys(t *testing.T) { } volumeSpec.VolumeSource.Projected.Sources[0].Secret.Optional = &trueVal defer os.RemoveAll(rootDir) - pluginMgr.InitPlugins(ProbeVolumePlugins(), host) + pluginMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host) plugin, err := pluginMgr.FindPluginByName(projectedPluginName) if err != nil { diff --git a/pkg/volume/quobyte/quobyte_test.go b/pkg/volume/quobyte/quobyte_test.go index 86fb2fd092..1ae60e4078 100644 --- a/pkg/volume/quobyte/quobyte_test.go +++ b/pkg/volume/quobyte/quobyte_test.go @@ -39,7 +39,7 @@ func TestCanSupport(t *testing.T) { defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/quobyte") if err != nil { t.Errorf("Can't find the plugin by name") @@ -63,7 +63,7 @@ func TestGetAccessModes(t *testing.T) { defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPersistentPluginByName("kubernetes.io/quobyte") if err != nil { @@ -91,7 +91,7 @@ func doTestPlugin(t *testing.T, spec *volume.Spec) { defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/quobyte") if err != nil { t.Errorf("Can't find the plugin by name") @@ -187,7 +187,7 @@ func TestPersistentClaimReadOnlyFlag(t *testing.T) { client := fake.NewSimpleClientset(pv, claim) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, client, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, client, nil)) plug, _ := plugMgr.FindPluginByName(quobytePluginName) // readOnly bool is supplied by persistent-claim volume source when its mounter creates other volumes diff --git a/pkg/volume/rbd/rbd_test.go b/pkg/volume/rbd/rbd_test.go index 2cabe4d30f..ba27d570c3 100644 --- a/pkg/volume/rbd/rbd_test.go +++ b/pkg/volume/rbd/rbd_test.go @@ -39,7 +39,7 @@ func TestCanSupport(t *testing.T) { defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/rbd") if err != nil { @@ -104,7 +104,7 @@ func doTestPlugin(t *testing.T, spec *volume.Spec) { defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/rbd") if err != nil { @@ -229,7 +229,7 @@ func TestPersistentClaimReadOnlyFlag(t *testing.T) { client := fake.NewSimpleClientset(pv, claim) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, client, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, client, nil)) plug, _ := plugMgr.FindPluginByName(rbdPluginName) // readOnly bool is supplied by persistent-claim volume source when its mounter creates other volumes diff --git a/pkg/volume/scaleio/sio_volume_test.go b/pkg/volume/scaleio/sio_volume_test.go index 36e747c31f..b83be41828 100644 --- a/pkg/volume/scaleio/sio_volume_test.go +++ b/pkg/volume/scaleio/sio_volume_test.go @@ -63,7 +63,7 @@ func newPluginMgr(t *testing.T) (*volume.VolumePluginMgr, string) { fakeClient := fakeclient.NewSimpleClientset(config) host := volumetest.NewFakeVolumeHost(tmpDir, fakeClient, nil) plugMgr := &volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), host) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host) return plugMgr, tmpDir } diff --git a/pkg/volume/secret/secret_test.go b/pkg/volume/secret/secret_test.go index 438442a4c7..6ba6a10d4b 100644 --- a/pkg/volume/secret/secret_test.go +++ b/pkg/volume/secret/secret_test.go @@ -275,7 +275,7 @@ func TestCanSupport(t *testing.T) { pluginMgr := volume.VolumePluginMgr{} tempDir, host := newTestHost(t, nil) defer os.RemoveAll(tempDir) - pluginMgr.InitPlugins(ProbeVolumePlugins(), host) + pluginMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host) plugin, err := pluginMgr.FindPluginByName(secretPluginName) if err != nil { @@ -306,7 +306,7 @@ func TestPlugin(t *testing.T) { rootDir, host = newTestHost(t, client) ) defer os.RemoveAll(rootDir) - pluginMgr.InitPlugins(ProbeVolumePlugins(), host) + pluginMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host) plugin, err := pluginMgr.FindPluginByName(secretPluginName) if err != nil { @@ -379,7 +379,7 @@ func TestPluginReboot(t *testing.T) { rootDir, host = newTestHost(t, client) ) defer os.RemoveAll(rootDir) - pluginMgr.InitPlugins(ProbeVolumePlugins(), host) + pluginMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host) plugin, err := pluginMgr.FindPluginByName(secretPluginName) if err != nil { @@ -433,7 +433,7 @@ func TestPluginOptional(t *testing.T) { ) volumeSpec.Secret.Optional = &trueVal defer os.RemoveAll(rootDir) - pluginMgr.InitPlugins(ProbeVolumePlugins(), host) + pluginMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host) plugin, err := pluginMgr.FindPluginByName(secretPluginName) if err != nil { @@ -510,7 +510,7 @@ func TestPluginOptionalKeys(t *testing.T) { } volumeSpec.Secret.Optional = &trueVal defer os.RemoveAll(rootDir) - pluginMgr.InitPlugins(ProbeVolumePlugins(), host) + pluginMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host) plugin, err := pluginMgr.FindPluginByName(secretPluginName) if err != nil { diff --git a/pkg/volume/storageos/storageos_test.go b/pkg/volume/storageos/storageos_test.go index 37d6cc71f7..8f1f29887b 100644 --- a/pkg/volume/storageos/storageos_test.go +++ b/pkg/volume/storageos/storageos_test.go @@ -39,7 +39,7 @@ func TestCanSupport(t *testing.T) { } defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/storageos") if err != nil { @@ -63,7 +63,7 @@ func TestGetAccessModes(t *testing.T) { } defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPersistentPluginByName("kubernetes.io/storageos") if err != nil { @@ -138,7 +138,7 @@ func TestPlugin(t *testing.T) { } defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/storageos") if err != nil { @@ -353,7 +353,7 @@ func TestPersistentClaimReadOnlyFlag(t *testing.T) { client := fake.NewSimpleClientset(pv, claim) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, client, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, client, nil)) plug, _ := plugMgr.FindPluginByName(storageosPluginName) // readOnly bool is supplied by persistent-claim volume source when its mounter creates other volumes diff --git a/pkg/volume/storageos/storageos_util_test.go b/pkg/volume/storageos/storageos_util_test.go index 99897e7a03..3eee51231e 100644 --- a/pkg/volume/storageos/storageos_util_test.go +++ b/pkg/volume/storageos/storageos_util_test.go @@ -117,7 +117,7 @@ func TestCreateVolume(t *testing.T) { } defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, _ := plugMgr.FindPluginByName("kubernetes.io/storageos") // Use real util with stubbed api @@ -209,7 +209,7 @@ func TestAttachVolume(t *testing.T) { } defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, _ := plugMgr.FindPluginByName("kubernetes.io/storageos") // Use real util with stubbed api diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go index 902416c8a3..435f5683a7 100644 --- a/pkg/volume/testing/testing.go +++ b/pkg/volume/testing/testing.go @@ -73,7 +73,7 @@ func newFakeVolumeHost(rootDir string, kubeClient clientset.Interface, plugins [ host.mounter = &mount.FakeMounter{} host.writer = &io.StdWriter{} host.exec = mount.NewFakeExec(nil) - host.pluginMgr.InitPlugins(plugins, host) + host.pluginMgr.InitPlugins(plugins, nil /* prober */, host) return host } @@ -768,7 +768,7 @@ func GetTestVolumePluginMgr( nil, /* plugins */ ) plugins := ProbeVolumePlugins(VolumeConfig{}) - if err := v.pluginMgr.InitPlugins(plugins, v); err != nil { + if err := v.pluginMgr.InitPlugins(plugins, nil /* prober */, v); err != nil { t.Fatal(err) } diff --git a/pkg/volume/vsphere_volume/vsphere_volume_test.go b/pkg/volume/vsphere_volume/vsphere_volume_test.go index 992bc61275..282cb23d60 100644 --- a/pkg/volume/vsphere_volume/vsphere_volume_test.go +++ b/pkg/volume/vsphere_volume/vsphere_volume_test.go @@ -37,7 +37,7 @@ func TestCanSupport(t *testing.T) { } defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/vsphere-volume") if err != nil { @@ -90,7 +90,7 @@ func TestPlugin(t *testing.T) { defer os.RemoveAll(tmpDir) plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/vsphere-volume") if err != nil { diff --git a/test/integration/volume/attach_detach_test.go b/test/integration/volume/attach_detach_test.go index 417a29032a..a5b1fe11fc 100644 --- a/test/integration/volume/attach_detach_test.go +++ b/test/integration/volume/attach_detach_test.go @@ -361,6 +361,7 @@ func createAdClients(ns *v1.Namespace, t *testing.T, server *httptest.Server, sy informers.Core().V1().PersistentVolumes(), cloud, plugins, + nil, /* prober */ false, 5*time.Second, timers)