Adding CSI driver registration

pull/8/head
Serguei Bezverkhi 2018-06-04 16:47:24 -04:00
parent 0757e05e78
commit 1c05ca5575
6 changed files with 70 additions and 2 deletions

View File

@ -382,7 +382,9 @@ pkg/volume/azure_dd
pkg/volume/azure_file
pkg/volume/cephfs
pkg/volume/configmap
pkg/volume/csi
pkg/volume/csi/fake
pkg/volume/csi/labelmanager
pkg/volume/empty_dir
pkg/volume/fc
pkg/volume/flexvolume

View File

@ -102,6 +102,7 @@ go_library(
"//pkg/util/removeall:go_default_library",
"//pkg/version:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/csi:go_default_library",
"//pkg/volume/util:go_default_library",
"//pkg/volume/util/types:go_default_library",
"//pkg/volume/util/volumepathhandler:go_default_library",

View File

@ -106,6 +106,7 @@ import (
nodeutil "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/util/oom"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/csi"
utilexec "k8s.io/utils/exec"
)
@ -1290,6 +1291,9 @@ func (kl *Kubelet) initializeModules() error {
}
}
if kl.enablePluginsWatcher {
// Adding Registration Callback function for CSI Driver
kl.pluginWatcher.AddHandler("CSIPlugin", csi.RegistrationCallback)
// Start the plugin watcher
if err := kl.pluginWatcher.Start(); err != nil {
return fmt.Errorf("failed to start Plugin Watcher. err: %v", err)

View File

@ -12,9 +12,11 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/volume/csi",
visibility = ["//visibility:public"],
deps = [
"//pkg/features:go_default_library",
"//pkg/util/mount:go_default_library",
"//pkg/util/strings:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/csi/labelmanager:go_default_library",
"//pkg/volume/util:go_default_library",
"//vendor/github.com/container-storage-interface/spec/lib/go/csi/v0:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
@ -25,6 +27,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
],
)
@ -70,6 +73,7 @@ filegroup(
srcs = [
":package-srcs",
"//pkg/volume/csi/fake:all-srcs",
"//pkg/volume/csi/labelmanager:all-srcs",
],
tags = ["automanaged"],
visibility = ["//visibility:public"],

View File

@ -27,6 +27,8 @@ import (
"github.com/golang/glog"
"google.golang.org/grpc"
api "k8s.io/api/core/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
)
type csiClient interface {
@ -255,9 +257,16 @@ func newGrpcConn(driverName string) (*grpc.ClientConn, error) {
if driverName == "" {
return nil, fmt.Errorf("driver name is empty")
}
network := "unix"
addr := fmt.Sprintf(csiAddrTemplate, driverName)
// TODO once KubeletPluginsWatcher graduates to beta, remove FeatureGate check
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPluginsWatcher) {
driver, ok := csiDrivers.driversMap[driverName]
if !ok {
return nil, fmt.Errorf("driver name %s not found in the list of registered CSI drivers", driverName)
}
addr = driver.driverEndpoint
}
network := "unix"
glog.V(4).Infof(log("creating new gRPC connection for [%s://%s]", network, addr))
return grpc.Dial(

View File

@ -21,6 +21,8 @@ import (
"fmt"
"os"
"path"
"strings"
"sync"
"time"
"github.com/golang/glog"
@ -29,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/csi/labelmanager"
)
const (
@ -59,9 +62,54 @@ func ProbeVolumePlugins() []volume.VolumePlugin {
// volume.VolumePlugin methods
var _ volume.VolumePlugin = &csiPlugin{}
type csiDriver struct {
driverName string
driverEndpoint string
}
type csiDriversStore struct {
driversMap map[string]csiDriver
sync.RWMutex
}
// csiDrivers map keep track of all registered CSI drivers on the node and their
// corresponding sockets
var csiDrivers csiDriversStore
var lm labelmanager.Interface
// RegistrationCallback is called by kubelet's plugin watcher upon detection
// of a new registration socket opened by CSI Driver registrar side car.
func RegistrationCallback(pluginName string, endpoint string, versions []string, socketPath string) (error, chan bool) {
glog.Infof(log("Callback from kubelet with plugin name: %s endpoint: %s versions: %s socket path: %s",
pluginName, endpoint, strings.Join(versions, ","), socketPath))
if endpoint == "" {
endpoint = socketPath
}
// Calling nodeLabelManager to update label for newly registered CSI driver
err := lm.AddLabels(pluginName)
if err != nil {
return err, nil
}
// Storing endpoint of newly registered CSI driver into the map, where CSI driver name will be the key
// all other CSI components will be able to get the actual socket of CSI drivers by its name.
csiDrivers.Lock()
defer csiDrivers.Unlock()
csiDrivers.driversMap[pluginName] = csiDriver{driverName: pluginName, driverEndpoint: endpoint}
return nil, nil
}
func (p *csiPlugin) Init(host volume.VolumeHost) error {
glog.Info(log("plugin initializing..."))
p.host = host
// Initializing csiDrivers map and label management channels
csiDrivers = csiDriversStore{driversMap: map[string]csiDriver{}}
lm = labelmanager.NewLabelManager(host.GetNodeName(), host.GetKubeClient())
return nil
}