Merge pull request #51474 from verult/ProberTest

Automatic merge from submit-queue (batch tested with PRs 51805, 51725, 50925, 51474, 51638)

Flexvolume dynamic plugin discovery: Prober unit tests and basic e2e test.

**What this PR does / why we need it**: Tests for changes introduced in PR #50031 .
As part of the prober unit test, I mocked filesystem, filesystem watch, and Flexvolume plugin initialization.
Moved the filesystem event goroutine to watcher implementation.

**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes #51147

**Special notes for your reviewer**:
First commit contains added functionality of the mock filesystem.
Second commit is the refactor for moving mock filesystem into a common util directory.
Third commit is the unit and e2e tests.

**Release note**:

```release-note
NONE
```
/release-note-none
/sig storage
/assign @saad-ali @liggitt 
/cc @mtaufen @chakri-nelluri @wongma7
pull/6/head
Kubernetes Submit Queue 2017-09-03 11:10:05 -07:00 committed by GitHub
commit f07279ada2
23 changed files with 548 additions and 57 deletions

View File

@ -21,9 +21,9 @@ go_library(
"//pkg/kubelet/kubeletconfig/configfiles:go_default_library",
"//pkg/kubelet/kubeletconfig/status:go_default_library",
"//pkg/kubelet/kubeletconfig/util/equal:go_default_library",
"//pkg/kubelet/kubeletconfig/util/filesystem:go_default_library",
"//pkg/kubelet/kubeletconfig/util/log:go_default_library",
"//pkg/kubelet/kubeletconfig/util/panic:go_default_library",
"//pkg/util/filesystem:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
@ -52,7 +52,6 @@ filegroup(
"//pkg/kubelet/kubeletconfig/util/codec:all-srcs",
"//pkg/kubelet/kubeletconfig/util/equal:all-srcs",
"//pkg/kubelet/kubeletconfig/util/files:all-srcs",
"//pkg/kubelet/kubeletconfig/util/filesystem:all-srcs",
"//pkg/kubelet/kubeletconfig/util/log:all-srcs",
"//pkg/kubelet/kubeletconfig/util/panic:all-srcs",
"//pkg/kubelet/kubeletconfig/util/test:all-srcs",

View File

@ -16,8 +16,8 @@ go_test(
deps = [
"//pkg/kubelet/kubeletconfig/checkpoint:go_default_library",
"//pkg/kubelet/kubeletconfig/util/files:go_default_library",
"//pkg/kubelet/kubeletconfig/util/filesystem:go_default_library",
"//pkg/kubelet/kubeletconfig/util/test:go_default_library",
"//pkg/util/filesystem:go_default_library",
"//vendor/github.com/davecgh/go-spew/spew:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
@ -35,8 +35,8 @@ go_library(
deps = [
"//pkg/kubelet/kubeletconfig/checkpoint:go_default_library",
"//pkg/kubelet/kubeletconfig/util/files:go_default_library",
"//pkg/kubelet/kubeletconfig/util/filesystem:go_default_library",
"//pkg/kubelet/kubeletconfig/util/log:go_default_library",
"//pkg/util/filesystem:go_default_library",
],
)

View File

@ -23,8 +23,8 @@ import (
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint"
utilfiles "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/files"
utilfs "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/filesystem"
utillog "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/log"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
)
const (

View File

@ -29,8 +29,8 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint"
utilfiles "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/files"
utilfs "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/filesystem"
utiltest "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/test"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
)
const testCheckpointsDir = "/test-checkpoints-dir"

View File

@ -13,7 +13,7 @@ go_library(
"//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/apis/kubeletconfig/scheme:go_default_library",
"//pkg/kubelet/kubeletconfig/util/codec:go_default_library",
"//pkg/kubelet/kubeletconfig/util/filesystem:go_default_library",
"//pkg/util/filesystem:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
],
)
@ -40,8 +40,8 @@ go_test(
"//pkg/kubelet/apis/kubeletconfig/scheme:go_default_library",
"//pkg/kubelet/apis/kubeletconfig/v1alpha1:go_default_library",
"//pkg/kubelet/kubeletconfig/util/files:go_default_library",
"//pkg/kubelet/kubeletconfig/util/filesystem:go_default_library",
"//pkg/kubelet/kubeletconfig/util/test:go_default_library",
"//pkg/util/filesystem:go_default_library",
"//vendor/github.com/davecgh/go-spew/spew:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",
],

View File

@ -24,7 +24,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
kubeletscheme "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/scheme"
utilcodec "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/codec"
utilfs "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/filesystem"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
)
const kubeletFile = "kubelet"

View File

@ -29,8 +29,8 @@ import (
kubeletscheme "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/scheme"
kubeletconfigv1alpha1 "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/v1alpha1"
utilfiles "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/files"
utilfs "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/filesystem"
utiltest "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/test"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
)
func addFile(fs utilfs.Filesystem, path string, file string) error {

View File

@ -31,9 +31,9 @@ import (
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint/store"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/configfiles"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/status"
utilfs "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/filesystem"
utillog "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/log"
utilpanic "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/panic"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
)
const (

View File

@ -8,7 +8,7 @@ load(
go_library(
name = "go_default_library",
srcs = ["files.go"],
deps = ["//pkg/kubelet/kubeletconfig/util/filesystem:go_default_library"],
deps = ["//pkg/util/filesystem:go_default_library"],
)
filegroup(

View File

@ -21,7 +21,7 @@ import (
"os"
"path/filepath"
utilfs "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/filesystem"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
)
const defaultPerm = 0666

View File

@ -19,6 +19,7 @@ filegroup(
"//pkg/util/ebtables:all-srcs",
"//pkg/util/env:all-srcs",
"//pkg/util/file:all-srcs",
"//pkg/util/filesystem:all-srcs",
"//pkg/util/flock:all-srcs",
"//pkg/util/goroutinemap:all-srcs",
"//pkg/util/hash:all-srcs",

View File

@ -11,8 +11,12 @@ go_library(
"defaultfs.go",
"fakefs.go",
"filesystem.go",
"watcher.go",
],
deps = [
"//vendor/github.com/fsnotify/fsnotify:go_default_library",
"//vendor/github.com/spf13/afero:go_default_library",
],
deps = ["//vendor/github.com/spf13/afero:go_default_library"],
)
filegroup(

View File

@ -19,12 +19,15 @@ package filesystem
import (
"io/ioutil"
"os"
"path/filepath"
"time"
)
// DefaultFs implements Filesystem using same-named functions from "os" and "io/ioutil"
type DefaultFs struct{}
var _ Filesystem = DefaultFs{}
// Stat via os.Stat
func (DefaultFs) Stat(name string) (os.FileInfo, error) {
return os.Stat(name)
@ -54,12 +57,17 @@ func (DefaultFs) Chtimes(name string, atime time.Time, mtime time.Time) error {
return os.Chtimes(name, atime, mtime)
}
// ReadFile via os.ReadFile
// RemoveAll via os.RemoveAll
func (DefaultFs) RemoveAll(path string) error {
return os.RemoveAll(path)
}
// ReadFile via ioutil.ReadFile
func (DefaultFs) ReadFile(filename string) ([]byte, error) {
return ioutil.ReadFile(filename)
}
// TempFile via os.TempFile
// TempFile via ioutil.TempFile
func (DefaultFs) TempFile(dir, prefix string) (File, error) {
file, err := ioutil.TempFile(dir, prefix)
if err != nil {
@ -68,6 +76,16 @@ func (DefaultFs) TempFile(dir, prefix string) (File, error) {
return &defaultFile{file}, nil
}
// ReadDir via ioutil.ReadDir
func (DefaultFs) ReadDir(dirname string) ([]os.FileInfo, error) {
return ioutil.ReadDir(dirname)
}
// Walk via filepath.Walk
func (DefaultFs) Walk(root string, walkFn filepath.WalkFunc) error {
return filepath.Walk(root, walkFn)
}
// defaultFile implements File using same-named functions from "os"
type defaultFile struct {
file *os.File

View File

@ -18,6 +18,7 @@ package filesystem
import (
"os"
"path/filepath"
"time"
"github.com/spf13/afero"
@ -62,12 +63,12 @@ func (fs *fakeFs) Chtimes(name string, atime time.Time, mtime time.Time) error {
return fs.a.Fs.Chtimes(name, atime, mtime)
}
// ReadFile via afero.Fs.ReadFile
// ReadFile via afero.ReadFile
func (fs *fakeFs) ReadFile(filename string) ([]byte, error) {
return fs.a.ReadFile(filename)
}
// TempFile via afero.Fs.TempFile
// TempFile via afero.TempFile
func (fs *fakeFs) TempFile(dir, prefix string) (File, error) {
file, err := fs.a.TempFile(dir, prefix)
if err != nil {
@ -76,6 +77,21 @@ func (fs *fakeFs) TempFile(dir, prefix string) (File, error) {
return &fakeFile{file}, nil
}
// ReadDir via afero.ReadDir
func (fs *fakeFs) ReadDir(dirname string) ([]os.FileInfo, error) {
return fs.a.ReadDir(dirname)
}
// Walk via afero.Walk
func (fs *fakeFs) Walk(root string, walkFn filepath.WalkFunc) error {
return fs.a.Walk(root, walkFn)
}
// RemoveAll via afero.RemoveAll
func (fs *fakeFs) RemoveAll(path string) error {
return fs.a.RemoveAll(path)
}
// fakeFile implements File; for use with fakeFs
type fakeFile struct {
file afero.File

View File

@ -18,6 +18,7 @@ package filesystem
import (
"os"
"path/filepath"
"time"
)
@ -29,10 +30,13 @@ type Filesystem interface {
Rename(oldpath, newpath string) error
MkdirAll(path string, perm os.FileMode) error
Chtimes(name string, atime time.Time, mtime time.Time) error
RemoveAll(path string) error
// from "io/ioutil"
ReadFile(filename string) ([]byte, error)
TempFile(dir, prefix string) (File, error)
ReadDir(dirname string) ([]os.FileInfo, error)
Walk(root string, walkFn filepath.WalkFunc) error
}
// File is an interface that we can use to mock various filesystem operations typically

View File

@ -0,0 +1,89 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package filesystem
import (
"github.com/fsnotify/fsnotify"
)
// FSWatcher is a callback-based filesystem watcher abstraction for fsnotify.
type FSWatcher interface {
// Initializes the watcher with the given watch handlers.
// Called before all other methods.
Init(FSEventHandler, FSErrorHandler) error
// Starts listening for events and errors.
// When an event or error occurs, the corresponding handler is called.
Run()
// Add a filesystem path to watch
AddWatch(path string) error
}
// FSEventHandler is called when a fsnotify event occurs.
type FSEventHandler func(event fsnotify.Event)
// FSErrorHandler is called when a fsnotify error occurs.
type FSErrorHandler func(err error)
type fsnotifyWatcher struct {
watcher *fsnotify.Watcher
eventHandler FSEventHandler
errorHandler FSErrorHandler
}
var _ FSWatcher = &fsnotifyWatcher{}
// NewFsnotifyWatcher returns an implementation of FSWatcher that continuously listens for
// fsnotify events and calls the event handler as soon as an event is received.
func NewFsnotifyWatcher() FSWatcher {
return &fsnotifyWatcher{}
}
func (w *fsnotifyWatcher) AddWatch(path string) error {
return w.watcher.Add(path)
}
func (w *fsnotifyWatcher) Init(eventHandler FSEventHandler, errorHandler FSErrorHandler) error {
var err error
w.watcher, err = fsnotify.NewWatcher()
if err != nil {
return err
}
w.eventHandler = eventHandler
w.errorHandler = errorHandler
return nil
}
func (w *fsnotifyWatcher) Run() {
go func() {
defer w.watcher.Close()
for {
select {
case event := <-w.watcher.Events:
if w.eventHandler != nil {
w.eventHandler(event)
}
case err := <-w.watcher.Errors:
if w.errorHandler != nil {
w.errorHandler(err)
}
}
}
}()
}

View File

@ -14,6 +14,7 @@ go_library(
"detacher.go",
"detacher-defaults.go",
"driver-call.go",
"fake_watcher.go",
"mounter.go",
"mounter-defaults.go",
"plugin.go",
@ -25,6 +26,7 @@ go_library(
"volume.go",
],
deps = [
"//pkg/util/filesystem:go_default_library",
"//pkg/util/mount:go_default_library",
"//pkg/util/strings:go_default_library",
"//pkg/volume:go_default_library",
@ -48,13 +50,17 @@ go_test(
"flexvolume_test.go",
"mounter_test.go",
"plugin_test.go",
"probe_test.go",
"unmounter_test.go",
],
library = ":go_default_library",
deps = [
"//pkg/util/filesystem:go_default_library",
"//pkg/util/mount:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/testing:go_default_library",
"//vendor/github.com/fsnotify/fsnotify:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",

View File

@ -0,0 +1,53 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flexvolume
import (
"github.com/fsnotify/fsnotify"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
)
// Mock filesystem watcher
type fakeWatcher struct {
watches []string // List of watches added by the prober, ordered from least recent to most recent.
eventHandler utilfs.FSEventHandler
}
var _ utilfs.FSWatcher = &fakeWatcher{}
func NewFakeWatcher() *fakeWatcher {
return &fakeWatcher{
watches: nil,
}
}
func (w *fakeWatcher) Init(eventHandler utilfs.FSEventHandler, _ utilfs.FSErrorHandler) error {
w.eventHandler = eventHandler
return nil
}
func (w *fakeWatcher) Run() { /* no-op */ }
func (w *fakeWatcher) AddWatch(path string) error {
w.watches = append(w.watches, path)
return nil
}
// Triggers a mock filesystem event.
func (w *fakeWatcher) TriggerEvent(op fsnotify.Op, filename string) {
w.eventHandler(fsnotify.Event{Op: op, Name: filename})
}

View File

@ -29,6 +29,6 @@ func logPrefix(plugin *flexVolumePlugin) string {
}
func (plugin *pluginDefaults) GetVolumeName(spec *volume.Spec) (string, error) {
glog.Warning(logPrefix((*flexVolumePlugin)(plugin)), "using default GetVolumeName for volume ", spec.Name)
glog.Warning(logPrefix((*flexVolumePlugin)(plugin)), "using default GetVolumeName for volume ", spec.Name())
return spec.Name(), nil
}

View File

@ -53,7 +53,13 @@ type flexVolumeAttachablePlugin struct {
var _ volume.AttachableVolumePlugin = &flexVolumeAttachablePlugin{}
var _ volume.PersistentVolumePlugin = &flexVolumePlugin{}
func NewFlexVolumePlugin(pluginDir, name string) (volume.VolumePlugin, error) {
type PluginFactory interface {
NewFlexVolumePlugin(pluginDir, driverName string) (volume.VolumePlugin, error)
}
type pluginFactory struct{}
func (pluginFactory) NewFlexVolumePlugin(pluginDir, name string) (volume.VolumePlugin, error) {
execPath := path.Join(pluginDir, name)
driverName := utilstrings.UnescapePluginName(name)

View File

@ -17,8 +17,6 @@ limitations under the License.
package flexvolume
import (
"io/ioutil"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/volume"
@ -31,15 +29,18 @@ import (
"github.com/fsnotify/fsnotify"
"k8s.io/apimachinery/pkg/util/errors"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
)
type flexVolumeProber struct {
mutex sync.Mutex
pluginDir string // Flexvolume driver directory
watcher *fsnotify.Watcher
watcher utilfs.FSWatcher
probeNeeded bool // Must only read and write this through testAndSetProbeNeeded.
lastUpdated time.Time // Last time probeNeeded was updated.
watchEventCount int
factory PluginFactory
fs utilfs.Filesystem
}
const (
@ -50,7 +51,12 @@ const (
)
func GetDynamicPluginProber(pluginDir string) volume.DynamicPluginProber {
return &flexVolumeProber{pluginDir: pluginDir}
return &flexVolumeProber{
pluginDir: pluginDir,
watcher: utilfs.NewFsnotifyWatcher(),
factory: pluginFactory{},
fs: &utilfs.DefaultFs{},
}
}
func (prober *flexVolumeProber) Init() error {
@ -64,20 +70,6 @@ func (prober *flexVolumeProber) Init() error {
return err
}
go func() {
defer prober.watcher.Close()
for {
select {
case event := <-prober.watcher.Events:
if err := prober.handleWatchEvent(event); err != nil {
glog.Errorf("Flexvolume prober watch: %s", err)
}
case err := <-prober.watcher.Errors:
glog.Errorf("Received an error from watcher: %s", err)
}
}
}()
return nil
}
@ -94,7 +86,7 @@ func (prober *flexVolumeProber) Probe() (updated bool, plugins []volume.VolumePl
return false, nil, nil
}
files, err := ioutil.ReadDir(prober.pluginDir)
files, err := prober.fs.ReadDir(prober.pluginDir)
if err != nil {
return false, nil, fmt.Errorf("Error reading the Flexvolume directory: %s", err)
}
@ -108,7 +100,7 @@ func (prober *flexVolumeProber) Probe() (updated bool, plugins []volume.VolumePl
// e.g. dirname = vendor~cifs
// then, executable will be pluginDir/dirname/cifs
if f.IsDir() && filepath.Base(f.Name())[0] != '.' {
plugin, pluginErr := NewFlexVolumePlugin(prober.pluginDir, f.Name())
plugin, pluginErr := prober.factory.NewFlexVolumePlugin(prober.pluginDir, f.Name())
if pluginErr != nil {
pluginErr = fmt.Errorf(
"Error creating Flexvolume plugin from directory %s, skipping. Error: %s",
@ -144,7 +136,6 @@ func (prober *flexVolumeProber) handleWatchEvent(event fsnotify.Event) error {
// If the Flexvolume plugin directory is removed, need to recreate it
// in order to keep it under watch.
if eventOpIs(event, fsnotify.Remove) && eventPathAbs == pluginDirAbs {
glog.Warningf("Flexvolume plugin directory at %s is removed. Recreating.", pluginDirAbs)
if err := prober.createPluginDir(); err != nil {
return err
}
@ -186,35 +177,43 @@ func (prober *flexVolumeProber) updateProbeNeeded() {
func (prober *flexVolumeProber) addWatchRecursive(filename string) error {
addWatch := func(path string, info os.FileInfo, err error) error {
if info.IsDir() {
if err := prober.watcher.Add(path); err != nil {
if err := prober.watcher.AddWatch(path); err != nil {
glog.Errorf("Error recursively adding watch: %v", err)
}
}
return nil
}
return filepath.Walk(filename, addWatch)
return prober.fs.Walk(filename, addWatch)
}
// Creates a new filesystem watcher and adds watches for the plugin directory
// and all of its subdirectories.
func (prober *flexVolumeProber) initWatcher() error {
var err error
if prober.watcher, err = fsnotify.NewWatcher(); err != nil {
return fmt.Errorf("Error creating new watcher: %s", err)
err := prober.watcher.Init(func(event fsnotify.Event) {
if err := prober.handleWatchEvent(event); err != nil {
glog.Errorf("Flexvolume prober watch: %s", err)
}
}, func(err error) {
glog.Errorf("Received an error from watcher: %s", err)
})
if err != nil {
return fmt.Errorf("Error initializing watcher: %s", err)
}
if err = prober.addWatchRecursive(prober.pluginDir); err != nil {
if err := prober.addWatchRecursive(prober.pluginDir); err != nil {
return fmt.Errorf("Error adding watch on Flexvolume directory: %s", err)
}
prober.watcher.Run()
return nil
}
// Creates the plugin directory, if it doesn't already exist.
func (prober *flexVolumeProber) createPluginDir() error {
if _, err := os.Stat(prober.pluginDir); os.IsNotExist(err) {
err := os.MkdirAll(prober.pluginDir, 0755)
if _, err := prober.fs.Stat(prober.pluginDir); os.IsNotExist(err) {
glog.Warningf("Flexvolume plugin directory at %s does not exist. Recreating.", prober.pluginDir)
err := prober.fs.MkdirAll(prober.pluginDir, 0755)
if err != nil {
return fmt.Errorf("Error (re-)creating driver directory: %s", err)
}

View File

@ -0,0 +1,274 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flexvolume
import (
"fmt"
"path"
"testing"
"github.com/fsnotify/fsnotify"
"github.com/stretchr/testify/assert"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
"k8s.io/kubernetes/pkg/volume"
)
const (
pluginDir = "/flexvolume"
driverName = "fake-driver"
)
// Probes a driver installed before prober initialization.
func TestProberExistingDriverBeforeInit(t *testing.T) {
// Arrange
driverPath, _, watcher, prober := initTestEnvironment(t)
// Act
updated, plugins, err := prober.Probe()
// Assert
// Probe occurs, 1 plugin should be returned, and 2 watches (pluginDir and all its
// current subdirectories) registered.
assert.True(t, updated)
assert.Equal(t, 1, len(plugins))
assert.Equal(t, pluginDir, watcher.watches[0])
assert.Equal(t, driverPath, watcher.watches[1])
assert.NoError(t, err)
// Should no longer probe.
// Act
updated, plugins, err = prober.Probe()
// Assert
assert.False(t, updated)
assert.Equal(t, 0, len(plugins))
assert.NoError(t, err)
}
// Probes newly added drivers after prober is running.
func TestProberAddDriver(t *testing.T) {
// Arrange
_, fs, watcher, prober := initTestEnvironment(t)
prober.Probe()
updated, _, _ := prober.Probe()
assert.False(t, updated)
// Call probe after a file is added. Should return true.
// Arrange
const driverName2 = "fake-driver2"
driverPath := path.Join(pluginDir, driverName2)
installDriver(driverName2, fs)
watcher.TriggerEvent(fsnotify.Create, driverPath)
watcher.TriggerEvent(fsnotify.Create, path.Join(driverPath, driverName2))
// Act
updated, plugins, err := prober.Probe()
// Assert
assert.True(t, updated)
assert.Equal(t, 2, len(plugins)) // 1 existing, 1 newly added
assert.Equal(t, driverPath, watcher.watches[len(watcher.watches)-1]) // Checks most recent watch
assert.NoError(t, err)
// Call probe again, should return false.
// Act
updated, _, err = prober.Probe()
// Assert
assert.False(t, updated)
assert.NoError(t, err)
// Call probe after a non-driver file is added in a subdirectory. Should return true.
// Arrange
fp := path.Join(driverPath, "dummyfile")
fs.Create(fp)
watcher.TriggerEvent(fsnotify.Create, fp)
// Act
updated, plugins, err = prober.Probe()
// Assert
assert.True(t, updated)
assert.Equal(t, 2, len(plugins)) // Number of plugins should not change.
assert.NoError(t, err)
// Call probe again, should return false.
// Act
updated, _, err = prober.Probe()
// Assert
assert.False(t, updated)
assert.NoError(t, err)
}
// Tests the behavior when no drivers exist in the plugin directory.
func TestEmptyPluginDir(t *testing.T) {
// Arrange
fs := utilfs.NewFakeFs()
watcher := NewFakeWatcher()
prober := &flexVolumeProber{
pluginDir: pluginDir,
watcher: watcher,
fs: fs,
factory: fakePluginFactory{error: false},
}
prober.Init()
// Act
updated, plugins, err := prober.Probe()
// Assert
assert.True(t, updated)
assert.Equal(t, 0, len(plugins))
assert.NoError(t, err)
}
// Issue an event to remove plugindir. New directory should still be watched.
func TestRemovePluginDir(t *testing.T) {
// Arrange
driverPath, fs, watcher, _ := initTestEnvironment(t)
fs.RemoveAll(pluginDir)
watcher.TriggerEvent(fsnotify.Remove, path.Join(driverPath, driverName))
watcher.TriggerEvent(fsnotify.Remove, driverPath)
watcher.TriggerEvent(fsnotify.Remove, pluginDir)
// Act: The handler triggered by the above events should have already handled the event appropriately.
// Assert
assert.Equal(t, 3, len(watcher.watches)) // 2 from initial setup, 1 from new watch.
assert.Equal(t, pluginDir, watcher.watches[len(watcher.watches)-1])
}
// Issue multiple events and probe multiple times. Should give true, false, false...
func TestProberMultipleEvents(t *testing.T) {
const iterations = 5
// Arrange
_, fs, watcher, prober := initTestEnvironment(t)
for i := 0; i < iterations; i++ {
newDriver := fmt.Sprintf("multi-event-driver%d", 1)
installDriver(newDriver, fs)
driverPath := path.Join(pluginDir, newDriver)
watcher.TriggerEvent(fsnotify.Create, driverPath)
watcher.TriggerEvent(fsnotify.Create, path.Join(driverPath, newDriver))
}
// Act
updated, _, err := prober.Probe()
// Assert
assert.True(t, updated)
assert.NoError(t, err)
for i := 0; i < iterations-1; i++ {
updated, _, err = prober.Probe()
assert.False(t, updated)
assert.NoError(t, err)
}
}
// When many events are triggered quickly in succession, events should stop triggering a probe update
// after a certain limit.
func TestProberRateLimit(t *testing.T) {
// Arrange
driverPath, _, watcher, prober := initTestEnvironment(t)
for i := 0; i < watchEventLimit; i++ {
watcher.TriggerEvent(fsnotify.Write, path.Join(driverPath, driverName))
}
// Act
updated, plugins, err := prober.Probe()
// Assert
// The probe results should not be different from what it would be if none of the events
// are triggered.
assert.True(t, updated)
assert.Equal(t, 1, len(plugins))
assert.NoError(t, err)
// Arrange
watcher.TriggerEvent(fsnotify.Write, path.Join(driverPath, driverName))
// Act
updated, _, err = prober.Probe()
// Assert
// The last event is outside the event limit. Should not trigger a probe.
assert.False(t, updated)
assert.NoError(t, err)
}
func TestProberError(t *testing.T) {
fs := utilfs.NewFakeFs()
watcher := NewFakeWatcher()
prober := &flexVolumeProber{
pluginDir: pluginDir,
watcher: watcher,
fs: fs,
factory: fakePluginFactory{error: true},
}
installDriver(driverName, fs)
prober.Init()
_, _, err := prober.Probe()
assert.Error(t, err)
}
// Installs a mock driver (an empty file) in the mock fs.
func installDriver(driverName string, fs utilfs.Filesystem) {
driverPath := path.Join(pluginDir, driverName)
fs.MkdirAll(driverPath, 0666)
fs.Create(path.Join(driverPath, driverName))
}
// Initializes mocks, installs a single driver in the mock fs, then initializes prober.
func initTestEnvironment(t *testing.T) (
driverPath string,
fs utilfs.Filesystem,
watcher *fakeWatcher,
prober volume.DynamicPluginProber) {
fs = utilfs.NewFakeFs()
watcher = NewFakeWatcher()
prober = &flexVolumeProber{
pluginDir: pluginDir,
watcher: watcher,
fs: fs,
factory: fakePluginFactory{error: false},
}
driverPath = path.Join(pluginDir, driverName)
installDriver(driverName, fs)
prober.Init()
assert.NotNilf(t, watcher.eventHandler,
"Expect watch event handler to be registered after prober init, but is not.")
return
}
// Fake Flexvolume plugin
type fakePluginFactory struct {
error bool // Indicates whether an error should be returned.
}
var _ PluginFactory = fakePluginFactory{}
func (m fakePluginFactory) NewFlexVolumePlugin(_, driverName string) (volume.VolumePlugin, error) {
if m.error {
return nil, fmt.Errorf("Flexvolume plugin error")
}
// Dummy Flexvolume plugin. Prober never interacts with the plugin.
return &flexVolumePlugin{driverName: driverName}, nil
}

View File

@ -63,10 +63,10 @@ func testFlexVolume(driver string, cs clientset.Interface, config framework.Volu
}
}
// installFlex installs the driver found at filePath on the node and restarts
// kubelet. If node is nil, installs on the master and restarts
// controller-manager.
func installFlex(node *v1.Node, vendor, driver, filePath string) {
// installFlex installs the driver found at filePath on the node, and restarts
// kubelet if 'restart' is true. If node is nil, installs on the master, and restarts
// controller-manager if 'restart' is true.
func installFlex(node *v1.Node, vendor, driver, filePath string, restart bool) {
flexDir := getFlexDir(node == nil, vendor, driver)
flexFile := path.Join(flexDir, driver)
@ -87,6 +87,10 @@ func installFlex(node *v1.Node, vendor, driver, filePath string) {
cmd = fmt.Sprintf("sudo chmod +x %s", flexFile)
sshAndLog(cmd, host)
if !restart {
return
}
if node != nil {
err := framework.RestartKubelet(host)
framework.ExpectNoError(err)
@ -170,7 +174,7 @@ var _ = SIGDescribe("Flexvolumes [Disruptive] [Feature:FlexVolume]", func() {
driverInstallAs := driver + "-" + suffix
By(fmt.Sprintf("installing flexvolume %s on node %s as %s", path.Join(driverDir, driver), node.Name, driverInstallAs))
installFlex(&node, "k8s", driverInstallAs, path.Join(driverDir, driver))
installFlex(&node, "k8s", driverInstallAs, path.Join(driverDir, driver), true /* restart */)
testFlexVolume(driverInstallAs, cs, config, f, clean)
@ -188,9 +192,9 @@ var _ = SIGDescribe("Flexvolumes [Disruptive] [Feature:FlexVolume]", func() {
driverInstallAs := driver + "-" + suffix
By(fmt.Sprintf("installing flexvolume %s on node %s as %s", path.Join(driverDir, driver), node.Name, driverInstallAs))
installFlex(&node, "k8s", driverInstallAs, path.Join(driverDir, driver))
installFlex(&node, "k8s", driverInstallAs, path.Join(driverDir, driver), true /* restart */)
By(fmt.Sprintf("installing flexvolume %s on master as %s", path.Join(driverDir, driver), driverInstallAs))
installFlex(nil, "k8s", driverInstallAs, path.Join(driverDir, driver))
installFlex(nil, "k8s", driverInstallAs, path.Join(driverDir, driver), true /* restart */)
testFlexVolume(driverInstallAs, cs, config, f, clean)
@ -204,4 +208,22 @@ var _ = SIGDescribe("Flexvolumes [Disruptive] [Feature:FlexVolume]", func() {
By(fmt.Sprintf("uninstalling flexvolume %s from master", driverInstallAs))
uninstallFlex(nil, "k8s", driverInstallAs)
})
It("should install plugin without kubelet restart", func() {
driver := "dummy"
driverInstallAs := driver + "-" + suffix
By(fmt.Sprintf("installing flexvolume %s on node %s as %s", path.Join(driverDir, driver), node.Name, driverInstallAs))
installFlex(&node, "k8s", driverInstallAs, path.Join(driverDir, driver), false /* restart */)
testFlexVolume(driverInstallAs, cs, config, f, clean)
By("waiting for flex client pod to terminate")
if err := f.WaitForPodTerminated(config.Prefix+"-client", ""); !apierrs.IsNotFound(err) {
framework.ExpectNoError(err, "Failed to wait client pod terminated: %v", err)
}
By(fmt.Sprintf("uninstalling flexvolume %s from node %s", driverInstallAs, node.Name))
uninstallFlex(&node, "k8s", driverInstallAs)
})
})