diff --git a/pkg/kubelet/util/pluginwatcher/BUILD b/pkg/kubelet/util/pluginwatcher/BUILD index 7b887b444f..0301c6b95b 100644 --- a/pkg/kubelet/util/pluginwatcher/BUILD +++ b/pkg/kubelet/util/pluginwatcher/BUILD @@ -1,10 +1,4 @@ -package(default_visibility = ["//visibility:public"]) - -load( - "@io_bazel_rules_go//go:def.bzl", - "go_library", - "go_test", -) +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", @@ -12,8 +6,10 @@ go_library( "example_handler.go", "example_plugin.go", "plugin_watcher.go", + "types.go", ], importpath = "k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher", + visibility = ["//visibility:public"], deps = [ "//pkg/kubelet/apis/pluginregistration/v1alpha1:go_default_library", "//pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta1:go_default_library", @@ -27,6 +23,16 @@ go_library( ], ) +go_test( + name = "go_default_test", + srcs = ["plugin_watcher_test.go"], + embed = [":go_default_library"], + deps = [ + "//pkg/kubelet/apis/pluginregistration/v1alpha1:go_default_library", + "//vendor/github.com/stretchr/testify/require:go_default_library", + ], +) + filegroup( name = "package-srcs", srcs = glob(["**"]), @@ -44,14 +50,3 @@ filegroup( tags = ["automanaged"], visibility = ["//visibility:public"], ) - -go_test( - name = "go_default_test", - srcs = ["plugin_watcher_test.go"], - embed = [":go_default_library"], - deps = [ - "//pkg/kubelet/apis/pluginregistration/v1alpha1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", - "//vendor/github.com/stretchr/testify/require:go_default_library", - ], -) diff --git a/pkg/kubelet/util/pluginwatcher/example_handler.go b/pkg/kubelet/util/pluginwatcher/example_handler.go index 4eae4188d6..8f9cac5d9b 100644 --- a/pkg/kubelet/util/pluginwatcher/example_handler.go +++ b/pkg/kubelet/util/pluginwatcher/example_handler.go @@ -23,6 +23,7 @@ import ( "sync" "time" + "github.com/golang/glog" "golang.org/x/net/context" v1beta1 "k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta1" @@ -30,41 +31,61 @@ import ( ) type exampleHandler struct { - registeredPlugins map[string]struct{} - mutex sync.Mutex - chanForHandlerAckErrors chan error // for testing + SupportedVersions []string + ExpectedNames map[string]int + + eventChans map[string]chan examplePluginEvent // map[pluginName]eventChan + + m sync.Mutex + count int } +type examplePluginEvent int + +const ( + exampleEventValidate examplePluginEvent = 0 + exampleEventRegister examplePluginEvent = 1 + exampleEventDeRegister examplePluginEvent = 2 + exampleEventError examplePluginEvent = 3 +) + // NewExampleHandler provide a example handler -func NewExampleHandler() *exampleHandler { +func NewExampleHandler(supportedVersions []string) *exampleHandler { return &exampleHandler{ - chanForHandlerAckErrors: make(chan error), - registeredPlugins: make(map[string]struct{}), + SupportedVersions: supportedVersions, + ExpectedNames: make(map[string]int), + + eventChans: make(map[string]chan examplePluginEvent), } } -func (h *exampleHandler) Cleanup() error { - h.mutex.Lock() - defer h.mutex.Unlock() - h.registeredPlugins = make(map[string]struct{}) - return nil -} +func (p *exampleHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error { + p.SendEvent(pluginName, exampleEventValidate) -func (h *exampleHandler) Handler(pluginName string, endpoint string, versions []string, sockPath string) (chan bool, error) { + n, ok := p.DecreasePluginCount(pluginName) + if !ok && n > 0 { + return fmt.Errorf("pluginName('%s') wasn't expected (count is %d)", pluginName, n) + } - // check for supported versions - if !reflect.DeepEqual([]string{"v1beta1", "v1beta2"}, versions) { - return nil, fmt.Errorf("not the supported versions: %s", versions) + if !reflect.DeepEqual(versions, p.SupportedVersions) { + return fmt.Errorf("versions('%v') != supported versions('%v')", versions, p.SupportedVersions) } // this handler expects non-empty endpoint as an example if len(endpoint) == 0 { - return nil, errors.New("expecting non empty endpoint") + return errors.New("expecting non empty endpoint") } - _, conn, err := dial(sockPath) + return nil +} + +func (p *exampleHandler) RegisterPlugin(pluginName, endpoint string) error { + p.SendEvent(pluginName, exampleEventRegister) + + // Verifies the grpcServer is ready to serve services. + _, conn, err := dial(endpoint, time.Second) if err != nil { - return nil, err + return fmt.Errorf("Failed dialing endpoint (%s): %v", endpoint, err) } defer conn.Close() @@ -73,33 +94,54 @@ func (h *exampleHandler) Handler(pluginName string, endpoint string, versions [] v1beta2Client := v1beta2.NewExampleClient(conn) // Tests v1beta1 GetExampleInfo - if _, err = v1beta1Client.GetExampleInfo(context.Background(), &v1beta1.ExampleRequest{}); err != nil { - return nil, err + _, err = v1beta1Client.GetExampleInfo(context.Background(), &v1beta1.ExampleRequest{}) + if err != nil { + return fmt.Errorf("Failed GetExampleInfo for v1beta2Client(%s): %v", endpoint, err) } - // Tests v1beta2 GetExampleInfo - if _, err = v1beta2Client.GetExampleInfo(context.Background(), &v1beta2.ExampleRequest{}); err != nil { - return nil, err + // Tests v1beta1 GetExampleInfo + _, err = v1beta2Client.GetExampleInfo(context.Background(), &v1beta2.ExampleRequest{}) + if err != nil { + return fmt.Errorf("Failed GetExampleInfo for v1beta2Client(%s): %v", endpoint, err) } - // handle registered plugin - h.mutex.Lock() - if _, exist := h.registeredPlugins[pluginName]; exist { - h.mutex.Unlock() - return nil, fmt.Errorf("plugin %s already registered", pluginName) - } - h.registeredPlugins[pluginName] = struct{}{} - h.mutex.Unlock() - - chanForAckOfNotification := make(chan bool) - go func() { - select { - case <-chanForAckOfNotification: - // TODO: handle the negative scenario - close(chanForAckOfNotification) - case <-time.After(time.Second): - h.chanForHandlerAckErrors <- errors.New("Timed out while waiting for notification ack") - } - }() - return chanForAckOfNotification, nil + return nil +} + +func (p *exampleHandler) DeRegisterPlugin(pluginName string) { + p.SendEvent(pluginName, exampleEventDeRegister) +} + +func (p *exampleHandler) EventChan(pluginName string) chan examplePluginEvent { + return p.eventChans[pluginName] +} + +func (p *exampleHandler) SendEvent(pluginName string, event examplePluginEvent) { + glog.V(2).Infof("Sending %v for plugin %s over chan %v", event, pluginName, p.eventChans[pluginName]) + p.eventChans[pluginName] <- event +} + +func (p *exampleHandler) AddPluginName(pluginName string) { + p.m.Lock() + defer p.m.Unlock() + + v, ok := p.ExpectedNames[pluginName] + if !ok { + p.eventChans[pluginName] = make(chan examplePluginEvent) + v = 1 + } + + p.ExpectedNames[pluginName] = v +} + +func (p *exampleHandler) DecreasePluginCount(pluginName string) (old int, ok bool) { + p.m.Lock() + defer p.m.Unlock() + + v, ok := p.ExpectedNames[pluginName] + if !ok { + v = -1 + } + + return v, ok } diff --git a/pkg/kubelet/util/pluginwatcher/example_plugin.go b/pkg/kubelet/util/pluginwatcher/example_plugin.go index 5c2dd966ba..694b366120 100644 --- a/pkg/kubelet/util/pluginwatcher/example_plugin.go +++ b/pkg/kubelet/util/pluginwatcher/example_plugin.go @@ -18,7 +18,9 @@ package pluginwatcher import ( "errors" + "fmt" "net" + "os" "sync" "time" @@ -39,6 +41,7 @@ type examplePlugin struct { endpoint string // for testing pluginName string pluginType string + versions []string } type pluginServiceV1Beta1 struct { @@ -73,12 +76,13 @@ func NewExamplePlugin() *examplePlugin { } // NewTestExamplePlugin returns an initialized examplePlugin instance for testing -func NewTestExamplePlugin(pluginName string, pluginType string, endpoint string) *examplePlugin { +func NewTestExamplePlugin(pluginName string, pluginType string, endpoint string, advertisedVersions ...string) *examplePlugin { return &examplePlugin{ pluginName: pluginName, pluginType: pluginType, - registrationStatus: make(chan registerapi.RegistrationStatus), endpoint: endpoint, + versions: advertisedVersions, + registrationStatus: make(chan registerapi.RegistrationStatus), } } @@ -88,36 +92,48 @@ func (e *examplePlugin) GetInfo(ctx context.Context, req *registerapi.InfoReques Type: e.pluginType, Name: e.pluginName, Endpoint: e.endpoint, - SupportedVersions: []string{"v1beta1", "v1beta2"}, + SupportedVersions: e.versions, }, nil } func (e *examplePlugin) NotifyRegistrationStatus(ctx context.Context, status *registerapi.RegistrationStatus) (*registerapi.RegistrationStatusResponse, error) { + glog.Errorf("Registration is: %v\n", status) + if e.registrationStatus != nil { e.registrationStatus <- *status } - if !status.PluginRegistered { - glog.Errorf("Registration failed: %s\n", status.Error) - } + return ®isterapi.RegistrationStatusResponse{}, nil } -// Serve starts example plugin grpc server -func (e *examplePlugin) Serve(socketPath string) error { - glog.Infof("starting example server at: %s\n", socketPath) - lis, err := net.Listen("unix", socketPath) +// Serve starts a pluginwatcher server and one or more of the plugin services +func (e *examplePlugin) Serve(services ...string) error { + glog.Infof("starting example server at: %s\n", e.endpoint) + lis, err := net.Listen("unix", e.endpoint) if err != nil { return err } - glog.Infof("example server started at: %s\n", socketPath) + + glog.Infof("example server started at: %s\n", e.endpoint) e.grpcServer = grpc.NewServer() + // Registers kubelet plugin watcher api. registerapi.RegisterRegistrationServer(e.grpcServer, e) - // Registers services for both v1beta1 and v1beta2 versions. - v1beta1 := &pluginServiceV1Beta1{server: e} - v1beta1.RegisterService() - v1beta2 := &pluginServiceV1Beta2{server: e} - v1beta2.RegisterService() + + for _, service := range services { + switch service { + case "v1beta1": + v1beta1 := &pluginServiceV1Beta1{server: e} + v1beta1.RegisterService() + break + case "v1beta2": + v1beta2 := &pluginServiceV1Beta2{server: e} + v1beta2.RegisterService() + break + default: + return fmt.Errorf("Unsupported service: '%s'", service) + } + } // Starts service e.wg.Add(1) @@ -128,22 +144,30 @@ func (e *examplePlugin) Serve(socketPath string) error { glog.Errorf("example server stopped serving: %v", err) } }() + return nil } func (e *examplePlugin) Stop() error { - glog.Infof("Stopping example server\n") + glog.Infof("Stopping example server at: %s\n", e.endpoint) + e.grpcServer.Stop() c := make(chan struct{}) go func() { defer close(c) e.wg.Wait() }() + select { case <-c: - return nil + break case <-time.After(time.Second): - glog.Errorf("Timed out on waiting for stop completion") return errors.New("Timed out on waiting for stop completion") } + + if err := os.Remove(e.endpoint); err != nil && !os.IsNotExist(err) { + return err + } + + return nil } diff --git a/pkg/kubelet/util/pluginwatcher/plugin_watcher_test.go b/pkg/kubelet/util/pluginwatcher/plugin_watcher_test.go index 5bfb49568e..fdcb8b705b 100644 --- a/pkg/kubelet/util/pluginwatcher/plugin_watcher_test.go +++ b/pkg/kubelet/util/pluginwatcher/plugin_watcher_test.go @@ -17,192 +17,222 @@ limitations under the License. package pluginwatcher import ( - "errors" + "flag" + "fmt" "io/ioutil" - "path/filepath" - "strconv" + "os" "sync" "testing" "time" "github.com/stretchr/testify/require" - "k8s.io/apimachinery/pkg/util/sets" registerapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1alpha1" ) -// helper function -func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool { +var ( + socketDir string + + supportedVersions = []string{"v1beta1", "v1beta2"} +) + +func init() { + var logLevel string + + flag.Set("alsologtostderr", fmt.Sprintf("%t", true)) + flag.StringVar(&logLevel, "logLevel", "6", "test") + flag.Lookup("v").Value.Set(logLevel) + + d, err := ioutil.TempDir("", "plugin_test") + if err != nil { + panic(fmt.Sprintf("Could not create a temp directory: %s", d)) + } + + socketDir = d +} + +func cleanup(t *testing.T) { + require.NoError(t, os.RemoveAll(socketDir)) + os.MkdirAll(socketDir, 0755) +} + +func TestPluginRegistration(t *testing.T) { + defer cleanup(t) + + hdlr := NewExampleHandler(supportedVersions) + w := newWatcherWithHandler(t, hdlr) + defer func() { require.NoError(t, w.Stop()) }() + + for i := 0; i < 10; i++ { + socketPath := fmt.Sprintf("%s/plugin-%d.sock", socketDir, i) + pluginName := fmt.Sprintf("example-plugin-%d", i) + + hdlr.AddPluginName(pluginName) + + p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...) + require.NoError(t, p.Serve("v1beta1", "v1beta2")) + + require.True(t, waitForEvent(t, exampleEventValidate, hdlr.EventChan(p.pluginName))) + require.True(t, waitForEvent(t, exampleEventRegister, hdlr.EventChan(p.pluginName))) + + require.True(t, waitForPluginRegistrationStatus(t, p.registrationStatus)) + + require.NoError(t, p.Stop()) + require.True(t, waitForEvent(t, exampleEventDeRegister, hdlr.EventChan(p.pluginName))) + } +} + +func TestPluginReRegistration(t *testing.T) { + defer cleanup(t) + + pluginName := fmt.Sprintf("example-plugin") + hdlr := NewExampleHandler(supportedVersions) + + w := newWatcherWithHandler(t, hdlr) + defer func() { require.NoError(t, w.Stop()) }() + + plugins := make([]*examplePlugin, 10) + + for i := 0; i < 10; i++ { + socketPath := fmt.Sprintf("%s/plugin-%d.sock", socketDir, i) + hdlr.AddPluginName(pluginName) + + p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...) + require.NoError(t, p.Serve("v1beta1", "v1beta2")) + + require.True(t, waitForEvent(t, exampleEventValidate, hdlr.EventChan(p.pluginName))) + require.True(t, waitForEvent(t, exampleEventRegister, hdlr.EventChan(p.pluginName))) + + require.True(t, waitForPluginRegistrationStatus(t, p.registrationStatus)) + + plugins[i] = p + } + + plugins[len(plugins)-1].Stop() + require.True(t, waitForEvent(t, exampleEventDeRegister, hdlr.EventChan(pluginName))) + + close(hdlr.EventChan(pluginName)) + for i := 0; i < len(plugins)-1; i++ { + plugins[i].Stop() + } +} + +func TestPluginRegistrationAtKubeletStart(t *testing.T) { + defer cleanup(t) + + hdlr := NewExampleHandler(supportedVersions) + plugins := make([]*examplePlugin, 10) + + for i := 0; i < len(plugins); i++ { + socketPath := fmt.Sprintf("%s/plugin-%d.sock", socketDir, i) + pluginName := fmt.Sprintf("example-plugin-%d", i) + hdlr.AddPluginName(pluginName) + + p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...) + require.NoError(t, p.Serve("v1beta1", "v1beta2")) + defer func(p *examplePlugin) { require.NoError(t, p.Stop()) }(p) + + plugins[i] = p + } + + w := newWatcherWithHandler(t, hdlr) + defer func() { require.NoError(t, w.Stop()) }() + + var wg sync.WaitGroup + for i := 0; i < len(plugins); i++ { + wg.Add(1) + go func(p *examplePlugin) { + defer wg.Done() + + require.True(t, waitForEvent(t, exampleEventValidate, hdlr.EventChan(p.pluginName))) + require.True(t, waitForEvent(t, exampleEventRegister, hdlr.EventChan(p.pluginName))) + + require.True(t, waitForPluginRegistrationStatus(t, p.registrationStatus)) + }(plugins[i]) + } + c := make(chan struct{}) go func() { defer close(c) wg.Wait() }() + select { case <-c: - return false // completed normally - case <-time.After(timeout): - return true // timed out - } -} - -func TestExamplePlugin(t *testing.T) { - rootDir, err := ioutil.TempDir("", "plugin_test") - require.NoError(t, err) - w := NewWatcher(rootDir) - h := NewExampleHandler() - w.AddHandler(registerapi.DevicePlugin, h.Handler) - - require.NoError(t, w.Start()) - - socketPath := filepath.Join(rootDir, "plugin.sock") - PluginName := "example-plugin" - - // handler expecting plugin has a non-empty endpoint - p := NewTestExamplePlugin(PluginName, registerapi.DevicePlugin, "") - require.NoError(t, p.Serve(socketPath)) - require.False(t, waitForPluginRegistrationStatus(t, p.registrationStatus)) - require.NoError(t, p.Stop()) - - p = NewTestExamplePlugin(PluginName, registerapi.DevicePlugin, "dummyEndpoint") - require.NoError(t, p.Serve(socketPath)) - require.True(t, waitForPluginRegistrationStatus(t, p.registrationStatus)) - - // Trying to start a plugin service at the same socket path should fail - // with "bind: address already in use" - require.NotNil(t, p.Serve(socketPath)) - - // grpcServer.Stop() will remove the socket and starting plugin service - // at the same path again should succeeds and trigger another callback. - require.NoError(t, p.Stop()) - require.Nil(t, p.Serve(socketPath)) - require.False(t, waitForPluginRegistrationStatus(t, p.registrationStatus)) - - // Starting another plugin with the same name got verification error. - p2 := NewTestExamplePlugin(PluginName, registerapi.DevicePlugin, "dummyEndpoint") - socketPath2 := filepath.Join(rootDir, "plugin2.sock") - require.NoError(t, p2.Serve(socketPath2)) - require.False(t, waitForPluginRegistrationStatus(t, p2.registrationStatus)) - - // Restarts plugin watcher should traverse the socket directory and issues a - // callback for every existing socket. - require.NoError(t, w.Stop()) - require.NoError(t, h.Cleanup()) - require.NoError(t, w.Start()) - - var wg sync.WaitGroup - wg.Add(2) - var pStatus string - var p2Status string - go func() { - pStatus = strconv.FormatBool(waitForPluginRegistrationStatus(t, p.registrationStatus)) - wg.Done() - }() - go func() { - p2Status = strconv.FormatBool(waitForPluginRegistrationStatus(t, p2.registrationStatus)) - wg.Done() - }() - - if waitTimeout(&wg, 2*time.Second) { - t.Fatalf("Timed out waiting for wait group") - } - - expectedSet := sets.NewString() - expectedSet.Insert("true", "false") - actualSet := sets.NewString() - actualSet.Insert(pStatus, p2Status) - - require.Equal(t, expectedSet, actualSet) - - select { - case err := <-h.chanForHandlerAckErrors: - t.Fatalf("%v", err) + return case <-time.After(2 * time.Second): + t.Fatalf("Timeout while waiting for the plugin registration status") } - - require.NoError(t, w.Stop()) - require.NoError(t, w.Cleanup()) } -func TestPluginWithSubDir(t *testing.T) { - rootDir, err := ioutil.TempDir("", "plugin_test") - require.NoError(t, err) +func TestPluginRegistrationFailureWithUnsupportedVersion(t *testing.T) { + defer cleanup(t) - w := NewWatcher(rootDir) - hcsi := NewExampleHandler() - hdp := NewExampleHandler() + pluginName := fmt.Sprintf("example-plugin") + socketPath := socketDir + "/plugin.sock" - w.AddHandler(registerapi.CSIPlugin, hcsi.Handler) - w.AddHandler(registerapi.DevicePlugin, hdp.Handler) + hdlr := NewExampleHandler(supportedVersions) + hdlr.AddPluginName(pluginName) - err = w.fs.MkdirAll(filepath.Join(rootDir, registerapi.DevicePlugin), 0755) - require.NoError(t, err) - err = w.fs.MkdirAll(filepath.Join(rootDir, registerapi.CSIPlugin), 0755) - require.NoError(t, err) + w := newWatcherWithHandler(t, hdlr) + defer func() { require.NoError(t, w.Stop()) }() - dpSocketPath := filepath.Join(rootDir, registerapi.DevicePlugin, "plugin.sock") - csiSocketPath := filepath.Join(rootDir, registerapi.CSIPlugin, "plugin.sock") + // Advertise v1beta3 but don't serve anything else than the plugin service + p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, "v1beta3") + require.NoError(t, p.Serve()) + defer func() { require.NoError(t, p.Stop()) }() - require.NoError(t, w.Start()) - - // two plugins using the same name but with different type - dp := NewTestExamplePlugin("exampleplugin", registerapi.DevicePlugin, "example-endpoint") - require.NoError(t, dp.Serve(dpSocketPath)) - require.True(t, waitForPluginRegistrationStatus(t, dp.registrationStatus)) - - csi := NewTestExamplePlugin("exampleplugin", registerapi.CSIPlugin, "example-endpoint") - require.NoError(t, csi.Serve(csiSocketPath)) - require.True(t, waitForPluginRegistrationStatus(t, csi.registrationStatus)) - - // Restarts plugin watcher should traverse the socket directory and issues a - // callback for every existing socket. - require.NoError(t, w.Stop()) - require.NoError(t, hcsi.Cleanup()) - require.NoError(t, hdp.Cleanup()) - require.NoError(t, w.Start()) - - var wg sync.WaitGroup - wg.Add(2) - var dpStatus string - var csiStatus string - go func() { - dpStatus = strconv.FormatBool(waitForPluginRegistrationStatus(t, dp.registrationStatus)) - wg.Done() - }() - go func() { - csiStatus = strconv.FormatBool(waitForPluginRegistrationStatus(t, csi.registrationStatus)) - wg.Done() - }() - - if waitTimeout(&wg, 4*time.Second) { - require.NoError(t, errors.New("Timed out waiting for wait group")) - } - - expectedSet := sets.NewString() - expectedSet.Insert("true", "true") - actualSet := sets.NewString() - actualSet.Insert(dpStatus, csiStatus) - - require.Equal(t, expectedSet, actualSet) - - select { - case err := <-hcsi.chanForHandlerAckErrors: - t.Fatalf("%v", err) - case err := <-hdp.chanForHandlerAckErrors: - t.Fatalf("%v", err) - case <-time.After(4 * time.Second): - } - - require.NoError(t, w.Stop()) - require.NoError(t, w.Cleanup()) + require.True(t, waitForEvent(t, exampleEventValidate, hdlr.EventChan(p.pluginName))) + require.False(t, waitForPluginRegistrationStatus(t, p.registrationStatus)) } -func waitForPluginRegistrationStatus(t *testing.T, statusCh chan registerapi.RegistrationStatus) bool { +func TestPlugiRegistrationFailureWithUnsupportedVersionAtKubeletStart(t *testing.T) { + defer cleanup(t) + + pluginName := fmt.Sprintf("example-plugin") + socketPath := socketDir + "/plugin.sock" + + // Advertise v1beta3 but don't serve anything else than the plugin service + p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, "v1beta3") + require.NoError(t, p.Serve()) + defer func() { require.NoError(t, p.Stop()) }() + + hdlr := NewExampleHandler(supportedVersions) + hdlr.AddPluginName(pluginName) + + w := newWatcherWithHandler(t, hdlr) + defer func() { require.NoError(t, w.Stop()) }() + + require.True(t, waitForEvent(t, exampleEventValidate, hdlr.EventChan(p.pluginName))) + require.False(t, waitForPluginRegistrationStatus(t, p.registrationStatus)) +} + +func waitForPluginRegistrationStatus(t *testing.T, statusChan chan registerapi.RegistrationStatus) bool { select { - case status := <-statusCh: + case status := <-statusChan: return status.PluginRegistered case <-time.After(10 * time.Second): t.Fatalf("Timed out while waiting for registration status") } return false } + +func waitForEvent(t *testing.T, expected examplePluginEvent, eventChan chan examplePluginEvent) bool { + select { + case event := <-eventChan: + return event == expected + case <-time.After(2 * time.Second): + t.Fatalf("Timed out while waiting for registration status %v", expected) + } + + return false +} + +func newWatcherWithHandler(t *testing.T, hdlr PluginHandler) *Watcher { + w := NewWatcher(socketDir) + + w.AddHandler(registerapi.DevicePlugin, hdlr) + require.NoError(t, w.Start()) + + return w +}