k3s/pkg/kubelet/util/pluginwatcher/plugin_watcher_test.go

481 lines
15 KiB
Go

/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pluginwatcher
import (
"flag"
"fmt"
"io/ioutil"
"os"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
"k8s.io/klog"
registerapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1"
)
var (
socketDir string
deprecatedSocketDir string
supportedVersions = []string{"v1beta1", "v1beta2"}
)
func init() {
var logLevel string
klog.InitFlags(flag.CommandLine)
flag.Set("alsologtostderr", fmt.Sprintf("%t", true))
flag.StringVar(&logLevel, "logLevel", "6", "test")
flag.Lookup("v").Value.Set(logLevel)
d, err := ioutil.TempDir("", "plugin_test")
if err != nil {
panic(fmt.Sprintf("Could not create a temp directory: %s", d))
}
d2, err := ioutil.TempDir("", "deprecated_plugin_test")
if err != nil {
panic(fmt.Sprintf("Could not create a temp directory: %s", d))
}
socketDir = d
deprecatedSocketDir = d2
}
func cleanup(t *testing.T) {
require.NoError(t, os.RemoveAll(socketDir))
require.NoError(t, os.RemoveAll(deprecatedSocketDir))
os.MkdirAll(socketDir, 0755)
os.MkdirAll(deprecatedSocketDir, 0755)
}
func TestPluginRegistration(t *testing.T) {
defer cleanup(t)
hdlr := NewExampleHandler(supportedVersions, false /* permitDeprecatedDir */)
w := newWatcherWithHandler(t, hdlr, false /* testDeprecatedDir */)
defer func() { require.NoError(t, w.Stop()) }()
for i := 0; i < 10; i++ {
socketPath := fmt.Sprintf("%s/plugin-%d.sock", socketDir, i)
pluginName := fmt.Sprintf("example-plugin-%d", i)
hdlr.AddPluginName(pluginName)
p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
require.True(t, waitForEvent(t, exampleEventValidate, hdlr.EventChan(p.pluginName)))
require.True(t, waitForEvent(t, exampleEventRegister, hdlr.EventChan(p.pluginName)))
require.True(t, waitForPluginRegistrationStatus(t, p.registrationStatus))
require.NoError(t, p.Stop())
require.True(t, waitForEvent(t, exampleEventDeRegister, hdlr.EventChan(p.pluginName)))
}
}
func TestPluginRegistrationDeprecated(t *testing.T) {
defer cleanup(t)
hdlr := NewExampleHandler(supportedVersions, true /* permitDeprecatedDir */)
w := newWatcherWithHandler(t, hdlr, true /* testDeprecatedDir */)
defer func() { require.NoError(t, w.Stop()) }()
// Test plugins in deprecated dir
for i := 0; i < 10; i++ {
endpoint := fmt.Sprintf("%s/dep-plugin-%d.sock", deprecatedSocketDir, i)
pluginName := fmt.Sprintf("dep-example-plugin-%d", i)
hdlr.AddPluginName(pluginName)
p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, endpoint, supportedVersions...)
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
require.True(t, waitForEvent(t, exampleEventValidate, hdlr.EventChan(p.pluginName)))
require.True(t, waitForEvent(t, exampleEventRegister, hdlr.EventChan(p.pluginName)))
require.True(t, waitForPluginRegistrationStatus(t, p.registrationStatus))
require.NoError(t, p.Stop())
require.True(t, waitForEvent(t, exampleEventDeRegister, hdlr.EventChan(p.pluginName)))
}
}
func TestPluginReRegistration(t *testing.T) {
defer cleanup(t)
pluginName := fmt.Sprintf("example-plugin")
hdlr := NewExampleHandler(supportedVersions, false /* permitDeprecatedDir */)
w := newWatcherWithHandler(t, hdlr, false /* testDeprecatedDir */)
defer func() { require.NoError(t, w.Stop()) }()
plugins := make([]*examplePlugin, 10)
for i := 0; i < 10; i++ {
socketPath := fmt.Sprintf("%s/plugin-%d.sock", socketDir, i)
hdlr.AddPluginName(pluginName)
p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
require.True(t, waitForEvent(t, exampleEventValidate, hdlr.EventChan(p.pluginName)))
require.True(t, waitForEvent(t, exampleEventRegister, hdlr.EventChan(p.pluginName)))
require.True(t, waitForPluginRegistrationStatus(t, p.registrationStatus))
plugins[i] = p
}
plugins[len(plugins)-1].Stop()
require.True(t, waitForEvent(t, exampleEventDeRegister, hdlr.EventChan(pluginName)))
close(hdlr.EventChan(pluginName))
for i := 0; i < len(plugins)-1; i++ {
plugins[i].Stop()
}
}
func TestPluginRegistrationAtKubeletStart(t *testing.T) {
defer cleanup(t)
hdlr := NewExampleHandler(supportedVersions, false /* permitDeprecatedDir */)
plugins := make([]*examplePlugin, 10)
for i := 0; i < len(plugins); i++ {
socketPath := fmt.Sprintf("%s/plugin-%d.sock", socketDir, i)
pluginName := fmt.Sprintf("example-plugin-%d", i)
hdlr.AddPluginName(pluginName)
p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
defer func(p *examplePlugin) { require.NoError(t, p.Stop()) }(p)
plugins[i] = p
}
w := newWatcherWithHandler(t, hdlr, false /* testDeprecatedDir */)
defer func() { require.NoError(t, w.Stop()) }()
var wg sync.WaitGroup
for i := 0; i < len(plugins); i++ {
wg.Add(1)
go func(p *examplePlugin) {
defer wg.Done()
require.True(t, waitForEvent(t, exampleEventValidate, hdlr.EventChan(p.pluginName)))
require.True(t, waitForEvent(t, exampleEventRegister, hdlr.EventChan(p.pluginName)))
require.True(t, waitForPluginRegistrationStatus(t, p.registrationStatus))
}(plugins[i])
}
c := make(chan struct{})
go func() {
defer close(c)
wg.Wait()
}()
select {
case <-c:
return
case <-time.After(2 * time.Second):
t.Fatalf("Timeout while waiting for the plugin registration status")
}
}
func TestPluginRegistrationFailureWithUnsupportedVersion(t *testing.T) {
defer cleanup(t)
pluginName := fmt.Sprintf("example-plugin")
socketPath := socketDir + "/plugin.sock"
hdlr := NewExampleHandler(supportedVersions, false /* permitDeprecatedDir */)
hdlr.AddPluginName(pluginName)
w := newWatcherWithHandler(t, hdlr, false /* testDeprecatedDir */)
defer func() { require.NoError(t, w.Stop()) }()
// Advertise v1beta3 but don't serve anything else than the plugin service
p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, "v1beta3")
require.NoError(t, p.Serve())
defer func() { require.NoError(t, p.Stop()) }()
require.True(t, waitForEvent(t, exampleEventValidate, hdlr.EventChan(p.pluginName)))
require.False(t, waitForPluginRegistrationStatus(t, p.registrationStatus))
}
func TestPlugiRegistrationFailureWithUnsupportedVersionAtKubeletStart(t *testing.T) {
defer cleanup(t)
pluginName := fmt.Sprintf("example-plugin")
socketPath := socketDir + "/plugin.sock"
// Advertise v1beta3 but don't serve anything else than the plugin service
p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, "v1beta3")
require.NoError(t, p.Serve())
defer func() { require.NoError(t, p.Stop()) }()
hdlr := NewExampleHandler(supportedVersions, false /* permitDeprecatedDir */)
hdlr.AddPluginName(pluginName)
w := newWatcherWithHandler(t, hdlr, false /* testDeprecatedDir */)
defer func() { require.NoError(t, w.Stop()) }()
require.True(t, waitForEvent(t, exampleEventValidate, hdlr.EventChan(p.pluginName)))
require.False(t, waitForPluginRegistrationStatus(t, p.registrationStatus))
}
func waitForPluginRegistrationStatus(t *testing.T, statusChan chan registerapi.RegistrationStatus) bool {
select {
case status := <-statusChan:
return status.PluginRegistered
case <-time.After(10 * time.Second):
t.Fatalf("Timed out while waiting for registration status")
}
return false
}
func waitForEvent(t *testing.T, expected examplePluginEvent, eventChan chan examplePluginEvent) bool {
select {
case event := <-eventChan:
return event == expected
case <-time.After(2 * time.Second):
t.Fatalf("Timed out while waiting for registration status %v", expected)
}
return false
}
func newWatcherWithHandler(t *testing.T, hdlr PluginHandler, testDeprecatedDir bool) *Watcher {
depSocketDir := ""
if testDeprecatedDir {
depSocketDir = deprecatedSocketDir
}
w := NewWatcher(socketDir, depSocketDir)
w.AddHandler(registerapi.DevicePlugin, hdlr)
require.NoError(t, w.Start())
return w
}
func TestFoundInDeprecatedDir(t *testing.T) {
testCases := []struct {
sockDir string
deprecatedSockDir string
socketPath string
expectFoundInDeprecatedDir bool
}{
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
socketPath: "/var/lib/kubelet/plugins_registry/mydriver.foo/csi.sock",
expectFoundInDeprecatedDir: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
socketPath: "/var/lib/kubelet/plugins/mydriver.foo/csi.sock",
expectFoundInDeprecatedDir: true,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
socketPath: "/var/lib/kubelet/plugins_registry",
expectFoundInDeprecatedDir: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
socketPath: "/var/lib/kubelet/plugins",
expectFoundInDeprecatedDir: true,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
socketPath: "/var/lib/kubelet/plugins/kubernetes.io",
expectFoundInDeprecatedDir: true,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
socketPath: "/var/lib/kubelet/plugins/my.driver.com",
expectFoundInDeprecatedDir: true,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
socketPath: "/var/lib/kubelet/plugins_registry",
expectFoundInDeprecatedDir: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
socketPath: "/var/lib/kubelet/plugins_registry/kubernetes.io",
expectFoundInDeprecatedDir: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
socketPath: "/var/lib/kubelet/plugins_registry/my.driver.com",
expectFoundInDeprecatedDir: false,
},
}
for _, tc := range testCases {
// Arrange & Act
watcher := NewWatcher(tc.sockDir, tc.deprecatedSockDir)
actualFoundInDeprecatedDir := watcher.foundInDeprecatedDir(tc.socketPath)
// Assert
if tc.expectFoundInDeprecatedDir != actualFoundInDeprecatedDir {
t.Fatalf("expecting actualFoundInDeprecatedDir=%v, but got %v for testcase: %#v", tc.expectFoundInDeprecatedDir, actualFoundInDeprecatedDir, tc)
}
}
}
func TestContainsBlacklistedDir(t *testing.T) {
testCases := []struct {
sockDir string
deprecatedSockDir string
path string
expected bool
}{
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins_registry/mydriver.foo/csi.sock",
expected: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins/mydriver.foo/csi.sock",
expected: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins_registry",
expected: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins",
expected: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins/kubernetes.io",
expected: true,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins/kubernetes.io/csi.sock",
expected: true,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins/kubernetes.io/my.plugin/csi.sock",
expected: true,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins/kubernetes.io/",
expected: true,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins/my.driver.com",
expected: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins_registry",
expected: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins_registry/kubernetes.io",
expected: false, // New (non-deprecated dir) has no blacklist
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins_registry/my.driver.com",
expected: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins/my-kubernetes.io-plugin",
expected: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins/my-kubernetes.io-plugin/csi.sock",
expected: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins/kubernetes.io-plugin",
expected: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins/kubernetes.io-plugin/csi.sock",
expected: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins/kubernetes.io-plugin/",
expected: false,
},
}
for _, tc := range testCases {
// Arrange & Act
watcher := NewWatcher(tc.sockDir, tc.deprecatedSockDir)
actual := watcher.containsBlacklistedDir(tc.path)
// Assert
if tc.expected != actual {
t.Fatalf("expecting %v but got %v for testcase: %#v", tc.expected, actual, tc)
}
}
}