From 3a2e3bcc70ef9810b871db8bcf0599c4712f6bc2 Mon Sep 17 00:00:00 2001 From: vikaschoudhary16 Date: Tue, 1 May 2018 02:15:06 -0400 Subject: [PATCH] Add probe based mechanism for kubelet plugin discovery --- hack/.golint_failures | 4 + ...-kubelet-plugin-registration-dockerized.sh | 29 ++ ...e-generated-kubelet-plugin-registration.sh | 27 ++ ...y-generated-kubelet-plugin-registration.sh | 39 +++ pkg/kubelet/BUILD | 1 + pkg/kubelet/apis/BUILD | 1 + .../apis/pluginregistration/v1alpha1/BUILD | 40 +++ .../pluginregistration/v1alpha1/api.proto | 60 ++++ .../pluginregistration/v1alpha1/constants.go | 22 ++ pkg/kubelet/kubelet.go | 12 + pkg/kubelet/util/BUILD | 2 + pkg/kubelet/util/pluginwatcher/BUILD | 58 ++++ pkg/kubelet/util/pluginwatcher/README | 29 ++ .../util/pluginwatcher/example_plugin.go | 150 ++++++++++ .../example_plugin_apis/v1beta1/BUILD | 34 +++ .../example_plugin_apis/v1beta1/api.proto | 28 ++ .../example_plugin_apis/v1beta2/BUILD | 34 +++ .../example_plugin_apis/v1beta2/api.proto | 29 ++ .../util/pluginwatcher/plugin_watcher.go | 260 ++++++++++++++++++ .../util/pluginwatcher/plugin_watcher_test.go | 220 +++++++++++++++ 20 files changed, 1079 insertions(+) create mode 100755 hack/update-generated-kubelet-plugin-registration-dockerized.sh create mode 100755 hack/update-generated-kubelet-plugin-registration.sh create mode 100755 hack/verify-generated-kubelet-plugin-registration.sh create mode 100644 pkg/kubelet/apis/pluginregistration/v1alpha1/BUILD create mode 100644 pkg/kubelet/apis/pluginregistration/v1alpha1/api.proto create mode 100644 pkg/kubelet/apis/pluginregistration/v1alpha1/constants.go create mode 100644 pkg/kubelet/util/pluginwatcher/BUILD create mode 100644 pkg/kubelet/util/pluginwatcher/README create mode 100644 pkg/kubelet/util/pluginwatcher/example_plugin.go create mode 100644 pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta1/BUILD create mode 100644 pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta1/api.proto create mode 100644 pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta2/BUILD create mode 100644 pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta2/api.proto create mode 100644 pkg/kubelet/util/pluginwatcher/plugin_watcher.go create mode 100644 pkg/kubelet/util/pluginwatcher/plugin_watcher_test.go diff --git a/hack/.golint_failures b/hack/.golint_failures index 2b2798e46a..0853347106 100644 --- a/hack/.golint_failures +++ b/hack/.golint_failures @@ -166,6 +166,7 @@ pkg/kubelet/apis/deviceplugin/v1alpha pkg/kubelet/apis/deviceplugin/v1beta1 pkg/kubelet/apis/kubeletconfig pkg/kubelet/apis/kubeletconfig/v1beta1 +pkg/kubelet/apis/pluginregistration/v1alpha1 pkg/kubelet/cadvisor pkg/kubelet/cadvisor/testing pkg/kubelet/checkpoint @@ -217,6 +218,9 @@ pkg/kubelet/sysctl pkg/kubelet/types pkg/kubelet/util pkg/kubelet/util/cache +pkg/kubelet/util/pluginwatcher +pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta1 +pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta2 pkg/kubelet/util/queue pkg/kubelet/util/sliceutils pkg/kubemark diff --git a/hack/update-generated-kubelet-plugin-registration-dockerized.sh b/hack/update-generated-kubelet-plugin-registration-dockerized.sh new file mode 100755 index 0000000000..daf5abbd36 --- /dev/null +++ b/hack/update-generated-kubelet-plugin-registration-dockerized.sh @@ -0,0 +1,29 @@ +#!/bin/bash + +# Copyright 2018 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -o errexit +set -o nounset +set -o pipefail + +KUBE_ROOT="$(cd "$(dirname "${BASH_SOURCE}")/../" && pwd -P)" +KUBELET_PLUGIN_REGISTRATION_ROOT="${KUBE_ROOT}/pkg/kubelet/apis/pluginregistration/v1alpha1/" +KUBELET_EXAMPLE_PLUGIN_V1BETA1="${KUBE_ROOT}/pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta1/" +KUBELET_EXAMPLE_PLUGIN_V1BETA2="${KUBE_ROOT}/pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta2/" + +source "${KUBE_ROOT}/hack/lib/protoc.sh" +kube::protoc::generate_proto ${KUBELET_PLUGIN_REGISTRATION_ROOT} +kube::protoc::generate_proto ${KUBELET_EXAMPLE_PLUGIN_V1BETA1} +kube::protoc::generate_proto ${KUBELET_EXAMPLE_PLUGIN_V1BETA2} diff --git a/hack/update-generated-kubelet-plugin-registration.sh b/hack/update-generated-kubelet-plugin-registration.sh new file mode 100755 index 0000000000..308733c024 --- /dev/null +++ b/hack/update-generated-kubelet-plugin-registration.sh @@ -0,0 +1,27 @@ +#!/bin/bash + +# Copyright 2018 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -o errexit +set -o nounset +set -o pipefail + +KUBE_ROOT=$(dirname "${BASH_SOURCE}")/.. + +# NOTE: All output from this script needs to be copied back to the calling +# source tree. This is managed in kube::build::copy_output in build/common.sh. +# If the output set is changed update that function. + +${KUBE_ROOT}/build/run.sh hack/update-generated-kubelet-plugin-registration-dockerized.sh "$@" diff --git a/hack/verify-generated-kubelet-plugin-registration.sh b/hack/verify-generated-kubelet-plugin-registration.sh new file mode 100755 index 0000000000..3dfffa8dcc --- /dev/null +++ b/hack/verify-generated-kubelet-plugin-registration.sh @@ -0,0 +1,39 @@ +#!/bin/bash + +# Copyright 2018 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -o errexit +set -o nounset +set -o pipefail + +KUBE_ROOT=$(dirname "${BASH_SOURCE}")/.. +ERROR="Kubelet Plugin Registration api is out of date. Please run hack/update-generated-kubelet-plugin-registration.sh" +KUBELET_PLUGIN_REGISTRATION_ROOT="${KUBE_ROOT}/pkg/kubelet/apis/pluginregistration/v1alpha1/" + +source "${KUBE_ROOT}/hack/lib/protoc.sh" +kube::golang::setup_env + +function cleanup { + rm -rf ${KUBELET_PLUGIN_REGISTRATION_ROOT}/_tmp/ +} + +trap cleanup EXIT + +mkdir -p ${KUBELET_PLUGIN_REGISTRATION_ROOT}/_tmp +cp ${KUBELET_PLUGIN_REGISTRATION_ROOT}/api.pb.go ${KUBELET_PLUGIN_REGISTRATION_ROOT}/_tmp/ + +KUBE_VERBOSE=3 "${KUBE_ROOT}/hack/update-generated-kubelet-plugin-registration.sh" +kube::protoc::diff "${KUBELET_PLUGIN_REGISTRATION_ROOT}/api.pb.go" "${KUBELET_PLUGIN_REGISTRATION_ROOT}/_tmp/api.pb.go" ${ERROR} +echo "Generated Kubelet Plugin Registration api is up to date." diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD index b4d6d166fa..710af32b7a 100644 --- a/pkg/kubelet/BUILD +++ b/pkg/kubelet/BUILD @@ -84,6 +84,7 @@ go_library( "//pkg/kubelet/util:go_default_library", "//pkg/kubelet/util/format:go_default_library", "//pkg/kubelet/util/manager:go_default_library", + "//pkg/kubelet/util/pluginwatcher:go_default_library", "//pkg/kubelet/util/queue:go_default_library", "//pkg/kubelet/util/sliceutils:go_default_library", "//pkg/kubelet/volumemanager:go_default_library", diff --git a/pkg/kubelet/apis/BUILD b/pkg/kubelet/apis/BUILD index 47cb8184cc..2a22e48121 100644 --- a/pkg/kubelet/apis/BUILD +++ b/pkg/kubelet/apis/BUILD @@ -41,6 +41,7 @@ filegroup( "//pkg/kubelet/apis/deviceplugin/v1alpha:all-srcs", "//pkg/kubelet/apis/deviceplugin/v1beta1:all-srcs", "//pkg/kubelet/apis/kubeletconfig:all-srcs", + "//pkg/kubelet/apis/pluginregistration/v1alpha1:all-srcs", "//pkg/kubelet/apis/stats/v1alpha1:all-srcs", ], tags = ["automanaged"], diff --git a/pkg/kubelet/apis/pluginregistration/v1alpha1/BUILD b/pkg/kubelet/apis/pluginregistration/v1alpha1/BUILD new file mode 100644 index 0000000000..f51668500b --- /dev/null +++ b/pkg/kubelet/apis/pluginregistration/v1alpha1/BUILD @@ -0,0 +1,40 @@ +package(default_visibility = ["//visibility:public"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", +) + +go_library( + name = "go_default_library", + srcs = [ + "api.pb.go", + "constants.go", + ], + importpath = "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1alpha1", + deps = [ + "//vendor/github.com/gogo/protobuf/gogoproto:go_default_library", + "//vendor/github.com/gogo/protobuf/proto:go_default_library", + "//vendor/golang.org/x/net/context:go_default_library", + "//vendor/google.golang.org/grpc:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) + +filegroup( + name = "go_default_library_protos", + srcs = ["api.proto"], + visibility = ["//visibility:public"], +) diff --git a/pkg/kubelet/apis/pluginregistration/v1alpha1/api.proto b/pkg/kubelet/apis/pluginregistration/v1alpha1/api.proto new file mode 100644 index 0000000000..319b3f19fb --- /dev/null +++ b/pkg/kubelet/apis/pluginregistration/v1alpha1/api.proto @@ -0,0 +1,60 @@ +// To regenerate api.pb.go run hack/update-generated-kubelet-plugin-registration.sh +syntax = 'proto3'; + +package pluginregistration; + +import "github.com/gogo/protobuf/gogoproto/gogo.proto"; + +option (gogoproto.goproto_stringer_all) = false; +option (gogoproto.stringer_all) = true; +option (gogoproto.goproto_getters_all) = true; +option (gogoproto.marshaler_all) = true; +option (gogoproto.sizer_all) = true; +option (gogoproto.unmarshaler_all) = true; +option (gogoproto.goproto_unrecognized_all) = false; + +// PluginInfo is the message sent from a plugin to the Kubelet pluginwatcher for plugin registration +message PluginInfo { + // Type of the Plugin. CSIPlugin or DevicePlugin + string type = 1; + // Plugin name that uniquely identifies the plugin for the given plugin type. + // For DevicePlugin, this is the resource name that the plugin manages and + // should follow the extended resource name convention. + // For CSI, this is the CSI driver registrar name. + string name = 2; + // Optional endpoint location. If found set by Kubelet component, + // Kubelet component will use this endpoint for specific requests. + // This allows the plugin to register using one endpoint and possibly use + // a different socket for control operations. CSI uses this model to delegate + // its registration external from the plugin. + string endpoint = 3; + // Plugin service API versions the plugin supports. + // For DevicePlugin, this maps to the deviceplugin API versions the + // plugin supports at the given socket. + // The Kubelet component communicating with the plugin should be able + // to choose any preferred version from this list, or returns an error + // if none of the listed versions is supported. + repeated string supported_versions = 4; +} + +// RegistrationStatus is the message sent from Kubelet pluginwatcher to the plugin for notification on registration status +message RegistrationStatus { + // True if plugin gets registered successfully at Kubelet + bool plugin_registered = 1; + // Error message in case plugin fails to register, empty string otherwise + string error = 2; +} + +// RegistrationStatusResponse is sent by plugin to kubelet in response to RegistrationStatus RPC +message RegistrationStatusResponse { +} + +// InfoRequest is the empty request message from Kubelet +message InfoRequest { +} + +// Registration is the service advertised by the Plugins. +service Registration { + rpc GetInfo(InfoRequest) returns (PluginInfo) {} + rpc NotifyRegistrationStatus(RegistrationStatus) returns (RegistrationStatusResponse) {} +} diff --git a/pkg/kubelet/apis/pluginregistration/v1alpha1/constants.go b/pkg/kubelet/apis/pluginregistration/v1alpha1/constants.go new file mode 100644 index 0000000000..cfc1b7c6d7 --- /dev/null +++ b/pkg/kubelet/apis/pluginregistration/v1alpha1/constants.go @@ -0,0 +1,22 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pluginregistration + +const ( + CSIPlugin = "CSIPlugin" + DevicePlugin = "DevicePlugin" +) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 421c0c98be..9194c1bbb9 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -93,6 +93,7 @@ import ( kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/kubelet/util/format" "k8s.io/kubernetes/pkg/kubelet/util/manager" + "k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher" "k8s.io/kubernetes/pkg/kubelet/util/queue" "k8s.io/kubernetes/pkg/kubelet/util/sliceutils" "k8s.io/kubernetes/pkg/kubelet/volumemanager" @@ -775,6 +776,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, if err != nil { return nil, err } + klet.pluginWatcher = pluginwatcher.NewWatcher(klet.getPluginsDir()) // If the experimentalMounterPathFlag is set, we do not want to // check node capabilities since the mount path is not the default @@ -1150,6 +1152,11 @@ type Kubelet struct { // This flag, if set, instructs the kubelet to keep volumes from terminated pods mounted to the node. // This can be useful for debugging volume related issues. keepTerminatedPodVolumes bool // DEPRECATED + + // pluginwatcher is a utility for Kubelet to register different types of node-level plugins + // such as device plugins or CSI plugins. It discovers plugins by monitoring inotify events under the + // directory returned by kubelet.getPluginsDir() + pluginWatcher pluginwatcher.Watcher } func allGlobalUnicastIPs() ([]net.IP, error) { @@ -1264,6 +1271,11 @@ func (kl *Kubelet) initializeModules() error { } } + // Start the plugin watcher + if err := kl.pluginWatcher.Start(); err != nil { + return fmt.Errorf("failed to start Plugin Watcher. err: %v", err) + } + // Start the image manager. kl.imageManager.Start() diff --git a/pkg/kubelet/util/BUILD b/pkg/kubelet/util/BUILD index df02ebdcd2..ff1755ebd1 100644 --- a/pkg/kubelet/util/BUILD +++ b/pkg/kubelet/util/BUILD @@ -93,9 +93,11 @@ filegroup( "//pkg/kubelet/util/format:all-srcs", "//pkg/kubelet/util/ioutils:all-srcs", "//pkg/kubelet/util/manager:all-srcs", + "//pkg/kubelet/util/pluginwatcher:all-srcs", "//pkg/kubelet/util/queue:all-srcs", "//pkg/kubelet/util/sliceutils:all-srcs", "//pkg/kubelet/util/store:all-srcs", ], tags = ["automanaged"], + visibility = ["//visibility:public"], ) diff --git a/pkg/kubelet/util/pluginwatcher/BUILD b/pkg/kubelet/util/pluginwatcher/BUILD new file mode 100644 index 0000000000..b4173ab5e1 --- /dev/null +++ b/pkg/kubelet/util/pluginwatcher/BUILD @@ -0,0 +1,58 @@ +package(default_visibility = ["//visibility:public"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", + "go_test", +) + +go_library( + name = "go_default_library", + srcs = [ + "example_plugin.go", + "plugin_watcher.go", + ], + importpath = "k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher", + deps = [ + "//pkg/kubelet/apis/pluginregistration/v1alpha1:go_default_library", + "//pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta1:go_default_library", + "//pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta2:go_default_library", + "//pkg/util/filesystem:go_default_library", + "//vendor/github.com/fsnotify/fsnotify:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/golang.org/x/net/context:go_default_library", + "//vendor/google.golang.org/grpc:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [ + ":package-srcs", + "//pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta1:all-srcs", + "//pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta2:all-srcs", + ], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) + +go_test( + name = "go_default_test", + srcs = ["plugin_watcher_test.go"], + embed = [":go_default_library"], + deps = [ + "//pkg/kubelet/apis/pluginregistration/v1alpha1:go_default_library", + "//pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta1:go_default_library", + "//pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta2:go_default_library", + "//vendor/github.com/stretchr/testify/require:go_default_library", + "//vendor/golang.org/x/net/context:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", + ], +) diff --git a/pkg/kubelet/util/pluginwatcher/README b/pkg/kubelet/util/pluginwatcher/README new file mode 100644 index 0000000000..9654b2cf62 --- /dev/null +++ b/pkg/kubelet/util/pluginwatcher/README @@ -0,0 +1,29 @@ +This folder contains a utility, pluginwatcher, for Kubelet to register +different types of node-level plugins such as device plugins or CSI plugins. +It discovers plugins by monitoring inotify events under the directory returned by +kubelet.getPluginsDir(). Lets refer this directory as PluginsSockDir. +For any discovered plugin, pluginwatcher issues Registration.GetInfo grpc call +to get plugin type, name and supported service API versions. For any registered plugin type, +pluginwatcher calls the registered callback function with the received plugin +name, supported service API versions, and the full socket path. The Kubelet +component that receives this callback can acknowledge or reject the plugin +according to its own logic, and use the socket path to establish its service +communication with any API version supported by the plugin. + +Here are the general rules that Kubelet plugin developers should follow: +- Run as 'root' user. Currently creating socket under PluginsSockDir, a root owned directory, requires + plugin process to be running as 'root'. +- Implements the Registration service specified in + pkg/kubelet/apis/pluginregistration/v*/api.proto. +- The plugin name sent during Registration.GetInfo grpc should be unique + for the given plugin type (CSIPlugin or DevicePlugin). +- The socket path needs to be unique and doesn't conflict with the path chosen + by any other potential plugins. Currently we only support flat fs namespace + under PluginsSockDir but will soon support recursive inotify watch for + hierarchical socket paths. +- A plugin should clean up its own socket upon exiting or when a new instance + comes up. A plugin should NOT remove any sockets belonging to other plugins. +- A plugin should make sure it has service ready for any supported service API + version listed in the PluginInfo. +- For an example plugin implementation, take a look at example_plugin.go + included in this directory. diff --git a/pkg/kubelet/util/pluginwatcher/example_plugin.go b/pkg/kubelet/util/pluginwatcher/example_plugin.go new file mode 100644 index 0000000000..fbca43acad --- /dev/null +++ b/pkg/kubelet/util/pluginwatcher/example_plugin.go @@ -0,0 +1,150 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pluginwatcher + +import ( + "fmt" + "net" + "sync" + "time" + + "github.com/golang/glog" + "golang.org/x/net/context" + "google.golang.org/grpc" + + registerapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1alpha1" + v1beta1 "k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta1" + v1beta2 "k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta2" +) + +const ( + PluginName = "example-plugin" + PluginType = "example-plugin-type" +) + +// examplePlugin is a sample plugin to work with plugin watcher +type examplePlugin struct { + grpcServer *grpc.Server + wg sync.WaitGroup + registrationStatus chan registerapi.RegistrationStatus // for testing + endpoint string // for testing +} + +type pluginServiceV1Beta1 struct { + server *examplePlugin +} + +func (s *pluginServiceV1Beta1) GetExampleInfo(ctx context.Context, rqt *v1beta1.ExampleRequest) (*v1beta1.ExampleResponse, error) { + glog.Infof("GetExampleInfo v1beta1field: %s", rqt.V1Beta1Field) + return &v1beta1.ExampleResponse{}, nil +} + +func (s *pluginServiceV1Beta1) RegisterService() { + v1beta1.RegisterExampleServer(s.server.grpcServer, s) +} + +type pluginServiceV1Beta2 struct { + server *examplePlugin +} + +func (s *pluginServiceV1Beta2) GetExampleInfo(ctx context.Context, rqt *v1beta2.ExampleRequest) (*v1beta2.ExampleResponse, error) { + glog.Infof("GetExampleInfo v1beta2_field: %s", rqt.V1Beta2Field) + return &v1beta2.ExampleResponse{}, nil +} + +func (s *pluginServiceV1Beta2) RegisterService() { + v1beta2.RegisterExampleServer(s.server.grpcServer, s) +} + +// NewExamplePlugin returns an initialized examplePlugin instance +func NewExamplePlugin() *examplePlugin { + return &examplePlugin{} +} + +// NewTestExamplePlugin returns an initialized examplePlugin instance for testing +func NewTestExamplePlugin(endpoint string) *examplePlugin { + return &examplePlugin{ + registrationStatus: make(chan registerapi.RegistrationStatus), + endpoint: endpoint, + } +} + +// GetInfo is the RPC invoked by plugin watcher +func (e *examplePlugin) GetInfo(ctx context.Context, req *registerapi.InfoRequest) (*registerapi.PluginInfo, error) { + return ®isterapi.PluginInfo{ + Type: PluginType, + Name: PluginName, + Endpoint: e.endpoint, + SupportedVersions: []string{"v1beta1", "v1beta2"}, + }, nil +} + +func (e *examplePlugin) NotifyRegistrationStatus(ctx context.Context, status *registerapi.RegistrationStatus) (*registerapi.RegistrationStatusResponse, error) { + if e.registrationStatus != nil { + e.registrationStatus <- *status + } + if !status.PluginRegistered { + glog.Errorf("Registration failed: %s\n", status.Error) + } + return ®isterapi.RegistrationStatusResponse{}, nil +} + +// Serve starts example plugin grpc server +func (e *examplePlugin) Serve(socketPath string) error { + glog.Infof("starting example server at: %s\n", socketPath) + lis, err := net.Listen("unix", socketPath) + if err != nil { + return err + } + glog.Infof("example server started at: %s\n", socketPath) + e.grpcServer = grpc.NewServer() + // Registers kubelet plugin watcher api. + registerapi.RegisterRegistrationServer(e.grpcServer, e) + // Registers services for both v1beta1 and v1beta2 versions. + v1beta1 := &pluginServiceV1Beta1{server: e} + v1beta1.RegisterService() + v1beta2 := &pluginServiceV1Beta2{server: e} + v1beta2.RegisterService() + + // Starts service + e.wg.Add(1) + go func() { + defer e.wg.Done() + // Blocking call to accept incoming connections. + if err := e.grpcServer.Serve(lis); err != nil { + glog.Errorf("example server stopped serving: %v", err) + } + }() + return nil +} + +func (e *examplePlugin) Stop() error { + glog.Infof("Stopping example server\n") + e.grpcServer.Stop() + c := make(chan struct{}) + go func() { + defer close(c) + e.wg.Wait() + }() + select { + case <-c: + return nil + case <-time.After(time.Second): + glog.Errorf("Timed out on waiting for stop completion") + return fmt.Errorf("Timed out on waiting for stop completion") + } +} diff --git a/pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta1/BUILD b/pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta1/BUILD new file mode 100644 index 0000000000..affbd0aee4 --- /dev/null +++ b/pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta1/BUILD @@ -0,0 +1,34 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +filegroup( + name = "go_default_library_protos", + srcs = ["api.proto"], + visibility = ["//visibility:public"], +) + +go_library( + name = "go_default_library", + srcs = ["api.pb.go"], + importpath = "k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta1", + visibility = ["//visibility:public"], + deps = [ + "//vendor/github.com/gogo/protobuf/gogoproto:go_default_library", + "//vendor/github.com/gogo/protobuf/proto:go_default_library", + "//vendor/golang.org/x/net/context:go_default_library", + "//vendor/google.golang.org/grpc:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta1/api.proto b/pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta1/api.proto new file mode 100644 index 0000000000..14aa7df2c4 --- /dev/null +++ b/pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta1/api.proto @@ -0,0 +1,28 @@ +syntax = 'proto3'; + +package v1beta1; + +import "github.com/gogo/protobuf/gogoproto/gogo.proto"; + +option (gogoproto.goproto_stringer_all) = false; +option (gogoproto.stringer_all) = true; +option (gogoproto.goproto_getters_all) = true; +option (gogoproto.marshaler_all) = true; +option (gogoproto.sizer_all) = true; +option (gogoproto.unmarshaler_all) = true; +option (gogoproto.goproto_unrecognized_all) = false; + +message ExampleRequest { + string request = 1; + string v1beta1_field = 2; +} + +message ExampleResponse { + string error = 1; +} + +// Example is a simple example service for general reference on the recommended +// kubelet plugin model and plugin watcher testing. +service Example { + rpc GetExampleInfo(ExampleRequest) returns (ExampleResponse) {} +} diff --git a/pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta2/BUILD b/pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta2/BUILD new file mode 100644 index 0000000000..f2b53898d3 --- /dev/null +++ b/pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta2/BUILD @@ -0,0 +1,34 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +filegroup( + name = "go_default_library_protos", + srcs = ["api.proto"], + visibility = ["//visibility:public"], +) + +go_library( + name = "go_default_library", + srcs = ["api.pb.go"], + importpath = "k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta2", + visibility = ["//visibility:public"], + deps = [ + "//vendor/github.com/gogo/protobuf/gogoproto:go_default_library", + "//vendor/github.com/gogo/protobuf/proto:go_default_library", + "//vendor/golang.org/x/net/context:go_default_library", + "//vendor/google.golang.org/grpc:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta2/api.proto b/pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta2/api.proto new file mode 100644 index 0000000000..e34697f3a6 --- /dev/null +++ b/pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta2/api.proto @@ -0,0 +1,29 @@ +syntax = 'proto3'; + +package v1beta2; + +import "github.com/gogo/protobuf/gogoproto/gogo.proto"; + +option (gogoproto.goproto_stringer_all) = false; +option (gogoproto.stringer_all) = true; +option (gogoproto.goproto_getters_all) = true; +option (gogoproto.marshaler_all) = true; +option (gogoproto.sizer_all) = true; +option (gogoproto.unmarshaler_all) = true; +option (gogoproto.goproto_unrecognized_all) = false; + +// Renames a field from v1beta1 ExampleRequest. +message ExampleRequest { + string request = 1; + string v1beta2_field = 2; +} + +message ExampleResponse { + string error = 1; +} + +// Example is a simple example service for general reference on the recommended +// kubelet plugin model and plugin watcher testing. +service Example { + rpc GetExampleInfo(ExampleRequest) returns (ExampleResponse) {} +} diff --git a/pkg/kubelet/util/pluginwatcher/plugin_watcher.go b/pkg/kubelet/util/pluginwatcher/plugin_watcher.go new file mode 100644 index 0000000000..9a5241cb2e --- /dev/null +++ b/pkg/kubelet/util/pluginwatcher/plugin_watcher.go @@ -0,0 +1,260 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pluginwatcher + +import ( + "fmt" + "net" + "os" + "path" + "path/filepath" + "sync" + "time" + + "github.com/fsnotify/fsnotify" + "github.com/golang/glog" + "golang.org/x/net/context" + "google.golang.org/grpc" + registerapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1alpha1" + utilfs "k8s.io/kubernetes/pkg/util/filesystem" +) + +// RegisterCallbackFn is the type of the callback function that handlers will provide +type RegisterCallbackFn func(pluginName string, endpoint string, versions []string, socketPath string) (error, chan bool) + +// Watcher is the plugin watcher +type Watcher struct { + path string + handlers map[string]RegisterCallbackFn + stopCh chan interface{} + fs utilfs.Filesystem + watcher *fsnotify.Watcher + wg sync.WaitGroup + mutex sync.Mutex +} + +// NewWatcher provides a new watcher +func NewWatcher(sockDir string) Watcher { + return Watcher{ + path: sockDir, + handlers: make(map[string]RegisterCallbackFn), + fs: &utilfs.DefaultFs{}, + } +} + +// AddHandler registers a callback to be invoked for a particular type of plugin +func (w *Watcher) AddHandler(handlerType string, handlerCbkFn RegisterCallbackFn) { + w.mutex.Lock() + defer w.mutex.Unlock() + w.handlers[handlerType] = handlerCbkFn +} + +// Creates the plugin directory, if it doesn't already exist. +func (w *Watcher) createPluginDir() error { + glog.V(4).Infof("Ensuring Plugin directory at %s ", w.path) + if err := w.fs.MkdirAll(w.path, 0755); err != nil { + return fmt.Errorf("error (re-)creating driver directory: %s", err) + } + return nil +} + +// Walks through the plugin directory to discover any existing plugin sockets. +func (w *Watcher) traversePluginDir() error { + files, err := w.fs.ReadDir(w.path) + if err != nil { + return fmt.Errorf("error reading the plugin directory: %v", err) + } + for _, f := range files { + // Currently only supports flat fs namespace under the plugin directory. + // TODO: adds support for hierarchical fs namespace. + if !f.IsDir() && filepath.Base(f.Name())[0] != '.' { + go func(sockName string) { + w.watcher.Events <- fsnotify.Event{ + Name: sockName, + Op: fsnotify.Op(uint32(1)), + } + }(path.Join(w.path, f.Name())) + } + } + return nil +} + +func (w *Watcher) init() error { + if err := w.createPluginDir(); err != nil { + return err + } + return nil +} + +func (w *Watcher) registerPlugin(socketPath string) error { + //TODO: Implement rate limiting to mitigate any DOS kind of attacks. + glog.V(4).Infof("registerPlugin called for socketPath: %s", socketPath) + client, conn, err := dial(socketPath) + if err != nil { + return fmt.Errorf("dial failed at socket %s, err: %v", socketPath, err) + } + defer conn.Close() + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + infoResp, err := client.GetInfo(ctx, ®isterapi.InfoRequest{}) + if err != nil { + return fmt.Errorf("failed to get plugin info using RPC GetInfo at socket %s, err: %v", socketPath, err) + } + if err := w.invokeRegistrationCallbackAtHandler(ctx, client, infoResp, socketPath); err != nil { + return fmt.Errorf("failed to register plugin. Callback handler returned err: %v", err) + } + glog.V(4).Infof("Successfully registered plugin for plugin type: %s, name: %s, socket: %s", infoResp.Type, infoResp.Name, socketPath) + return nil +} + +func (w *Watcher) invokeRegistrationCallbackAtHandler(ctx context.Context, client registerapi.RegistrationClient, infoResp *registerapi.PluginInfo, socketPath string) error { + var handlerCbkFn RegisterCallbackFn + var ok bool + handlerCbkFn, ok = w.handlers[infoResp.Type] + if !ok { + if _, err := client.NotifyRegistrationStatus(ctx, ®isterapi.RegistrationStatus{ + PluginRegistered: false, + Error: fmt.Sprintf("No handler found registered for plugin type: %s, socket: %s", infoResp.Type, socketPath), + }); err != nil { + glog.Errorf("Failed to send registration status at socket %s, err: %v", socketPath, err) + } + return fmt.Errorf("no handler found registered for plugin type: %s, socket: %s", infoResp.Type, socketPath) + } + + var versions []string + for _, version := range infoResp.SupportedVersions { + versions = append(versions, version) + } + // calls handler callback to verify registration request + err, chanForAckOfNotification := handlerCbkFn(infoResp.Name, infoResp.Endpoint, versions, socketPath) + if err != nil { + if _, err := client.NotifyRegistrationStatus(ctx, ®isterapi.RegistrationStatus{ + PluginRegistered: false, + Error: fmt.Sprintf("Plugin registration failed with err: %v", err), + }); err != nil { + glog.Errorf("Failed to send registration status at socket %s, err: %v", socketPath, err) + } + chanForAckOfNotification <- false + return fmt.Errorf("plugin registration failed with err: %v", err) + } + + if _, err := client.NotifyRegistrationStatus(ctx, ®isterapi.RegistrationStatus{ + PluginRegistered: true, + }); err != nil { + return fmt.Errorf("failed to send registration status at socket %s, err: %v", socketPath, err) + } + chanForAckOfNotification <- true + return nil +} + +// Start watches for the creation of plugin sockets at the path +func (w *Watcher) Start() error { + glog.V(2).Infof("Plugin Watcher Start at %s", w.path) + w.stopCh = make(chan interface{}) + + // Creating the directory to be watched if it doesn't exist yet, + // and walks through the directory to discover the existing plugins. + if err := w.init(); err != nil { + return err + } + + watcher, err := fsnotify.NewWatcher() + if err != nil { + return fmt.Errorf("failed to start plugin watcher, err: %v", err) + } + + if err := watcher.Add(w.path); err != nil { + watcher.Close() + return fmt.Errorf("failed to start plugin watcher, err: %v", err) + } + + w.watcher = watcher + + if err := w.traversePluginDir(); err != nil { + watcher.Close() + return fmt.Errorf("failed to traverse plugin socket path, err: %v", err) + } + + w.wg.Add(1) + go func(watcher *fsnotify.Watcher) { + defer w.wg.Done() + for { + select { + case event := <-watcher.Events: + if event.Op&fsnotify.Create == fsnotify.Create { + go func(eventName string) { + err := w.registerPlugin(eventName) + if err != nil { + glog.Errorf("Plugin %s registration failed with error: %v", eventName, err) + } + }(event.Name) + } + continue + case err := <-watcher.Errors: + //TODO: Handle errors by taking corrective measures + if err != nil { + glog.Errorf("Watcher received error: %v", err) + } + continue + + case <-w.stopCh: + watcher.Close() + break + } + break + } + }(watcher) + return nil +} + +// Stop stops probing the creation of plugin sockets at the path +func (w *Watcher) Stop() error { + close(w.stopCh) + c := make(chan struct{}) + go func() { + defer close(c) + w.wg.Wait() + }() + select { + case <-c: + case <-time.After(10 * time.Second): + return fmt.Errorf("timeout on stopping watcher") + } + return nil +} + +// Cleanup cleans the path by removing sockets +func (w *Watcher) Cleanup() error { + return os.RemoveAll(w.path) +} + +// Dial establishes the gRPC communication with the picked up plugin socket. https://godoc.org/google.golang.org/grpc#Dial +func dial(unixSocketPath string) (registerapi.RegistrationClient, *grpc.ClientConn, error) { + c, err := grpc.Dial(unixSocketPath, grpc.WithInsecure(), grpc.WithBlock(), + grpc.WithTimeout(10*time.Second), + grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { + return net.DialTimeout("unix", addr, timeout) + }), + ) + + if err != nil { + return nil, nil, fmt.Errorf("failed to dial socket %s, err: %v", unixSocketPath, err) + } + + return registerapi.NewRegistrationClient(c), c, nil +} diff --git a/pkg/kubelet/util/pluginwatcher/plugin_watcher_test.go b/pkg/kubelet/util/pluginwatcher/plugin_watcher_test.go new file mode 100644 index 0000000000..44bccf9a6f --- /dev/null +++ b/pkg/kubelet/util/pluginwatcher/plugin_watcher_test.go @@ -0,0 +1,220 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pluginwatcher + +import ( + "fmt" + "io/ioutil" + "strconv" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + "golang.org/x/net/context" + + "k8s.io/apimachinery/pkg/util/sets" + registerapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1alpha1" + v1beta1 "k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta1" + v1beta2 "k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta2" +) + +func TestExamplePlugin(t *testing.T) { + socketDir, err := ioutil.TempDir("", "plugin_test") + require.NoError(t, err) + socketPath := socketDir + "/plugin.sock" + w := NewWatcher(socketDir) + + testCases := []struct { + description string + expectedEndpoint string + returnErr error + }{ + { + description: "Successfully register plugin through inotify", + expectedEndpoint: "", + returnErr: nil, + }, + { + description: "Successfully register plugin through inotify and got expected optional endpoint", + expectedEndpoint: "dummyEndpoint", + returnErr: nil, + }, + { + description: "Fails registration because endpoint is expected to be non-empty", + expectedEndpoint: "dummyEndpoint", + returnErr: fmt.Errorf("empty endpoint received"), + }, + { + description: "Successfully register plugin through inotify after plugin restarts", + expectedEndpoint: "", + returnErr: nil, + }, + { + description: "Fails registration with conflicting plugin name", + expectedEndpoint: "", + returnErr: fmt.Errorf("conflicting plugin name"), + }, + { + description: "Successfully register plugin during initial traverse after plugin watcher restarts", + expectedEndpoint: "", + returnErr: nil, + }, + { + description: "Fails registration with conflicting plugin name during initial traverse after plugin watcher restarts", + expectedEndpoint: "", + returnErr: fmt.Errorf("conflicting plugin name"), + }, + } + + callbackCount := struct { + mutex sync.Mutex + count int32 + }{} + w.AddHandler(PluginType, func(name string, endpoint string, versions []string, sockPath string) (error, chan bool) { + callbackCount.mutex.Lock() + localCount := callbackCount.count + callbackCount.count = callbackCount.count + 1 + callbackCount.mutex.Unlock() + + require.True(t, localCount <= int32((len(testCases)-1))) + require.Equal(t, PluginName, name, "Plugin name mismatched!!") + retError := testCases[localCount].returnErr + if retError == nil || retError.Error() != "empty endpoint received" { + require.Equal(t, testCases[localCount].expectedEndpoint, endpoint, "Unexpected endpoint") + } else { + require.NotEqual(t, testCases[localCount].expectedEndpoint, endpoint, "Unexpected endpoint") + } + + require.Equal(t, []string{"v1beta1", "v1beta2"}, versions, "Plugin version mismatched!!") + // Verifies the grpcServer is ready to serve services. + _, conn, err := dial(sockPath) + require.Nil(t, err) + defer conn.Close() + + // The plugin handler should be able to use any listed service API version. + v1beta1Client := v1beta1.NewExampleClient(conn) + v1beta2Client := v1beta2.NewExampleClient(conn) + + // Tests v1beta1 GetExampleInfo + _, err = v1beta1Client.GetExampleInfo(context.Background(), &v1beta1.ExampleRequest{}) + require.Nil(t, err) + + // Tests v1beta1 GetExampleInfo + _, err = v1beta2Client.GetExampleInfo(context.Background(), &v1beta2.ExampleRequest{}) + //atomic.AddInt32(&callbackCount, 1) + chanForAckOfNotification := make(chan bool) + + go func() { + select { + case <-chanForAckOfNotification: + close(chanForAckOfNotification) + case <-time.After(time.Second): + t.Fatalf("Timed out while waiting for notification ack") + } + }() + return retError, chanForAckOfNotification + }) + require.NoError(t, w.Start()) + + p := NewTestExamplePlugin("") + require.NoError(t, p.Serve(socketPath)) + require.True(t, waitForPluginRegistrationStatus(t, p.registrationStatus)) + + require.NoError(t, p.Stop()) + + p = NewTestExamplePlugin("dummyEndpoint") + require.NoError(t, p.Serve(socketPath)) + require.True(t, waitForPluginRegistrationStatus(t, p.registrationStatus)) + + require.NoError(t, p.Stop()) + + p = NewTestExamplePlugin("") + require.NoError(t, p.Serve(socketPath)) + require.False(t, waitForPluginRegistrationStatus(t, p.registrationStatus)) + + // Trying to start a plugin service at the same socket path should fail + // with "bind: address already in use" + require.NotNil(t, p.Serve(socketPath)) + + // grpcServer.Stop() will remove the socket and starting plugin service + // at the same path again should succeeds and trigger another callback. + require.NoError(t, p.Stop()) + p = NewTestExamplePlugin("") + go func() { + require.Nil(t, p.Serve(socketPath)) + }() + require.True(t, waitForPluginRegistrationStatus(t, p.registrationStatus)) + + // Starting another plugin with the same name got verification error. + p2 := NewTestExamplePlugin("") + socketPath2 := socketDir + "/plugin2.sock" + go func() { + require.NoError(t, p2.Serve(socketPath2)) + }() + require.False(t, waitForPluginRegistrationStatus(t, p2.registrationStatus)) + + // Restarts plugin watcher should traverse the socket directory and issues a + // callback for every existing socket. + require.NoError(t, w.Stop()) + errCh := make(chan error) + go func() { + errCh <- w.Start() + }() + + var wg sync.WaitGroup + wg.Add(2) + var pStatus string + var p2Status string + go func() { + pStatus = strconv.FormatBool(waitForPluginRegistrationStatus(t, p.registrationStatus)) + wg.Done() + }() + go func() { + p2Status = strconv.FormatBool(waitForPluginRegistrationStatus(t, p2.registrationStatus)) + wg.Done() + }() + wg.Wait() + expectedSet := sets.NewString() + expectedSet.Insert("true", "false") + actualSet := sets.NewString() + actualSet.Insert(pStatus, p2Status) + + require.Equal(t, expectedSet, actualSet) + + select { + case err = <-errCh: + require.NoError(t, err) + case <-time.After(time.Second): + t.Fatalf("Timed out while waiting for watcher start") + + } + + require.NoError(t, w.Stop()) + err = w.Cleanup() + require.NoError(t, err) +} + +func waitForPluginRegistrationStatus(t *testing.T, statusCh chan registerapi.RegistrationStatus) bool { + select { + case status := <-statusCh: + return status.PluginRegistered + case <-time.After(10 * time.Second): + t.Fatalf("Timed out while waiting for registration status") + } + return false +}