mirror of https://github.com/k3s-io/k3s
Update pluginwatcher tests
@ -1,10 +1,4 @@
package(default_visibility = ["//visibility:public"])
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
name = "go_default_library",
@ -12,8 +6,10 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher",
visibility = ["//visibility:public"],
deps = [
@ -27,6 +23,16 @@ go_library(
name = "go_default_test",
srcs = ["plugin_watcher_test.go"],
embed = [":go_default_library"],
deps = [
name = "package-srcs",
srcs = glob(["**"]),
@ -44,14 +50,3 @@ filegroup(
tags = ["automanaged"],
visibility = ["//visibility:public"],
name = "go_default_test",
srcs = ["plugin_watcher_test.go"],
embed = [":go_default_library"],
deps = [
@ -23,6 +23,7 @@ import (
v1beta1 "k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta1"
@ -30,41 +31,61 @@ import (
type exampleHandler struct {
registeredPlugins map[string]struct{}
mutex sync.Mutex
chanForHandlerAckErrors chan error // for testing
SupportedVersions []string
ExpectedNames map[string]int
eventChans map[string]chan examplePluginEvent // map[pluginName]eventChan
m sync.Mutex
count int
type examplePluginEvent int
const (
exampleEventValidate examplePluginEvent = 0
exampleEventRegister examplePluginEvent = 1
exampleEventDeRegister examplePluginEvent = 2
exampleEventError examplePluginEvent = 3
// NewExampleHandler provide a example handler
func NewExampleHandler() *exampleHandler {
func NewExampleHandler(supportedVersions []string) *exampleHandler {
return &exampleHandler{
chanForHandlerAckErrors: make(chan error),
registeredPlugins: make(map[string]struct{}),
SupportedVersions: supportedVersions,
ExpectedNames: make(map[string]int),
eventChans: make(map[string]chan examplePluginEvent),
func (h *exampleHandler) Cleanup() error {
defer h.mutex.Unlock()
h.registeredPlugins = make(map[string]struct{})
return nil
func (p *exampleHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
p.SendEvent(pluginName, exampleEventValidate)
func (h *exampleHandler) Handler(pluginName string, endpoint string, versions []string, sockPath string) (chan bool, error) {
n, ok := p.DecreasePluginCount(pluginName)
if !ok && n > 0 {
return fmt.Errorf("pluginName('%s') wasn't expected (count is %d)", pluginName, n)
// check for supported versions
if !reflect.DeepEqual([]string{"v1beta1", "v1beta2"}, versions) {
return nil, fmt.Errorf("not the supported versions: %s", versions)
if !reflect.DeepEqual(versions, p.SupportedVersions) {
return fmt.Errorf("versions('%v') != supported versions('%v')", versions, p.SupportedVersions)
// this handler expects non-empty endpoint as an example
if len(endpoint) == 0 {
return nil, errors.New("expecting non empty endpoint")
return errors.New("expecting non empty endpoint")
_, conn, err := dial(sockPath)
return nil
func (p *exampleHandler) RegisterPlugin(pluginName, endpoint string) error {
p.SendEvent(pluginName, exampleEventRegister)
// Verifies the grpcServer is ready to serve services.
_, conn, err := dial(endpoint, time.Second)
if err != nil {
return nil, err
return fmt.Errorf("Failed dialing endpoint (%s): %v", endpoint, err)
defer conn.Close()
@ -73,33 +94,54 @@ func (h *exampleHandler) Handler(pluginName string, endpoint string, versions []
v1beta2Client := v1beta2.NewExampleClient(conn)
// Tests v1beta1 GetExampleInfo
if _, err = v1beta1Client.GetExampleInfo(context.Background(), &v1beta1.ExampleRequest{}); err != nil {
return nil, err
_, err = v1beta1Client.GetExampleInfo(context.Background(), &v1beta1.ExampleRequest{})
if err != nil {
return fmt.Errorf("Failed GetExampleInfo for v1beta2Client(%s): %v", endpoint, err)
// Tests v1beta2 GetExampleInfo
if _, err = v1beta2Client.GetExampleInfo(context.Background(), &v1beta2.ExampleRequest{}); err != nil {
return nil, err
// Tests v1beta1 GetExampleInfo
_, err = v1beta2Client.GetExampleInfo(context.Background(), &v1beta2.ExampleRequest{})
if err != nil {
return fmt.Errorf("Failed GetExampleInfo for v1beta2Client(%s): %v", endpoint, err)
// handle registered plugin
if _, exist := h.registeredPlugins[pluginName]; exist {
return nil, fmt.Errorf("plugin %s already registered", pluginName)
h.registeredPlugins[pluginName] = struct{}{}
chanForAckOfNotification := make(chan bool)
go func() {
select {
case <-chanForAckOfNotification:
// TODO: handle the negative scenario
case <-time.After(time.Second):
h.chanForHandlerAckErrors <- errors.New("Timed out while waiting for notification ack")
return chanForAckOfNotification, nil
return nil
func (p *exampleHandler) DeRegisterPlugin(pluginName string) {
p.SendEvent(pluginName, exampleEventDeRegister)
func (p *exampleHandler) EventChan(pluginName string) chan examplePluginEvent {
return p.eventChans[pluginName]
func (p *exampleHandler) SendEvent(pluginName string, event examplePluginEvent) {
glog.V(2).Infof("Sending %v for plugin %s over chan %v", event, pluginName, p.eventChans[pluginName])
p.eventChans[pluginName] <- event
func (p *exampleHandler) AddPluginName(pluginName string) {
defer p.m.Unlock()
v, ok := p.ExpectedNames[pluginName]
if !ok {
p.eventChans[pluginName] = make(chan examplePluginEvent)
v = 1
p.ExpectedNames[pluginName] = v
func (p *exampleHandler) DecreasePluginCount(pluginName string) (old int, ok bool) {
defer p.m.Unlock()
v, ok := p.ExpectedNames[pluginName]
if !ok {
v = -1
return v, ok
@ -18,7 +18,9 @@ package pluginwatcher
import (
@ -39,6 +41,7 @@ type examplePlugin struct {
endpoint string // for testing
pluginName string
pluginType string
versions []string
type pluginServiceV1Beta1 struct {
@ -73,12 +76,13 @@ func NewExamplePlugin() *examplePlugin {
// NewTestExamplePlugin returns an initialized examplePlugin instance for testing
func NewTestExamplePlugin(pluginName string, pluginType string, endpoint string) *examplePlugin {
func NewTestExamplePlugin(pluginName string, pluginType string, endpoint string, advertisedVersions ...string) *examplePlugin {
return &examplePlugin{
pluginName: pluginName,
pluginType: pluginType,
registrationStatus: make(chan registerapi.RegistrationStatus),
endpoint: endpoint,
versions: advertisedVersions,
registrationStatus: make(chan registerapi.RegistrationStatus),
@ -88,36 +92,48 @@ func (e *examplePlugin) GetInfo(ctx context.Context, req *registerapi.InfoReques
Type: e.pluginType,
Name: e.pluginName,
Endpoint: e.endpoint,
SupportedVersions: []string{"v1beta1", "v1beta2"},
SupportedVersions: e.versions,
}, nil
func (e *examplePlugin) NotifyRegistrationStatus(ctx context.Context, status *registerapi.RegistrationStatus) (*registerapi.RegistrationStatusResponse, error) {
glog.Errorf("Registration is: %v\n", status)
if e.registrationStatus != nil {
e.registrationStatus <- *status
if !status.PluginRegistered {
glog.Errorf("Registration failed: %s\n", status.Error)
return ®isterapi.RegistrationStatusResponse{}, nil
// Serve starts example plugin grpc server
func (e *examplePlugin) Serve(socketPath string) error {
glog.Infof("starting example server at: %s\n", socketPath)
lis, err := net.Listen("unix", socketPath)
// Serve starts a pluginwatcher server and one or more of the plugin services
func (e *examplePlugin) Serve(services ...string) error {
glog.Infof("starting example server at: %s\n", e.endpoint)
lis, err := net.Listen("unix", e.endpoint)
if err != nil {
return err
glog.Infof("example server started at: %s\n", socketPath)
glog.Infof("example server started at: %s\n", e.endpoint)
e.grpcServer = grpc.NewServer()
// Registers kubelet plugin watcher api.
registerapi.RegisterRegistrationServer(e.grpcServer, e)
// Registers services for both v1beta1 and v1beta2 versions.
v1beta1 := &pluginServiceV1Beta1{server: e}
v1beta2 := &pluginServiceV1Beta2{server: e}
for _, service := range services {
switch service {
case "v1beta1":
v1beta1 := &pluginServiceV1Beta1{server: e}
case "v1beta2":
v1beta2 := &pluginServiceV1Beta2{server: e}
return fmt.Errorf("Unsupported service: '%s'", service)
// Starts service
@ -128,22 +144,30 @@ func (e *examplePlugin) Serve(socketPath string) error {
glog.Errorf("example server stopped serving: %v", err)
return nil
func (e *examplePlugin) Stop() error {
glog.Infof("Stopping example server\n")
glog.Infof("Stopping example server at: %s\n", e.endpoint)
c := make(chan struct{})
go func() {
defer close(c)
select {
case <-c:
return nil
case <-time.After(time.Second):
glog.Errorf("Timed out on waiting for stop completion")
return errors.New("Timed out on waiting for stop completion")
if err := os.Remove(e.endpoint); err != nil && !os.IsNotExist(err) {
return err
return nil
@ -17,192 +17,222 @@ limitations under the License.
package pluginwatcher
import (
registerapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1alpha1"
// helper function
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
var (
socketDir string
supportedVersions = []string{"v1beta1", "v1beta2"}
func init() {
var logLevel string
flag.Set("alsologtostderr", fmt.Sprintf("%t", true))
flag.StringVar(&logLevel, "logLevel", "6", "test")
d, err := ioutil.TempDir("", "plugin_test")
if err != nil {
panic(fmt.Sprintf("Could not create a temp directory: %s", d))
socketDir = d
func cleanup(t *testing.T) {
require.NoError(t, os.RemoveAll(socketDir))
os.MkdirAll(socketDir, 0755)
func TestPluginRegistration(t *testing.T) {
defer cleanup(t)
hdlr := NewExampleHandler(supportedVersions)
w := newWatcherWithHandler(t, hdlr)
defer func() { require.NoError(t, w.Stop()) }()
for i := 0; i < 10; i++ {
socketPath := fmt.Sprintf("%s/plugin-%d.sock", socketDir, i)
pluginName := fmt.Sprintf("example-plugin-%d", i)
p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
require.True(t, waitForEvent(t, exampleEventValidate, hdlr.EventChan(p.pluginName)))
require.True(t, waitForEvent(t, exampleEventRegister, hdlr.EventChan(p.pluginName)))
require.True(t, waitForPluginRegistrationStatus(t, p.registrationStatus))
require.NoError(t, p.Stop())
require.True(t, waitForEvent(t, exampleEventDeRegister, hdlr.EventChan(p.pluginName)))
func TestPluginReRegistration(t *testing.T) {
defer cleanup(t)
pluginName := fmt.Sprintf("example-plugin")
hdlr := NewExampleHandler(supportedVersions)
w := newWatcherWithHandler(t, hdlr)
defer func() { require.NoError(t, w.Stop()) }()
plugins := make([]*examplePlugin, 10)
for i := 0; i < 10; i++ {
socketPath := fmt.Sprintf("%s/plugin-%d.sock", socketDir, i)
p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
require.True(t, waitForEvent(t, exampleEventValidate, hdlr.EventChan(p.pluginName)))
require.True(t, waitForEvent(t, exampleEventRegister, hdlr.EventChan(p.pluginName)))
require.True(t, waitForPluginRegistrationStatus(t, p.registrationStatus))
plugins[i] = p
require.True(t, waitForEvent(t, exampleEventDeRegister, hdlr.EventChan(pluginName)))
for i := 0; i < len(plugins)-1; i++ {
func TestPluginRegistrationAtKubeletStart(t *testing.T) {
defer cleanup(t)
hdlr := NewExampleHandler(supportedVersions)
plugins := make([]*examplePlugin, 10)
for i := 0; i < len(plugins); i++ {
socketPath := fmt.Sprintf("%s/plugin-%d.sock", socketDir, i)
pluginName := fmt.Sprintf("example-plugin-%d", i)
p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
defer func(p *examplePlugin) { require.NoError(t, p.Stop()) }(p)
plugins[i] = p
w := newWatcherWithHandler(t, hdlr)
defer func() { require.NoError(t, w.Stop()) }()
var wg sync.WaitGroup
for i := 0; i < len(plugins); i++ {
go func(p *examplePlugin) {
defer wg.Done()
require.True(t, waitForEvent(t, exampleEventValidate, hdlr.EventChan(p.pluginName)))
require.True(t, waitForEvent(t, exampleEventRegister, hdlr.EventChan(p.pluginName)))
require.True(t, waitForPluginRegistrationStatus(t, p.registrationStatus))
c := make(chan struct{})
go func() {
defer close(c)
select {
case <-c:
return false // completed normally
case <-time.After(timeout):
return true // timed out
func TestExamplePlugin(t *testing.T) {
rootDir, err := ioutil.TempDir("", "plugin_test")
require.NoError(t, err)
w := NewWatcher(rootDir)
h := NewExampleHandler()
w.AddHandler(registerapi.DevicePlugin, h.Handler)
require.NoError(t, w.Start())
socketPath := filepath.Join(rootDir, "plugin.sock")
PluginName := "example-plugin"
// handler expecting plugin has a non-empty endpoint
p := NewTestExamplePlugin(PluginName, registerapi.DevicePlugin, "")
require.NoError(t, p.Serve(socketPath))
require.False(t, waitForPluginRegistrationStatus(t, p.registrationStatus))
require.NoError(t, p.Stop())
p = NewTestExamplePlugin(PluginName, registerapi.DevicePlugin, "dummyEndpoint")
require.NoError(t, p.Serve(socketPath))
require.True(t, waitForPluginRegistrationStatus(t, p.registrationStatus))
// Trying to start a plugin service at the same socket path should fail
// with "bind: address already in use"
require.NotNil(t, p.Serve(socketPath))
// grpcServer.Stop() will remove the socket and starting plugin service
// at the same path again should succeeds and trigger another callback.
require.NoError(t, p.Stop())
require.Nil(t, p.Serve(socketPath))
require.False(t, waitForPluginRegistrationStatus(t, p.registrationStatus))
// Starting another plugin with the same name got verification error.
p2 := NewTestExamplePlugin(PluginName, registerapi.DevicePlugin, "dummyEndpoint")
socketPath2 := filepath.Join(rootDir, "plugin2.sock")
require.NoError(t, p2.Serve(socketPath2))
require.False(t, waitForPluginRegistrationStatus(t, p2.registrationStatus))
// Restarts plugin watcher should traverse the socket directory and issues a
// callback for every existing socket.
require.NoError(t, w.Stop())
require.NoError(t, h.Cleanup())
require.NoError(t, w.Start())
var wg sync.WaitGroup
var pStatus string
var p2Status string
go func() {
pStatus = strconv.FormatBool(waitForPluginRegistrationStatus(t, p.registrationStatus))
go func() {
p2Status = strconv.FormatBool(waitForPluginRegistrationStatus(t, p2.registrationStatus))
if waitTimeout(&wg, 2*time.Second) {
t.Fatalf("Timed out waiting for wait group")
expectedSet := sets.NewString()
expectedSet.Insert("true", "false")
actualSet := sets.NewString()
actualSet.Insert(pStatus, p2Status)
require.Equal(t, expectedSet, actualSet)
select {
case err := <-h.chanForHandlerAckErrors:
t.Fatalf("%v", err)
case <-time.After(2 * time.Second):
t.Fatalf("Timeout while waiting for the plugin registration status")
require.NoError(t, w.Stop())
require.NoError(t, w.Cleanup())
func TestPluginWithSubDir(t *testing.T) {
rootDir, err := ioutil.TempDir("", "plugin_test")
require.NoError(t, err)
func TestPluginRegistrationFailureWithUnsupportedVersion(t *testing.T) {
defer cleanup(t)
w := NewWatcher(rootDir)
hcsi := NewExampleHandler()
hdp := NewExampleHandler()
pluginName := fmt.Sprintf("example-plugin")
socketPath := socketDir + "/plugin.sock"
w.AddHandler(registerapi.CSIPlugin, hcsi.Handler)
w.AddHandler(registerapi.DevicePlugin, hdp.Handler)
hdlr := NewExampleHandler(supportedVersions)
err = w.fs.MkdirAll(filepath.Join(rootDir, registerapi.DevicePlugin), 0755)
require.NoError(t, err)
err = w.fs.MkdirAll(filepath.Join(rootDir, registerapi.CSIPlugin), 0755)
require.NoError(t, err)
w := newWatcherWithHandler(t, hdlr)
defer func() { require.NoError(t, w.Stop()) }()
dpSocketPath := filepath.Join(rootDir, registerapi.DevicePlugin, "plugin.sock")
csiSocketPath := filepath.Join(rootDir, registerapi.CSIPlugin, "plugin.sock")
// Advertise v1beta3 but don't serve anything else than the plugin service
p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, "v1beta3")
require.NoError(t, p.Serve())
defer func() { require.NoError(t, p.Stop()) }()
require.NoError(t, w.Start())
// two plugins using the same name but with different type
dp := NewTestExamplePlugin("exampleplugin", registerapi.DevicePlugin, "example-endpoint")
require.NoError(t, dp.Serve(dpSocketPath))
require.True(t, waitForPluginRegistrationStatus(t, dp.registrationStatus))
csi := NewTestExamplePlugin("exampleplugin", registerapi.CSIPlugin, "example-endpoint")
require.NoError(t, csi.Serve(csiSocketPath))
require.True(t, waitForPluginRegistrationStatus(t, csi.registrationStatus))
// Restarts plugin watcher should traverse the socket directory and issues a
// callback for every existing socket.
require.NoError(t, w.Stop())
require.NoError(t, hcsi.Cleanup())
require.NoError(t, hdp.Cleanup())
require.NoError(t, w.Start())
var wg sync.WaitGroup
var dpStatus string
var csiStatus string
go func() {
dpStatus = strconv.FormatBool(waitForPluginRegistrationStatus(t, dp.registrationStatus))
go func() {
csiStatus = strconv.FormatBool(waitForPluginRegistrationStatus(t, csi.registrationStatus))
if waitTimeout(&wg, 4*time.Second) {
require.NoError(t, errors.New("Timed out waiting for wait group"))
expectedSet := sets.NewString()
expectedSet.Insert("true", "true")
actualSet := sets.NewString()
actualSet.Insert(dpStatus, csiStatus)
require.Equal(t, expectedSet, actualSet)
select {
case err := <-hcsi.chanForHandlerAckErrors:
t.Fatalf("%v", err)
case err := <-hdp.chanForHandlerAckErrors:
t.Fatalf("%v", err)
case <-time.After(4 * time.Second):
require.NoError(t, w.Stop())
require.NoError(t, w.Cleanup())
require.True(t, waitForEvent(t, exampleEventValidate, hdlr.EventChan(p.pluginName)))
require.False(t, waitForPluginRegistrationStatus(t, p.registrationStatus))
func waitForPluginRegistrationStatus(t *testing.T, statusCh chan registerapi.RegistrationStatus) bool {
func TestPlugiRegistrationFailureWithUnsupportedVersionAtKubeletStart(t *testing.T) {
defer cleanup(t)
pluginName := fmt.Sprintf("example-plugin")
socketPath := socketDir + "/plugin.sock"
// Advertise v1beta3 but don't serve anything else than the plugin service
p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, "v1beta3")
require.NoError(t, p.Serve())
defer func() { require.NoError(t, p.Stop()) }()
hdlr := NewExampleHandler(supportedVersions)
w := newWatcherWithHandler(t, hdlr)
defer func() { require.NoError(t, w.Stop()) }()
require.True(t, waitForEvent(t, exampleEventValidate, hdlr.EventChan(p.pluginName)))
require.False(t, waitForPluginRegistrationStatus(t, p.registrationStatus))
func waitForPluginRegistrationStatus(t *testing.T, statusChan chan registerapi.RegistrationStatus) bool {
select {
case status := <-statusCh:
case status := <-statusChan:
return status.PluginRegistered
case <-time.After(10 * time.Second):
t.Fatalf("Timed out while waiting for registration status")
return false
func waitForEvent(t *testing.T, expected examplePluginEvent, eventChan chan examplePluginEvent) bool {
select {
case event := <-eventChan:
return event == expected
case <-time.After(2 * time.Second):
t.Fatalf("Timed out while waiting for registration status %v", expected)
return false
func newWatcherWithHandler(t *testing.T, hdlr PluginHandler) *Watcher {
w := NewWatcher(socketDir)
w.AddHandler(registerapi.DevicePlugin, hdlr)
require.NoError(t, w.Start())
return w
Reference in New Issue