From 78b55eb5bf343d1e7b405ca4c4d0d1fc62c5f0e1 Mon Sep 17 00:00:00 2001 From: Renaud Gaubert Date: Sun, 12 Aug 2018 00:21:25 +0200 Subject: [PATCH] Updated the CSI pluginwatcher handler --- pkg/kubelet/kubelet.go | 4 ++-- pkg/volume/csi/csi_plugin.go | 41 +++++++++++++++++++++++++----------- 2 files changed, 31 insertions(+), 14 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 506f409f72..88f3de5c32 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1194,7 +1194,7 @@ type Kubelet struct { // pluginwatcher is a utility for Kubelet to register different types of node-level plugins // such as device plugins or CSI plugins. It discovers plugins by monitoring inotify events under the // directory returned by kubelet.getPluginsDir() - pluginWatcher pluginwatcher.Watcher + pluginWatcher *pluginwatcher.Watcher // This flag sets a maximum number of images to report in the node status. nodeStatusMaxImages int32 @@ -1365,7 +1365,7 @@ func (kl *Kubelet) initializeRuntimeDependentModules() { kl.containerLogManager.Start() if kl.enablePluginsWatcher { // Adding Registration Callback function for CSI Driver - kl.pluginWatcher.AddHandler("CSIPlugin", csi.RegistrationCallback) + kl.pluginWatcher.AddHandler("CSIPlugin", pluginwatcher.PluginHandler(csi.PluginHandler)) // Adding Registration Callback function for Device Manager kl.pluginWatcher.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandlerCallback()) // Start the plugin watcher diff --git a/pkg/volume/csi/csi_plugin.go b/pkg/volume/csi/csi_plugin.go index 258362192d..8eefb4e69d 100644 --- a/pkg/volume/csi/csi_plugin.go +++ b/pkg/volume/csi/csi_plugin.go @@ -28,6 +28,7 @@ import ( "context" "github.com/golang/glog" + api "k8s.io/api/core/v1" apierrs "k8s.io/apimachinery/pkg/api/errors" meta "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -89,6 +90,10 @@ type csiDriversStore struct { sync.RWMutex } +// RegistrationHandler is the handler which is fed to the pluginwatcher API. +type RegistrationHandler struct { +} + // TODO (verult) consider using a struct instead of global variables // csiDrivers map keep track of all registered CSI drivers on the node and their // corresponding sockets @@ -96,21 +101,28 @@ var csiDrivers csiDriversStore var nodeUpdater nodeupdater.Interface -// RegistrationCallback is called by kubelet's plugin watcher upon detection +// PluginHandler is the plugin registration handler interface passed to the +// pluginwatcher module in kubelet +var PluginHandler = &RegistrationHandler{} + +// ValidatePlugin is called by kubelet's plugin watcher upon detection // of a new registration socket opened by CSI Driver registrar side car. -func RegistrationCallback(pluginName string, endpoint string, versions []string, socketPath string) (chan bool, error) { +func (h *RegistrationHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error { + glog.Infof(log("Trying to register a new plugin with name: %s endpoint: %s versions: %s", + pluginName, endpoint, strings.Join(versions, ","))) - glog.Infof(log("Callback from kubelet with plugin name: %s endpoint: %s versions: %s socket path: %s", - pluginName, endpoint, strings.Join(versions, ","), socketPath)) + return nil +} - if endpoint == "" { - endpoint = socketPath - } +// RegisterPlugin is called when a plugin can be registered +func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string) error { + glog.Infof(log("Register new plugin with name: %s at endpoint: %s", pluginName, endpoint)) // Storing endpoint of newly registered CSI driver into the map, where CSI driver name will be the key // all other CSI components will be able to get the actual socket of CSI drivers by its name. csiDrivers.Lock() defer csiDrivers.Unlock() + csiDrivers.driversMap[pluginName] = csiDriver{driverName: pluginName, driverEndpoint: endpoint} // Get node info from the driver. @@ -118,22 +130,27 @@ func RegistrationCallback(pluginName string, endpoint string, versions []string, // TODO (verult) retry with exponential backoff, possibly added in csi client library. ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) defer cancel() + driverNodeID, maxVolumePerNode, _, err := csi.NodeGetInfo(ctx) if err != nil { - return nil, fmt.Errorf("error during CSI NodeGetInfo() call: %v", err) + return fmt.Errorf("error during CSI NodeGetInfo() call: %v", err) } // Calling nodeLabelManager to update annotations and labels for newly registered CSI driver err = nodeUpdater.AddLabelsAndLimits(pluginName, driverNodeID, maxVolumePerNode) if err != nil { // Unregister the driver and return error - csiDrivers.Lock() - defer csiDrivers.Unlock() delete(csiDrivers.driversMap, pluginName) - return nil, err + return fmt.Errorf("error while adding CSI labels: %v", err) } - return nil, nil + return nil +} + +// DeRegisterPlugin is called when a plugin removed it's socket, signaling +// it is no longer available +// TODO: Handle DeRegistration +func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) { } func (p *csiPlugin) Init(host volume.VolumeHost) error {