Updated the CSI pluginwatcher handler

pull/8/head
Renaud Gaubert 2018-08-12 00:21:25 +02:00
parent 29d225e90c
commit 78b55eb5bf
2 changed files with 31 additions and 14 deletions

View File

@ -1194,7 +1194,7 @@ type Kubelet struct {
// pluginwatcher is a utility for Kubelet to register different types of node-level plugins // pluginwatcher is a utility for Kubelet to register different types of node-level plugins
// such as device plugins or CSI plugins. It discovers plugins by monitoring inotify events under the // such as device plugins or CSI plugins. It discovers plugins by monitoring inotify events under the
// directory returned by kubelet.getPluginsDir() // directory returned by kubelet.getPluginsDir()
pluginWatcher pluginwatcher.Watcher pluginWatcher *pluginwatcher.Watcher
// This flag sets a maximum number of images to report in the node status. // This flag sets a maximum number of images to report in the node status.
nodeStatusMaxImages int32 nodeStatusMaxImages int32
@ -1365,7 +1365,7 @@ func (kl *Kubelet) initializeRuntimeDependentModules() {
kl.containerLogManager.Start() kl.containerLogManager.Start()
if kl.enablePluginsWatcher { if kl.enablePluginsWatcher {
// Adding Registration Callback function for CSI Driver // Adding Registration Callback function for CSI Driver
kl.pluginWatcher.AddHandler("CSIPlugin", csi.RegistrationCallback) kl.pluginWatcher.AddHandler("CSIPlugin", pluginwatcher.PluginHandler(csi.PluginHandler))
// Adding Registration Callback function for Device Manager // Adding Registration Callback function for Device Manager
kl.pluginWatcher.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandlerCallback()) kl.pluginWatcher.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandlerCallback())
// Start the plugin watcher // Start the plugin watcher

View File

@ -28,6 +28,7 @@ import (
"context" "context"
"github.com/golang/glog" "github.com/golang/glog"
api "k8s.io/api/core/v1" api "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors" apierrs "k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -89,6 +90,10 @@ type csiDriversStore struct {
sync.RWMutex sync.RWMutex
} }
// RegistrationHandler is the handler which is fed to the pluginwatcher API.
type RegistrationHandler struct {
}
// TODO (verult) consider using a struct instead of global variables // TODO (verult) consider using a struct instead of global variables
// csiDrivers map keep track of all registered CSI drivers on the node and their // csiDrivers map keep track of all registered CSI drivers on the node and their
// corresponding sockets // corresponding sockets
@ -96,21 +101,28 @@ var csiDrivers csiDriversStore
var nodeUpdater nodeupdater.Interface var nodeUpdater nodeupdater.Interface
// RegistrationCallback is called by kubelet's plugin watcher upon detection // PluginHandler is the plugin registration handler interface passed to the
// pluginwatcher module in kubelet
var PluginHandler = &RegistrationHandler{}
// ValidatePlugin is called by kubelet's plugin watcher upon detection
// of a new registration socket opened by CSI Driver registrar side car. // of a new registration socket opened by CSI Driver registrar side car.
func RegistrationCallback(pluginName string, endpoint string, versions []string, socketPath string) (chan bool, error) { func (h *RegistrationHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
glog.Infof(log("Trying to register a new plugin with name: %s endpoint: %s versions: %s",
pluginName, endpoint, strings.Join(versions, ",")))
glog.Infof(log("Callback from kubelet with plugin name: %s endpoint: %s versions: %s socket path: %s", return nil
pluginName, endpoint, strings.Join(versions, ","), socketPath)) }
if endpoint == "" { // RegisterPlugin is called when a plugin can be registered
endpoint = socketPath func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string) error {
} glog.Infof(log("Register new plugin with name: %s at endpoint: %s", pluginName, endpoint))
// Storing endpoint of newly registered CSI driver into the map, where CSI driver name will be the key // Storing endpoint of newly registered CSI driver into the map, where CSI driver name will be the key
// all other CSI components will be able to get the actual socket of CSI drivers by its name. // all other CSI components will be able to get the actual socket of CSI drivers by its name.
csiDrivers.Lock() csiDrivers.Lock()
defer csiDrivers.Unlock() defer csiDrivers.Unlock()
csiDrivers.driversMap[pluginName] = csiDriver{driverName: pluginName, driverEndpoint: endpoint} csiDrivers.driversMap[pluginName] = csiDriver{driverName: pluginName, driverEndpoint: endpoint}
// Get node info from the driver. // Get node info from the driver.
@ -118,22 +130,27 @@ func RegistrationCallback(pluginName string, endpoint string, versions []string,
// TODO (verult) retry with exponential backoff, possibly added in csi client library. // TODO (verult) retry with exponential backoff, possibly added in csi client library.
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel() defer cancel()
driverNodeID, maxVolumePerNode, _, err := csi.NodeGetInfo(ctx) driverNodeID, maxVolumePerNode, _, err := csi.NodeGetInfo(ctx)
if err != nil { if err != nil {
return nil, fmt.Errorf("error during CSI NodeGetInfo() call: %v", err) return fmt.Errorf("error during CSI NodeGetInfo() call: %v", err)
} }
// Calling nodeLabelManager to update annotations and labels for newly registered CSI driver // Calling nodeLabelManager to update annotations and labels for newly registered CSI driver
err = nodeUpdater.AddLabelsAndLimits(pluginName, driverNodeID, maxVolumePerNode) err = nodeUpdater.AddLabelsAndLimits(pluginName, driverNodeID, maxVolumePerNode)
if err != nil { if err != nil {
// Unregister the driver and return error // Unregister the driver and return error
csiDrivers.Lock()
defer csiDrivers.Unlock()
delete(csiDrivers.driversMap, pluginName) delete(csiDrivers.driversMap, pluginName)
return nil, err return fmt.Errorf("error while adding CSI labels: %v", err)
} }
return nil, nil return nil
}
// DeRegisterPlugin is called when a plugin removed it's socket, signaling
// it is no longer available
// TODO: Handle DeRegistration
func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) {
} }
func (p *csiPlugin) Init(host volume.VolumeHost) error { func (p *csiPlugin) Init(host volume.VolumeHost) error {