Merge pull request #67684 from verult/top-csi-driver-registration

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions here: https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md.

CSI Node info registration in kubelet

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #67683

**Special notes for your reviewer**:
Feature issue: https://github.com/kubernetes/features/issues/557
Design doc: https://github.com/kubernetes/community/pull/2034

Missing pieces:
* CSI client retry and exponential backoff logic.
* CSINodeInfo object validation
* e2e test with all the CSI machinery.

An RBAC rule is also added to support external-provisioner topology updates.

**Release note**:

```release-note
Registers volume topology information reported by a node-level Container Storage Interface (CSI) driver. This enables Kubernetes support of CSI topology mechanisms.
```
pull/8/head
Kubernetes Submit Queue 2018-09-08 00:16:52 -07:00 committed by GitHub
commit f26556cc14
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 1643 additions and 300 deletions

View File

@ -407,7 +407,6 @@ pkg/volume/azure_file
pkg/volume/cephfs pkg/volume/cephfs
pkg/volume/configmap pkg/volume/configmap
pkg/volume/csi/fake pkg/volume/csi/fake
pkg/volume/csi/nodeupdater
pkg/volume/empty_dir pkg/volume/empty_dir
pkg/volume/fc pkg/volume/fc
pkg/volume/flexvolume pkg/volume/flexvolume

View File

@ -148,8 +148,11 @@ func NewAttachDetachController(
} }
// Install required CSI CRDs on API server // Install required CSI CRDs on API server
if utilfeature.DefaultFeatureGate.Enabled(features.CSICRDAutoInstall) { if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
adc.installCRDs() adc.installCSIDriverCRD()
}
if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
adc.installCSINodeInfoCRD()
} }
if err := adc.volumePluginMgr.InitPlugins(plugins, prober, adc); err != nil { if err := adc.volumePluginMgr.InitPlugins(plugins, prober, adc); err != nil {
@ -667,8 +670,7 @@ func (adc *attachDetachController) processVolumesInUse(
} }
} }
// installCRDs creates the specified CustomResourceDefinition for the CSIDrivers object. func (adc *attachDetachController) installCSIDriverCRD() error {
func (adc *attachDetachController) installCRDs() error {
crd := &apiextensionsv1beta1.CustomResourceDefinition{ crd := &apiextensionsv1beta1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: csiapiv1alpha1.CsiDriverResourcePlural + "." + csiapiv1alpha1.GroupName, Name: csiapiv1alpha1.CsiDriverResourcePlural + "." + csiapiv1alpha1.GroupName,
@ -697,7 +699,12 @@ func (adc *attachDetachController) installCRDs() error {
return err return err
} }
crd = &apiextensionsv1beta1.CustomResourceDefinition{ return nil
}
// installCRDs creates the specified CustomResourceDefinition for the CSIDrivers object.
func (adc *attachDetachController) installCSINodeInfoCRD() error {
crd := &apiextensionsv1beta1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: csiapiv1alpha1.CsiNodeInfoResourcePlural + "." + csiapiv1alpha1.GroupName, Name: csiapiv1alpha1.CsiNodeInfoResourcePlural + "." + csiapiv1alpha1.GroupName,
}, },
@ -711,7 +718,7 @@ func (adc *attachDetachController) installCRDs() error {
}, },
}, },
} }
res, err = adc.crdClient.ApiextensionsV1beta1().CustomResourceDefinitions().Create(crd) res, err := adc.crdClient.ApiextensionsV1beta1().CustomResourceDefinitions().Create(crd)
if err == nil { if err == nil {
glog.Infof("CSINodeInfo CRD created successfully: %#v", glog.Infof("CSINodeInfo CRD created successfully: %#v",

View File

@ -207,8 +207,13 @@ const (
// owner: @saad-ali // owner: @saad-ali
// alpha: v1.12 // alpha: v1.12
// Enable automatic installation of CRD for csi.storage.k8s.io API objects. // Enable all logic related to the CSIDriver API object in csi.storage.k8s.io
CSICRDAutoInstall utilfeature.Feature = "CSICRDAutoInstall" CSIDriverRegistry utilfeature.Feature = "CSIDriverRegistry"
// owner: @verult
// alpha: v1.12
// Enable all logic related to the CSINodeInfo API object in csi.storage.k8s.io
CSINodeInfo utilfeature.Feature = "CSINodeInfo"
// owner @MrHohn // owner @MrHohn
// beta: v1.10 // beta: v1.10
@ -434,7 +439,8 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS
MountContainers: {Default: false, PreRelease: utilfeature.Alpha}, MountContainers: {Default: false, PreRelease: utilfeature.Alpha},
VolumeScheduling: {Default: true, PreRelease: utilfeature.Beta}, VolumeScheduling: {Default: true, PreRelease: utilfeature.Beta},
CSIPersistentVolume: {Default: true, PreRelease: utilfeature.Beta}, CSIPersistentVolume: {Default: true, PreRelease: utilfeature.Beta},
CSICRDAutoInstall: {Default: false, PreRelease: utilfeature.Alpha}, CSIDriverRegistry: {Default: false, PreRelease: utilfeature.Alpha},
CSINodeInfo: {Default: false, PreRelease: utilfeature.Alpha},
CustomPodDNS: {Default: true, PreRelease: utilfeature.Beta}, CustomPodDNS: {Default: true, PreRelease: utilfeature.Beta},
BlockVolume: {Default: false, PreRelease: utilfeature.Alpha}, BlockVolume: {Default: false, PreRelease: utilfeature.Alpha},
StorageObjectInUseProtection: {Default: true, PreRelease: utilfeature.GA}, StorageObjectInUseProtection: {Default: true, PreRelease: utilfeature.GA},

View File

@ -16,7 +16,7 @@ go_library(
"//pkg/features:go_default_library", "//pkg/features:go_default_library",
"//pkg/util/strings:go_default_library", "//pkg/util/strings:go_default_library",
"//pkg/volume:go_default_library", "//pkg/volume:go_default_library",
"//pkg/volume/csi/nodeupdater:go_default_library", "//pkg/volume/csi/nodeinfomanager:go_default_library",
"//pkg/volume/util:go_default_library", "//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1beta1:go_default_library", "//staging/src/k8s.io/api/storage/v1beta1:go_default_library",
@ -85,7 +85,7 @@ filegroup(
srcs = [ srcs = [
":package-srcs", ":package-srcs",
"//pkg/volume/csi/fake:all-srcs", "//pkg/volume/csi/fake:all-srcs",
"//pkg/volume/csi/nodeupdater:all-srcs", "//pkg/volume/csi/nodeinfomanager:all-srcs",
], ],
tags = ["automanaged"], tags = ["automanaged"],
visibility = ["//visibility:public"], visibility = ["//visibility:public"],

View File

@ -41,7 +41,7 @@ import (
csilister "k8s.io/csi-api/pkg/client/listers/csi/v1alpha1" csilister "k8s.io/csi-api/pkg/client/listers/csi/v1alpha1"
"k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/csi/nodeupdater" "k8s.io/kubernetes/pkg/volume/csi/nodeinfomanager"
) )
const ( const (
@ -99,7 +99,7 @@ type RegistrationHandler struct {
// corresponding sockets // corresponding sockets
var csiDrivers csiDriversStore var csiDrivers csiDriversStore
var nodeUpdater nodeupdater.Interface var nim nodeinfomanager.Interface
// PluginHandler is the plugin registration handler interface passed to the // PluginHandler is the plugin registration handler interface passed to the
// pluginwatcher module in kubelet // pluginwatcher module in kubelet
@ -118,12 +118,17 @@ func (h *RegistrationHandler) ValidatePlugin(pluginName string, endpoint string,
func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string) error { func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string) error {
glog.Infof(log("Register new plugin with name: %s at endpoint: %s", pluginName, endpoint)) glog.Infof(log("Register new plugin with name: %s at endpoint: %s", pluginName, endpoint))
func() {
// Storing endpoint of newly registered CSI driver into the map, where CSI driver name will be the key // Storing endpoint of newly registered CSI driver into the map, where CSI driver name will be the key
// all other CSI components will be able to get the actual socket of CSI drivers by its name. // all other CSI components will be able to get the actual socket of CSI drivers by its name.
// It's not necessary to lock the entire RegistrationCallback() function because only the CSI
// client depends on this driver map, and the CSI client does not depend on node information
// updated in the rest of the function.
csiDrivers.Lock() csiDrivers.Lock()
defer csiDrivers.Unlock() defer csiDrivers.Unlock()
csiDrivers.driversMap[pluginName] = csiDriver{driverName: pluginName, driverEndpoint: endpoint} csiDrivers.driversMap[pluginName] = csiDriver{driverName: pluginName, driverEndpoint: endpoint}
}()
// Get node info from the driver. // Get node info from the driver.
csi := newCsiDriverClient(pluginName) csi := newCsiDriverClient(pluginName)
@ -131,17 +136,16 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string)
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel() defer cancel()
driverNodeID, maxVolumePerNode, _, err := csi.NodeGetInfo(ctx) driverNodeID, maxVolumePerNode, accessibleTopology, err := csi.NodeGetInfo(ctx)
if err != nil { if err != nil {
unregisterDriver(pluginName)
return fmt.Errorf("error during CSI NodeGetInfo() call: %v", err) return fmt.Errorf("error during CSI NodeGetInfo() call: %v", err)
} }
// Calling nodeLabelManager to update annotations and labels for newly registered CSI driver err = nim.AddNodeInfo(pluginName, driverNodeID, maxVolumePerNode, accessibleTopology)
err = nodeUpdater.AddLabelsAndLimits(pluginName, driverNodeID, maxVolumePerNode)
if err != nil { if err != nil {
// Unregister the driver and return error unregisterDriver(pluginName)
delete(csiDrivers.driversMap, pluginName) return fmt.Errorf("error updating CSI node info in the cluster: %v", err)
return fmt.Errorf("error while adding CSI labels: %v", err)
} }
return nil return nil
@ -154,12 +158,11 @@ func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) {
} }
func (p *csiPlugin) Init(host volume.VolumeHost) error { func (p *csiPlugin) Init(host volume.VolumeHost) error {
glog.Info(log("plugin initializing..."))
p.host = host p.host = host
// Initializing csiDrivers map and label management channels // Initializing csiDrivers map and label management channels
csiDrivers = csiDriversStore{driversMap: map[string]csiDriver{}} csiDrivers = csiDriversStore{driversMap: map[string]csiDriver{}}
nodeUpdater = nodeupdater.NewNodeUpdater(host.GetNodeName(), host.GetKubeClient()) nim = nodeinfomanager.NewNodeInfoManager(host.GetNodeName(), host.GetKubeClient(), host.GetCSIClient())
csiClient := host.GetCSIClient() csiClient := host.GetCSIClient()
if csiClient != nil { if csiClient != nil {
@ -554,3 +557,15 @@ func (p *csiPlugin) getPublishVolumeInfo(client clientset.Interface, handle, dri
} }
return attachment.Status.AttachmentMetadata, nil return attachment.Status.AttachmentMetadata, nil
} }
func unregisterDriver(driverName string) {
func() {
csiDrivers.Lock()
defer csiDrivers.Unlock()
delete(csiDrivers.driversMap, driverName)
}()
if err := nim.RemoveNodeInfo(driverName); err != nil {
glog.Errorf("Error unregistering CSI driver: %v", err)
}
}

View File

@ -0,0 +1,59 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["nodeinfomanager.go"],
importpath = "k8s.io/kubernetes/pkg/volume/csi/nodeinfomanager",
visibility = ["//visibility:public"],
deps = [
"//pkg/features:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/util/retry:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/apis/csi/v1alpha1:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library",
"//vendor/github.com/container-storage-interface/spec/lib/go/csi/v0:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)
go_test(
name = "go_default_test",
srcs = ["nodeinfomanager_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/apis/core/helper:go_default_library",
"//pkg/features:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/apis/csi/v1alpha1:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned/fake:go_default_library",
"//vendor/github.com/container-storage-interface/spec/lib/go/csi/v0:go_default_library",
],
)

View File

@ -0,0 +1,512 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package nodeinfomanager includes internal functions used to add/delete labels to
// kubernetes nodes for corresponding CSI drivers
package nodeinfomanager // import "k8s.io/kubernetes/pkg/volume/csi/nodeinfomanager"
import (
"encoding/json"
"fmt"
csipb "github.com/container-storage-interface/spec/lib/go/csi/v0"
"github.com/golang/glog"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/retry"
csiv1alpha1 "k8s.io/csi-api/pkg/apis/csi/v1alpha1"
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume/util"
)
const (
// Name of node annotation that contains JSON map of driver names to node
annotationKeyNodeID = "csi.volume.kubernetes.io/nodeid"
)
var nodeKind = v1.SchemeGroupVersion.WithKind("Node")
// nodeInfoManager contains necessary common dependencies to update node info on both
// the Node and CSINodeInfo objects.
type nodeInfoManager struct {
nodeName types.NodeName
k8s kubernetes.Interface
csiKubeClient csiclientset.Interface
}
// If no updates is needed, the function must return the same Node object as the input.
type nodeUpdateFunc func(*v1.Node) (newNode *v1.Node, updated bool, err error)
// Interface implements an interface for managing labels of a node
type Interface interface {
// Record in the cluster the given node information from the CSI driver with the given name.
// Concurrent calls to AddNodeInfo() is allowed, but they should not be intertwined with calls
// to other methods in this interface.
AddNodeInfo(driverName string, driverNodeID string, maxVolumeLimit int64, topology *csipb.Topology) error
// Remove in the cluster node information from the CSI driver with the given name.
// Concurrent calls to RemoveNodeInfo() is allowed, but they should not be intertwined with calls
// to other methods in this interface.
RemoveNodeInfo(driverName string) error
}
// NewNodeInfoManager initializes nodeInfoManager
func NewNodeInfoManager(
nodeName types.NodeName,
kubeClient kubernetes.Interface,
csiKubeClient csiclientset.Interface) Interface {
return &nodeInfoManager{
nodeName: nodeName,
k8s: kubeClient,
csiKubeClient: csiKubeClient,
}
}
// AddNodeInfo updates the node ID annotation in the Node object and CSIDrivers field in the
// CSINodeInfo object. If the CSINodeInfo object doesn't yet exist, it will be created.
// If multiple calls to AddNodeInfo() are made in parallel, some calls might receive Node or
// CSINodeInfo update conflicts, which causes the function to retry the corresponding update.
func (nim *nodeInfoManager) AddNodeInfo(driverName string, driverNodeID string, maxAttachLimit int64, topology *csipb.Topology) error {
if driverNodeID == "" {
return fmt.Errorf("error adding CSI driver node info: driverNodeID must not be empty")
}
nodeUpdateFuncs := []nodeUpdateFunc{
updateNodeIDInNode(driverName, driverNodeID),
updateMaxAttachLimit(driverName, maxAttachLimit),
}
if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
nodeUpdateFuncs = append(nodeUpdateFuncs, updateTopologyLabels(topology))
}
err := nim.updateNode(nodeUpdateFuncs...)
if err != nil {
return fmt.Errorf("error updating Node object with CSI driver node info: %v", err)
}
if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
err = nim.updateCSINodeInfo(driverName, driverNodeID, topology)
if err != nil {
return fmt.Errorf("error updating CSINodeInfo object with CSI driver node info: %v", err)
}
}
return nil
}
// RemoveNodeInfo removes the node ID annotation from the Node object and CSIDrivers field from the
// CSINodeInfo object. If the CSINOdeInfo object contains no CSIDrivers, it will be deleted.
// If multiple calls to RemoveNodeInfo() are made in parallel, some calls might receive Node or
// CSINodeInfo update conflicts, which causes the function to retry the corresponding update.
func (nim *nodeInfoManager) RemoveNodeInfo(driverName string) error {
err := nim.removeCSINodeInfo(driverName)
if err != nil {
return fmt.Errorf("error removing CSI driver node info from CSINodeInfo object %v", err)
}
err = nim.updateNode(
removeMaxAttachLimit(driverName),
removeNodeIDFromNode(driverName),
)
if err != nil {
return fmt.Errorf("error removing CSI driver node info from Node object %v", err)
}
return nil
}
// updateNode repeatedly attempts to update the corresponding node object
// which is modified by applying the given update functions sequentially.
// Because updateFuncs are applied sequentially, later updateFuncs should take into account
// the effects of previous updateFuncs to avoid potential conflicts. For example, if multiple
// functions update the same field, updates in the last function are persisted.
func (nim *nodeInfoManager) updateNode(updateFuncs ...nodeUpdateFunc) error {
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
// Retrieve the latest version of Node before attempting update, so that
// existing changes are not overwritten. RetryOnConflict uses
// exponential backoff to avoid exhausting the apiserver.
nodeClient := nim.k8s.CoreV1().Nodes()
node, err := nodeClient.Get(string(nim.nodeName), metav1.GetOptions{})
if err != nil {
return err // do not wrap error
}
needUpdate := false
for _, update := range updateFuncs {
newNode, updated, err := update(node)
if err != nil {
return err
}
node = newNode
needUpdate = needUpdate || updated
}
if needUpdate {
_, updateErr := nodeClient.Update(node)
return updateErr // do not wrap error
}
return nil
})
if retryErr != nil {
return fmt.Errorf("node update failed: %v", retryErr)
}
return nil
}
// Guarantees the map is non-nil if no error is returned.
func buildNodeIDMapFromAnnotation(node *v1.Node) (map[string]string, error) {
var previousAnnotationValue string
if node.ObjectMeta.Annotations != nil {
previousAnnotationValue =
node.ObjectMeta.Annotations[annotationKeyNodeID]
}
var existingDriverMap map[string]string
if previousAnnotationValue != "" {
// Parse previousAnnotationValue as JSON
if err := json.Unmarshal([]byte(previousAnnotationValue), &existingDriverMap); err != nil {
return nil, fmt.Errorf(
"failed to parse node's %q annotation value (%q) err=%v",
annotationKeyNodeID,
previousAnnotationValue,
err)
}
}
if existingDriverMap == nil {
return make(map[string]string), nil
}
return existingDriverMap, nil
}
// updateNodeIDInNode returns a function that updates a Node object with the given
// Node ID information.
func updateNodeIDInNode(
csiDriverName string,
csiDriverNodeID string) nodeUpdateFunc {
return func(node *v1.Node) (*v1.Node, bool, error) {
existingDriverMap, err := buildNodeIDMapFromAnnotation(node)
if err != nil {
return nil, false, err
}
if val, ok := existingDriverMap[csiDriverName]; ok {
if val == csiDriverNodeID {
// Value already exists in node annotation, nothing more to do
return node, false, nil
}
}
// Add/update annotation value
existingDriverMap[csiDriverName] = csiDriverNodeID
jsonObj, err := json.Marshal(existingDriverMap)
if err != nil {
return nil, false, fmt.Errorf(
"error while marshalling node ID map updated with driverName=%q, nodeID=%q: %v",
csiDriverName,
csiDriverNodeID,
err)
}
if node.ObjectMeta.Annotations == nil {
node.ObjectMeta.Annotations = make(map[string]string)
}
node.ObjectMeta.Annotations[annotationKeyNodeID] = string(jsonObj)
return node, true, nil
}
}
// removeNodeIDFromNode returns a function that removes node ID information matching the given
// driver name from a Node object.
func removeNodeIDFromNode(csiDriverName string) nodeUpdateFunc {
return func(node *v1.Node) (*v1.Node, bool, error) {
var previousAnnotationValue string
if node.ObjectMeta.Annotations != nil {
previousAnnotationValue =
node.ObjectMeta.Annotations[annotationKeyNodeID]
}
if previousAnnotationValue == "" {
return node, false, nil
}
// Parse previousAnnotationValue as JSON
existingDriverMap := map[string]string{}
if err := json.Unmarshal([]byte(previousAnnotationValue), &existingDriverMap); err != nil {
return nil, false, fmt.Errorf(
"failed to parse node's %q annotation value (%q) err=%v",
annotationKeyNodeID,
previousAnnotationValue,
err)
}
if _, ok := existingDriverMap[csiDriverName]; !ok {
// Value is already missing in node annotation, nothing more to do
return node, false, nil
}
// Delete annotation value
delete(existingDriverMap, csiDriverName)
if len(existingDriverMap) == 0 {
delete(node.ObjectMeta.Annotations, annotationKeyNodeID)
} else {
jsonObj, err := json.Marshal(existingDriverMap)
if err != nil {
return nil, false, fmt.Errorf(
"failed while trying to remove key %q from node %q annotation. Existing data: %v",
csiDriverName,
annotationKeyNodeID,
previousAnnotationValue)
}
node.ObjectMeta.Annotations[annotationKeyNodeID] = string(jsonObj)
}
return node, true, nil
}
}
// updateTopologyLabels returns a function that updates labels of a Node object with the given
// topology information.
func updateTopologyLabels(topology *csipb.Topology) nodeUpdateFunc {
return func(node *v1.Node) (*v1.Node, bool, error) {
if topology == nil || len(topology.Segments) == 0 {
return node, false, nil
}
for k, v := range topology.Segments {
if curVal, exists := node.Labels[k]; exists && curVal != v {
return nil, false, fmt.Errorf("detected topology value collision: driver reported %q:%q but existing label is %q:%q", k, v, k, curVal)
}
}
if node.Labels == nil {
node.Labels = make(map[string]string)
}
for k, v := range topology.Segments {
node.Labels[k] = v
}
return node, true, nil
}
}
func (nim *nodeInfoManager) updateCSINodeInfo(
driverName string,
driverNodeID string,
topology *csipb.Topology) error {
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
nodeInfo, err := nim.csiKubeClient.CsiV1alpha1().CSINodeInfos().Get(string(nim.nodeName), metav1.GetOptions{})
if nodeInfo == nil || errors.IsNotFound(err) {
return nim.createNodeInfoObject(driverName, driverNodeID, topology)
}
if err != nil {
return err // do not wrap error
}
return nim.updateNodeInfoObject(nodeInfo, driverName, driverNodeID, topology)
})
if retryErr != nil {
return fmt.Errorf("CSINodeInfo update failed: %v", retryErr)
}
return nil
}
func (nim *nodeInfoManager) createNodeInfoObject(
driverName string,
driverNodeID string,
topology *csipb.Topology) error {
var topologyKeys []string
if topology != nil {
for k := range topology.Segments {
topologyKeys = append(topologyKeys, k)
}
}
node, err := nim.k8s.CoreV1().Nodes().Get(string(nim.nodeName), metav1.GetOptions{})
if err != nil {
return err // do not wrap error
}
nodeInfo := &csiv1alpha1.CSINodeInfo{
ObjectMeta: metav1.ObjectMeta{
Name: string(nim.nodeName),
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: nodeKind.Version,
Kind: nodeKind.Kind,
Name: node.Name,
UID: node.UID,
},
},
},
CSIDrivers: []csiv1alpha1.CSIDriverInfo{
{
Driver: driverName,
NodeID: driverNodeID,
TopologyKeys: topologyKeys,
},
},
}
_, err = nim.csiKubeClient.CsiV1alpha1().CSINodeInfos().Create(nodeInfo)
return err // do not wrap error
}
func (nim *nodeInfoManager) updateNodeInfoObject(
nodeInfo *csiv1alpha1.CSINodeInfo,
driverName string,
driverNodeID string,
topology *csipb.Topology) error {
topologyKeys := make(sets.String)
if topology != nil {
for k := range topology.Segments {
topologyKeys.Insert(k)
}
}
// Clone driver list, omitting the driver that matches the given driverName,
// unless the driver is identical to information provided, in which case no update is necessary.
var newDriverInfos []csiv1alpha1.CSIDriverInfo
for _, driverInfo := range nodeInfo.CSIDrivers {
if driverInfo.Driver == driverName {
prevTopologyKeys := sets.NewString(driverInfo.TopologyKeys...)
if driverInfo.NodeID == driverNodeID && prevTopologyKeys.Equal(topologyKeys) {
// No update needed
return nil
}
} else {
// Omit driverInfo matching given driverName
newDriverInfos = append(newDriverInfos, driverInfo)
}
}
// Append new driver
driverInfo := csiv1alpha1.CSIDriverInfo{
Driver: driverName,
NodeID: driverNodeID,
TopologyKeys: topologyKeys.List(),
}
newDriverInfos = append(newDriverInfos, driverInfo)
nodeInfo.CSIDrivers = newDriverInfos
_, err := nim.csiKubeClient.CsiV1alpha1().CSINodeInfos().Update(nodeInfo)
return err // do not wrap error
}
func (nim *nodeInfoManager) removeCSINodeInfo(csiDriverName string) error {
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
nodeInfoClient := nim.csiKubeClient.CsiV1alpha1().CSINodeInfos()
nodeInfo, err := nodeInfoClient.Get(string(nim.nodeName), metav1.GetOptions{})
if nodeInfo == nil || errors.IsNotFound(err) {
// do nothing
return nil
}
if err != nil {
return err // do not wrap error
}
// Remove matching driver from driver list
var newDriverInfos []csiv1alpha1.CSIDriverInfo
for _, driverInfo := range nodeInfo.CSIDrivers {
if driverInfo.Driver != csiDriverName {
newDriverInfos = append(newDriverInfos, driverInfo)
}
}
if len(newDriverInfos) == len(nodeInfo.CSIDrivers) {
// No changes, don't update
return nil
}
if len(newDriverInfos) == 0 {
// No drivers left, delete CSINodeInfo object
return nodeInfoClient.Delete(string(nim.nodeName), &metav1.DeleteOptions{})
}
// TODO (verult) make sure CSINodeInfo has validation logic to prevent duplicate driver names
_, updateErr := nodeInfoClient.Update(nodeInfo)
return updateErr // do not wrap error
})
if retryErr != nil {
return fmt.Errorf("CSINodeInfo update failed: %v", retryErr)
}
return nil
}
func updateMaxAttachLimit(driverName string, maxLimit int64) nodeUpdateFunc {
return func(node *v1.Node) (*v1.Node, bool, error) {
if maxLimit <= 0 {
glog.V(4).Infof("skipping adding attach limit for %s", driverName)
return node, false, nil
}
if node.Status.Capacity == nil {
node.Status.Capacity = v1.ResourceList{}
}
if node.Status.Allocatable == nil {
node.Status.Allocatable = v1.ResourceList{}
}
limitKeyName := util.GetCSIAttachLimitKey(driverName)
node.Status.Capacity[v1.ResourceName(limitKeyName)] = *resource.NewQuantity(maxLimit, resource.DecimalSI)
node.Status.Allocatable[v1.ResourceName(limitKeyName)] = *resource.NewQuantity(maxLimit, resource.DecimalSI)
return node, true, nil
}
}
func removeMaxAttachLimit(driverName string) nodeUpdateFunc {
return func(node *v1.Node) (*v1.Node, bool, error) {
limitKey := v1.ResourceName(util.GetCSIAttachLimitKey(driverName))
capacityExists := false
if node.Status.Capacity != nil {
_, capacityExists = node.Status.Capacity[limitKey]
}
allocatableExists := false
if node.Status.Allocatable != nil {
_, allocatableExists = node.Status.Allocatable[limitKey]
}
if !capacityExists && !allocatableExists {
return node, false, nil
}
delete(node.Status.Capacity, limitKey)
if len(node.Status.Capacity) == 0 {
node.Status.Capacity = nil
}
delete(node.Status.Allocatable, limitKey)
if len(node.Status.Allocatable) == 0 {
node.Status.Allocatable = nil
}
return node, true, nil
}
}

View File

@ -0,0 +1,699 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodeinfomanager
import (
"encoding/json"
"fmt"
"github.com/container-storage-interface/spec/lib/go/csi/v0"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes/fake"
csiv1alpha1 "k8s.io/csi-api/pkg/apis/csi/v1alpha1"
csifake "k8s.io/csi-api/pkg/client/clientset/versioned/fake"
"k8s.io/kubernetes/pkg/apis/core/helper"
"k8s.io/kubernetes/pkg/features"
"testing"
)
type testcase struct {
name string
driverName string
existingNode *v1.Node
existingNodeInfo *csiv1alpha1.CSINodeInfo
inputNodeID string
inputTopology *csi.Topology
expectedNodeIDMap map[string]string
expectedTopologyMap map[string]sets.String
expectedLabels map[string]string
expectNoNodeInfo bool
expectFail bool
}
type nodeIDMap map[string]string
type topologyKeyMap map[string][]string
type labelMap map[string]string
// TestAddNodeInfo tests AddNodeInfo with various existing Node and/or CSINodeInfo objects.
// The node IDs in all test cases below are the same between the Node annotation and CSINodeInfo.
func TestAddNodeInfo(t *testing.T) {
testcases := []testcase{
{
name: "empty node",
driverName: "com.example.csi/driver1",
existingNode: generateNode(nil /* nodeIDs */, nil /* labels */),
inputNodeID: "com.example.csi/csi-node1",
inputTopology: &csi.Topology{
Segments: map[string]string{
"com.example.csi/zone": "zoneA",
},
},
expectedNodeIDMap: map[string]string{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
expectedTopologyMap: map[string]sets.String{
"com.example.csi/driver1": sets.NewString("com.example.csi/zone"),
},
expectedLabels: map[string]string{"com.example.csi/zone": "zoneA"},
},
{
name: "pre-existing node info from the same driver",
driverName: "com.example.csi/driver1",
existingNode: generateNode(
nodeIDMap{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
labelMap{
"com.example.csi/zone": "zoneA",
}),
existingNodeInfo: generateNodeInfo(
nodeIDMap{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
topologyKeyMap{
"com.example.csi/driver1": {"com.example.csi/zone"},
},
),
inputNodeID: "com.example.csi/csi-node1",
inputTopology: &csi.Topology{
Segments: map[string]string{
"com.example.csi/zone": "zoneA",
},
},
expectedNodeIDMap: map[string]string{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
expectedTopologyMap: map[string]sets.String{
"com.example.csi/driver1": sets.NewString("com.example.csi/zone"),
},
expectedLabels: map[string]string{
"com.example.csi/zone": "zoneA",
},
},
{
name: "pre-existing node info from the same driver, but without topology info",
driverName: "com.example.csi/driver1",
existingNode: generateNode(
nodeIDMap{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
nil /* labels */),
existingNodeInfo: generateNodeInfo(
nodeIDMap{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
nil, /* topologyKeys */
),
inputNodeID: "com.example.csi/csi-node1",
inputTopology: &csi.Topology{
Segments: map[string]string{
"com.example.csi/zone": "zoneA",
},
},
expectedNodeIDMap: map[string]string{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
expectedTopologyMap: map[string]sets.String{
"com.example.csi/driver1": sets.NewString("com.example.csi/zone"),
},
expectedLabels: map[string]string{
"com.example.csi/zone": "zoneA",
},
},
{
name: "pre-existing node info from different driver",
driverName: "com.example.csi/driver1",
existingNode: generateNode(
nodeIDMap{
"net.example.storage/other-driver": "net.example.storage/test-node",
},
labelMap{
"net.example.storage/rack": "rack1",
}),
existingNodeInfo: generateNodeInfo(
nodeIDMap{
"net.example.storage/other-driver": "net.example.storage/test-node",
},
topologyKeyMap{
"net.example.storage/other-driver": {"net.example.storage/rack"},
},
),
inputNodeID: "com.example.csi/csi-node1",
inputTopology: &csi.Topology{
Segments: map[string]string{
"com.example.csi/zone": "zoneA",
},
},
expectedNodeIDMap: map[string]string{
"com.example.csi/driver1": "com.example.csi/csi-node1",
"net.example.storage/other-driver": "net.example.storage/test-node",
},
expectedTopologyMap: map[string]sets.String{
"com.example.csi/driver1": sets.NewString("com.example.csi/zone"),
"net.example.storage/other-driver": sets.NewString("net.example.storage/rack"),
},
expectedLabels: map[string]string{
"com.example.csi/zone": "zoneA",
"net.example.storage/rack": "rack1",
},
},
{
name: "pre-existing node info from the same driver, but different node ID and topology values; labels should conflict",
driverName: "com.example.csi/driver1",
existingNode: generateNode(
nodeIDMap{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
labelMap{
"com.example.csi/zone": "zoneA",
}),
existingNodeInfo: generateNodeInfo(
nodeIDMap{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
topologyKeyMap{
"com.example.csi/driver1": {"com.example.csi/zone"},
},
),
inputNodeID: "com.example.csi/csi-node1",
inputTopology: &csi.Topology{
Segments: map[string]string{
"com.example.csi/zone": "other-zone",
},
},
expectFail: true,
},
{
name: "pre-existing node info from the same driver, but different node ID and topology keys; new labels should be added",
driverName: "com.example.csi/driver1",
existingNode: generateNode(
nodeIDMap{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
labelMap{
"com.example.csi/zone": "zoneA",
}),
existingNodeInfo: generateNodeInfo(
nodeIDMap{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
topologyKeyMap{
"com.example.csi/driver1": {"com.example.csi/zone"},
},
),
inputNodeID: "com.example.csi/other-node",
inputTopology: &csi.Topology{
Segments: map[string]string{
"com.example.csi/rack": "rack1",
},
},
expectedNodeIDMap: map[string]string{
"com.example.csi/driver1": "com.example.csi/other-node",
},
expectedTopologyMap: map[string]sets.String{
"com.example.csi/driver1": sets.NewString("com.example.csi/rack"),
},
expectedLabels: map[string]string{
"com.example.csi/zone": "zoneA",
"com.example.csi/rack": "rack1",
},
},
{
name: "nil topology, empty node",
driverName: "com.example.csi/driver1",
existingNode: generateNode(nil /* nodeIDs */, nil /* labels */),
inputNodeID: "com.example.csi/csi-node1",
inputTopology: nil,
expectedNodeIDMap: map[string]string{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
expectedTopologyMap: map[string]sets.String{
"com.example.csi/driver1": nil,
},
expectedLabels: nil,
},
{
name: "nil topology, pre-existing node info from the same driver",
driverName: "com.example.csi/driver1",
existingNode: generateNode(
nodeIDMap{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
labelMap{
"com.example.csi/zone": "zoneA",
}),
existingNodeInfo: generateNodeInfo(
nodeIDMap{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
topologyKeyMap{
"com.example.csi/driver1": {"com.example.csi/zone"},
},
),
inputNodeID: "com.example.csi/csi-node1",
inputTopology: nil,
expectedNodeIDMap: map[string]string{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
expectedTopologyMap: map[string]sets.String{
"com.example.csi/driver1": nil,
},
expectedLabels: map[string]string{
"com.example.csi/zone": "zoneA", // old labels are not removed
},
},
{
name: "nil topology, pre-existing node info from different driver",
driverName: "com.example.csi/driver1",
existingNode: generateNode(
nodeIDMap{
"net.example.storage/other-driver": "net.example.storage/test-node",
},
labelMap{
"net.example.storage/rack": "rack1",
}),
existingNodeInfo: generateNodeInfo(
nodeIDMap{
"net.example.storage/other-driver": "net.example.storage/test-node",
},
topologyKeyMap{
"net.example.storage/other-driver": {"net.example.storage/rack"},
},
),
inputNodeID: "com.example.csi/csi-node1",
inputTopology: nil,
expectedNodeIDMap: map[string]string{
"com.example.csi/driver1": "com.example.csi/csi-node1",
"net.example.storage/other-driver": "net.example.storage/test-node",
},
expectedTopologyMap: map[string]sets.String{
"net.example.storage/other-driver": sets.NewString("net.example.storage/rack"),
"com.example.csi/driver1": nil,
},
expectedLabels: map[string]string{
"net.example.storage/rack": "rack1",
},
},
{
name: "empty node ID",
driverName: "com.example.csi/driver1",
existingNode: generateNode(nil /* nodeIDs */, nil /* labels */),
inputNodeID: "",
expectFail: true,
},
}
test(t, true /* addNodeInfo */, true /* csiNodeInfoEnabled */, testcases)
}
// TestAddNodeInfo_CSINodeInfoDisabled tests AddNodeInfo with various existing Node annotations
// and CSINodeInfo feature gate disabled.
func TestAddNodeInfo_CSINodeInfoDisabled(t *testing.T) {
testcases := []testcase{
{
name: "empty node",
driverName: "com.example.csi/driver1",
existingNode: generateNode(nil /* nodeIDs */, nil /* labels */),
inputNodeID: "com.example.csi/csi-node1",
expectedNodeIDMap: map[string]string{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
},
{
name: "pre-existing node info from the same driver",
driverName: "com.example.csi/driver1",
existingNode: generateNode(
nodeIDMap{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
nil /* labels */),
inputNodeID: "com.example.csi/csi-node1",
expectedNodeIDMap: map[string]string{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
},
{
name: "pre-existing node info from different driver",
driverName: "com.example.csi/driver1",
existingNode: generateNode(
nodeIDMap{
"net.example.storage/other-driver": "net.example.storage/test-node",
},
nil /* labels */),
inputNodeID: "com.example.csi/csi-node1",
expectedNodeIDMap: map[string]string{
"com.example.csi/driver1": "com.example.csi/csi-node1",
"net.example.storage/other-driver": "net.example.storage/test-node",
},
},
}
test(t, true /* addNodeInfo */, false /* csiNodeInfoEnabled */, testcases)
}
// TestRemoveNodeInfo tests RemoveNodeInfo with various existing Node and/or CSINodeInfo objects.
func TestRemoveNodeInfo(t *testing.T) {
testcases := []testcase{
{
name: "empty node and no CSINodeInfo",
driverName: "com.example.csi/driver1",
existingNode: generateNode(nil /* nodeIDs */, nil /* labels */),
expectedNodeIDMap: nil,
expectedLabels: nil,
expectNoNodeInfo: true,
},
{
name: "pre-existing node info from the same driver",
driverName: "com.example.csi/driver1",
existingNode: generateNode(
nodeIDMap{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
labelMap{
"com.example.csi/zone": "zoneA",
}),
existingNodeInfo: generateNodeInfo(
nodeIDMap{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
topologyKeyMap{
"com.example.csi/driver1": {"com.example.csi/zone"},
},
),
expectedNodeIDMap: nil,
expectedLabels: map[string]string{"com.example.csi/zone": "zoneA"},
expectNoNodeInfo: true,
},
{
name: "pre-existing node info from different driver",
driverName: "com.example.csi/driver1",
existingNode: generateNode(
nodeIDMap{
"net.example.storage/other-driver": "net.example.storage/csi-node1",
},
labelMap{
"net.example.storage/zone": "zoneA",
}),
existingNodeInfo: generateNodeInfo(
nodeIDMap{
"net.example.storage/other-driver": "net.example.storage/csi-node1",
},
topologyKeyMap{
"net.example.storage/other-driver": {"net.example.storage/zone"},
},
),
expectedNodeIDMap: map[string]string{
"net.example.storage/other-driver": "net.example.storage/csi-node1",
},
expectedTopologyMap: map[string]sets.String{
"net.example.storage/other-driver": sets.NewString("net.example.storage/zone"),
},
expectedLabels: map[string]string{"net.example.storage/zone": "zoneA"},
},
{
name: "pre-existing info about the same driver in node, but no CSINodeInfo",
driverName: "com.example.csi/driver1",
existingNode: generateNode(
nodeIDMap{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
nil /* labels */),
expectedNodeIDMap: nil,
expectedLabels: nil,
expectNoNodeInfo: true,
},
{
name: "pre-existing info about a different driver in node, but no CSINodeInfo",
existingNode: generateNode(
nodeIDMap{
"net.example.storage/other-driver": "net.example.storage/csi-node1",
},
nil /* labels */),
expectedNodeIDMap: map[string]string{
"net.example.storage/other-driver": "net.example.storage/csi-node1",
},
expectedLabels: nil,
expectNoNodeInfo: true,
},
}
test(t, false /* addNodeInfo */, true /* csiNodeInfoEnabled */, testcases)
}
// TestRemoveNodeInfo tests RemoveNodeInfo with various existing Node objects and CSINodeInfo
// feature disabled.
func TestRemoveNodeInfo_CSINodeInfoDisabled(t *testing.T) {
testcases := []testcase{
{
name: "empty node",
driverName: "com.example.csi/driver1",
existingNode: generateNode(nil /* nodeIDs */, nil /* labels */),
expectedNodeIDMap: nil,
},
{
name: "pre-existing node info from the same driver",
driverName: "com.example.csi/driver1",
existingNode: generateNode(
nodeIDMap{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
nil /* labels */),
expectedNodeIDMap: nil,
},
{
name: "pre-existing node info from different driver",
driverName: "com.example.csi/driver1",
existingNode: generateNode(
nodeIDMap{
"net.example.storage/other-driver": "net.example.storage/csi-node1",
},
nil /* labels */),
expectedNodeIDMap: map[string]string{
"net.example.storage/other-driver": "net.example.storage/csi-node1",
},
},
}
test(t, false /* addNodeInfo */, false /* csiNodeInfoEnabled */, testcases)
}
func TestAddNodeInfoExistingAnnotation(t *testing.T) {
csiNodeInfoEnabled := utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo)
utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.CSINodeInfo))
defer utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=%t", features.CSINodeInfo, csiNodeInfoEnabled))
driverName := "com.example.csi/driver1"
nodeID := "com.example.csi/some-node"
testcases := []struct {
name string
existingNode *v1.Node
}{
{
name: "pre-existing info about the same driver in node, but no CSINodeInfo",
existingNode: generateNode(
nodeIDMap{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
nil /* labels */),
},
{
name: "pre-existing info about a different driver in node, but no CSINodeInfo",
existingNode: generateNode(
nodeIDMap{
"net.example.storage/other-driver": "net.example.storage/test-node",
},
nil /* labels */),
},
}
for _, tc := range testcases {
t.Logf("test case: %q", tc.name)
// Arrange
nodeName := tc.existingNode.Name
client := fake.NewSimpleClientset(tc.existingNode)
csiClient := csifake.NewSimpleClientset()
nim := NewNodeInfoManager(types.NodeName(nodeName), client, csiClient)
// Act
err := nim.AddNodeInfo(driverName, nodeID, 0 /* maxVolumeLimit */, nil) // TODO test maxVolumeLimit
if err != nil {
t.Errorf("expected no error from AddNodeInfo call but got: %v", err)
continue
}
// Assert
nodeInfo, err := csiClient.Csi().CSINodeInfos().Get(nodeName, metav1.GetOptions{})
if err != nil {
t.Errorf("error getting CSINodeInfo: %v", err)
continue
}
if len(nodeInfo.CSIDrivers) != 1 {
t.Errorf("expected 1 CSIDriverInfo entry but got: %d", len(nodeInfo.CSIDrivers))
continue
}
driver := nodeInfo.CSIDrivers[0]
if driver.Driver != driverName || driver.NodeID != nodeID {
t.Errorf("expected Driver to be %q and NodeID to be %q, but got: %q:%q", driverName, nodeID, driver.Driver, driver.NodeID)
}
}
}
func test(t *testing.T, addNodeInfo bool, csiNodeInfoEnabled bool, testcases []testcase) {
wasEnabled := utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo)
utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=%t", features.CSINodeInfo, csiNodeInfoEnabled))
defer utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=%t", features.CSINodeInfo, wasEnabled))
for _, tc := range testcases {
t.Logf("test case: %q", tc.name)
//// Arrange
nodeName := tc.existingNode.Name
client := fake.NewSimpleClientset(tc.existingNode)
var csiClient *csifake.Clientset
if tc.existingNodeInfo == nil {
csiClient = csifake.NewSimpleClientset()
} else {
csiClient = csifake.NewSimpleClientset(tc.existingNodeInfo)
}
nim := NewNodeInfoManager(types.NodeName(nodeName), client, csiClient)
//// Act
var err error
if addNodeInfo {
err = nim.AddNodeInfo(tc.driverName, tc.inputNodeID, 0 /* maxVolumeLimit */, tc.inputTopology) // TODO test maxVolumeLimit
} else {
err = nim.RemoveNodeInfo(tc.driverName)
}
//// Assert
if tc.expectFail {
if err == nil {
t.Errorf("expected an error from AddNodeInfo call but got none")
}
continue
} else if err != nil {
t.Errorf("expected no error from AddNodeInfo call but got: %v", err)
continue
}
/* Node Validation */
node, err := client.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
if err != nil {
t.Errorf("error getting node: %v", err)
continue
}
// Node ID annotation
annNodeID, ok := node.Annotations[annotationKeyNodeID]
if ok {
if tc.expectedNodeIDMap == nil {
t.Errorf("expected annotation %q to not exist, but got: %q", annotationKeyNodeID, annNodeID)
} else {
var actualNodeIDs map[string]string
err = json.Unmarshal([]byte(annNodeID), &actualNodeIDs)
if err != nil {
t.Errorf("expected no error when parsing annotation %q, but got error: %v", annotationKeyNodeID, err)
}
if !helper.Semantic.DeepEqual(actualNodeIDs, tc.expectedNodeIDMap) {
t.Errorf("expected annotation %v; got: %v", tc.expectedNodeIDMap, actualNodeIDs)
}
}
} else {
if tc.expectedNodeIDMap != nil {
t.Errorf("expected annotation %q, but got none", annotationKeyNodeID)
}
}
if csiNodeInfoEnabled {
// Topology labels
if !helper.Semantic.DeepEqual(node.Labels, tc.expectedLabels) {
t.Errorf("expected topology labels to be %v; got: %v", tc.expectedLabels, node.Labels)
}
/* CSINodeInfo validation */
nodeInfo, err := csiClient.Csi().CSINodeInfos().Get(nodeName, metav1.GetOptions{})
if tc.expectNoNodeInfo && errors.IsNotFound(err) {
continue
} else if err != nil {
t.Errorf("error getting CSINodeInfo: %v", err)
continue
}
// Extract node IDs and topology keys
actualNodeIDs := make(map[string]string)
actualTopologyKeys := make(map[string]sets.String)
for _, driver := range nodeInfo.CSIDrivers {
actualNodeIDs[driver.Driver] = driver.NodeID
actualTopologyKeys[driver.Driver] = sets.NewString(driver.TopologyKeys...)
}
// Node IDs
if !helper.Semantic.DeepEqual(actualNodeIDs, tc.expectedNodeIDMap) {
t.Errorf("expected node IDs %v from CSINodeInfo; got: %v", tc.expectedNodeIDMap, actualNodeIDs)
}
// Topology keys
if !helper.Semantic.DeepEqual(actualTopologyKeys, tc.expectedTopologyMap) {
t.Errorf("expected topology keys %v from CSINodeInfo; got: %v", tc.expectedTopologyMap, actualTopologyKeys)
}
}
}
}
func generateNode(nodeIDs, labels map[string]string) *v1.Node {
var annotations map[string]string
if len(nodeIDs) > 0 {
b, _ := json.Marshal(nodeIDs)
annotations = map[string]string{annotationKeyNodeID: string(b)}
}
return &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
Annotations: annotations,
Labels: labels,
},
}
}
func generateNodeInfo(nodeIDs map[string]string, topologyKeys map[string][]string) *csiv1alpha1.CSINodeInfo {
var drivers []csiv1alpha1.CSIDriverInfo
for k, nodeID := range nodeIDs {
d := csiv1alpha1.CSIDriverInfo{
Driver: k,
NodeID: nodeID,
}
if top, exists := topologyKeys[k]; exists {
d.TopologyKeys = top
}
drivers = append(drivers, d)
}
return &csiv1alpha1.CSINodeInfo{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
},
CSIDrivers: drivers,
}
}

View File

@ -1,33 +0,0 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["nodeupdater.go"],
importpath = "k8s.io/kubernetes/pkg/volume/csi/nodeupdater",
visibility = ["//visibility:public"],
deps = [
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/util/retry:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -1,193 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package nodeupdater includes internal functions used to add/delete labels to
// kubernetes nodes for corresponding CSI drivers
package nodeupdater // import "k8s.io/kubernetes/pkg/volume/csi/nodeupdater"
import (
"encoding/json"
"fmt"
"github.com/golang/glog"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/util/retry"
"k8s.io/kubernetes/pkg/volume/util"
)
const (
// Name of node annotation that contains JSON map of driver names to node
// names
annotationKey = "csi.volume.kubernetes.io/nodeid"
)
// labelManagementStruct is struct of channels used for communication between the driver registration
// code and the go routine responsible for managing the node's labels
type nodeUpdateStruct struct {
nodeName types.NodeName
k8s kubernetes.Interface
}
// Interface implements an interface for managing labels of a node
type Interface interface {
AddLabelsAndLimits(driverName string, driverNodeId string, maxLimit int64) error
}
// NewNodeupdater initializes nodeUpdateStruct and returns available interfaces
func NewNodeUpdater(nodeName types.NodeName, kubeClient kubernetes.Interface) Interface {
return nodeUpdateStruct{
nodeName: nodeName,
k8s: kubeClient,
}
}
// AddLabelsAndLimits nodeUpdater waits for labeling requests initiated by the driver's registration
// process and updates labels and attach limits
func (nodeUpdater nodeUpdateStruct) AddLabelsAndLimits(driverName string, driverNodeId string, maxLimit int64) error {
err := addLabelsAndLimits(string(nodeUpdater.nodeName), nodeUpdater.k8s.CoreV1().Nodes(), driverName, driverNodeId, maxLimit)
if err != nil {
return err
}
return nil
}
func addMaxAttachLimitToNode(node *v1.Node, driverName string, maxLimit int64) *v1.Node {
if maxLimit <= 0 {
glog.V(4).Infof("skipping adding attach limit for %s", driverName)
return node
}
if node.Status.Capacity == nil {
node.Status.Capacity = v1.ResourceList{}
}
if node.Status.Allocatable == nil {
node.Status.Allocatable = v1.ResourceList{}
}
limitKeyName := util.GetCSIAttachLimitKey(driverName)
node.Status.Capacity[v1.ResourceName(limitKeyName)] = *resource.NewQuantity(maxLimit, resource.DecimalSI)
node.Status.Allocatable[v1.ResourceName(limitKeyName)] = *resource.NewQuantity(maxLimit, resource.DecimalSI)
return node
}
// Clones the given map and returns a new map with the given key and value added.
// Returns the given map, if annotationKey is empty.
func cloneAndAddAnnotation(
annotations map[string]string,
annotationKey,
annotationValue string) map[string]string {
if annotationKey == "" {
// Don't need to add an annotation.
return annotations
}
// Clone.
newAnnotations := map[string]string{}
for key, value := range annotations {
newAnnotations[key] = value
}
newAnnotations[annotationKey] = annotationValue
return newAnnotations
}
func addNodeIdToNode(node *v1.Node, driverName string, csiDriverNodeId string) (*v1.Node, error) {
var previousAnnotationValue string
if node.ObjectMeta.Annotations != nil {
previousAnnotationValue =
node.ObjectMeta.Annotations[annotationKey]
glog.V(3).Infof(
"previousAnnotationValue=%q", previousAnnotationValue)
}
existingDriverMap := map[string]string{}
if previousAnnotationValue != "" {
// Parse previousAnnotationValue as JSON
if err := json.Unmarshal([]byte(previousAnnotationValue), &existingDriverMap); err != nil {
return node, fmt.Errorf(
"failed to parse node's %q annotation value (%q) err=%v",
annotationKey,
previousAnnotationValue,
err)
}
}
if val, ok := existingDriverMap[driverName]; ok {
if val == csiDriverNodeId {
// Value already exists in node annotation, nothing more to do
glog.V(2).Infof(
"The key value {%q: %q} alredy eixst in node %q annotation, no need to update: %v",
driverName,
csiDriverNodeId,
annotationKey,
previousAnnotationValue)
return node, nil
}
}
// Add/update annotation value
existingDriverMap[driverName] = csiDriverNodeId
jsonObj, err := json.Marshal(existingDriverMap)
if err != nil {
return node, fmt.Errorf(
"failed while trying to add key value {%q: %q} to node %q annotation. Existing value: %v",
driverName,
csiDriverNodeId,
annotationKey,
previousAnnotationValue)
}
node.ObjectMeta.Annotations = cloneAndAddAnnotation(
node.ObjectMeta.Annotations,
annotationKey,
string(jsonObj))
return node, nil
}
func addLabelsAndLimits(nodeName string, nodeClient corev1.NodeInterface, driverName string, csiDriverNodeId string, maxLimit int64) error {
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
// Retrieve the latest version of Node before attempting update, so that
// existing changes are not overwritten. RetryOnConflict uses
// exponential backoff to avoid exhausting the apiserver.
node, getErr := nodeClient.Get(nodeName, metav1.GetOptions{})
if getErr != nil {
glog.Errorf("Failed to get latest version of Node: %v", getErr)
return getErr // do not wrap error
}
var labelErr error
node, labelErr = addNodeIdToNode(node, driverName, csiDriverNodeId)
if labelErr != nil {
return labelErr
}
node = addMaxAttachLimitToNode(node, driverName, maxLimit)
_, updateErr := nodeClient.Update(node)
if updateErr == nil {
glog.V(2).Infof(
"Updated node %q successfully for CSI driver %q and CSI node name %q",
nodeName,
driverName,
csiDriverNodeId)
}
return updateErr // do not wrap error
})
if retryErr != nil {
return fmt.Errorf("error setting attach limit and labels for %s with : %v", driverName, retryErr)
}
return nil
}

View File

@ -20,12 +20,14 @@ go_library(
"//pkg/features:go_default_library", "//pkg/features:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/admission:go_default_library", "//staging/src/k8s.io/apiserver/pkg/admission:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/admission/initializer:go_default_library", "//staging/src/k8s.io/apiserver/pkg/admission/initializer:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/apis/csi/v1alpha1:go_default_library",
], ],
) )
@ -42,12 +44,14 @@ go_test(
"//pkg/features:go_default_library", "//pkg/features:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/admission:go_default_library", "//staging/src/k8s.io/apiserver/pkg/admission:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/apis/csi/v1alpha1:go_default_library",
"//vendor/k8s.io/utils/pointer:go_default_library", "//vendor/k8s.io/utils/pointer:go_default_library",
], ],
) )

View File

@ -22,12 +22,14 @@ import (
apiequality "k8s.io/apimachinery/pkg/api/equality" apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/util/diff" "k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/admission"
apiserveradmission "k8s.io/apiserver/pkg/admission/initializer" apiserveradmission "k8s.io/apiserver/pkg/admission/initializer"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
corev1lister "k8s.io/client-go/listers/core/v1" corev1lister "k8s.io/client-go/listers/core/v1"
csiv1alpha1 "k8s.io/csi-api/pkg/apis/csi/v1alpha1"
podutil "k8s.io/kubernetes/pkg/api/pod" podutil "k8s.io/kubernetes/pkg/api/pod"
authenticationapi "k8s.io/kubernetes/pkg/apis/authentication" authenticationapi "k8s.io/kubernetes/pkg/apis/authentication"
coordapi "k8s.io/kubernetes/pkg/apis/coordination" coordapi "k8s.io/kubernetes/pkg/apis/coordination"
@ -92,6 +94,7 @@ var (
pvcResource = api.Resource("persistentvolumeclaims") pvcResource = api.Resource("persistentvolumeclaims")
svcacctResource = api.Resource("serviceaccounts") svcacctResource = api.Resource("serviceaccounts")
leaseResource = coordapi.Resource("leases") leaseResource = coordapi.Resource("leases")
csiNodeInfoResource = csiv1alpha1.Resource("csinodeinfos")
) )
func (c *nodePlugin) Admit(a admission.Attributes) error { func (c *nodePlugin) Admit(a admission.Attributes) error {
@ -143,6 +146,12 @@ func (c *nodePlugin) Admit(a admission.Attributes) error {
} }
return admission.NewForbidden(a, fmt.Errorf("disabled by feature gate %s", features.NodeLease)) return admission.NewForbidden(a, fmt.Errorf("disabled by feature gate %s", features.NodeLease))
case csiNodeInfoResource:
if c.features.Enabled(features.KubeletPluginsWatcher) && c.features.Enabled(features.CSINodeInfo) {
return c.admitCSINodeInfo(nodeName, a)
}
return admission.NewForbidden(a, fmt.Errorf("disabled by feature gates %s and %s", features.KubeletPluginsWatcher, features.CSINodeInfo))
default: default:
return nil return nil
} }
@ -422,3 +431,23 @@ func (r *nodePlugin) admitLease(nodeName string, a admission.Attributes) error {
return nil return nil
} }
func (c *nodePlugin) admitCSINodeInfo(nodeName string, a admission.Attributes) error {
// the request must come from a node with the same name as the CSINodeInfo object
if a.GetOperation() == admission.Create {
// a.GetName() won't return the name on create, so we drill down to the proposed object
accessor, err := meta.Accessor(a.GetObject())
if err != nil {
return admission.NewForbidden(a, fmt.Errorf("unable to access the object name"))
}
if accessor.GetName() != nodeName {
return admission.NewForbidden(a, fmt.Errorf("can only access CSINodeInfo with the same name as the requesting node"))
}
} else {
if a.GetName() != nodeName {
return admission.NewForbidden(a, fmt.Errorf("can only access CSINodeInfo with the same name as the requesting node"))
}
}
return nil
}

View File

@ -21,14 +21,17 @@ import (
"testing" "testing"
"time" "time"
"fmt"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/authentication/user"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
corev1lister "k8s.io/client-go/listers/core/v1" corev1lister "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
csiv1alpha1 "k8s.io/csi-api/pkg/apis/csi/v1alpha1"
authenticationapi "k8s.io/kubernetes/pkg/apis/authentication" authenticationapi "k8s.io/kubernetes/pkg/apis/authentication"
"k8s.io/kubernetes/pkg/apis/coordination" "k8s.io/kubernetes/pkg/apis/coordination"
api "k8s.io/kubernetes/pkg/apis/core" api "k8s.io/kubernetes/pkg/apis/core"
@ -43,6 +46,8 @@ var (
trDisabledFeature = utilfeature.NewFeatureGate() trDisabledFeature = utilfeature.NewFeatureGate()
leaseEnabledFeature = utilfeature.NewFeatureGate() leaseEnabledFeature = utilfeature.NewFeatureGate()
leaseDisabledFeature = utilfeature.NewFeatureGate() leaseDisabledFeature = utilfeature.NewFeatureGate()
csiNodeInfoEnabledFeature = utilfeature.NewFeatureGate()
csiNodeInfoDisabledFeature = utilfeature.NewFeatureGate()
) )
func init() { func init() {
@ -58,6 +63,18 @@ func init() {
if err := leaseDisabledFeature.Add(map[utilfeature.Feature]utilfeature.FeatureSpec{features.NodeLease: {Default: false}}); err != nil { if err := leaseDisabledFeature.Add(map[utilfeature.Feature]utilfeature.FeatureSpec{features.NodeLease: {Default: false}}); err != nil {
panic(err) panic(err)
} }
if err := csiNodeInfoEnabledFeature.Add(map[utilfeature.Feature]utilfeature.FeatureSpec{features.KubeletPluginsWatcher: {Default: true}}); err != nil {
panic(err)
}
if err := csiNodeInfoEnabledFeature.Add(map[utilfeature.Feature]utilfeature.FeatureSpec{features.CSINodeInfo: {Default: true}}); err != nil {
panic(err)
}
if err := csiNodeInfoDisabledFeature.Add(map[utilfeature.Feature]utilfeature.FeatureSpec{features.KubeletPluginsWatcher: {Default: false}}); err != nil {
panic(err)
}
if err := csiNodeInfoDisabledFeature.Add(map[utilfeature.Feature]utilfeature.FeatureSpec{features.CSINodeInfo: {Default: false}}); err != nil {
panic(err)
}
} }
func makeTestPod(namespace, name, node string, mirror bool) (*api.Pod, *corev1.Pod) { func makeTestPod(namespace, name, node string, mirror bool) (*api.Pod, *corev1.Pod) {
@ -193,6 +210,33 @@ func Test_nodePlugin_Admit(t *testing.T) {
}, },
} }
csiNodeInfoResource = csiv1alpha1.Resource("csinodeinfos").WithVersion("v1alpha1")
csiNodeInfoKind = schema.GroupVersionKind{Group: "csi.storage.k8s.io", Version: "v1alpha1", Kind: "CSINodeInfo"}
nodeInfo = &csiv1alpha1.CSINodeInfo{
ObjectMeta: metav1.ObjectMeta{
Name: "mynode",
},
CSIDrivers: []csiv1alpha1.CSIDriverInfo{
{
Driver: "com.example.csi/mydriver",
NodeID: "com.example.csi/mynode",
TopologyKeys: []string{"com.example.csi/zone"},
},
},
}
nodeInfoWrongName = &csiv1alpha1.CSINodeInfo{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
},
CSIDrivers: []csiv1alpha1.CSIDriverInfo{
{
Driver: "com.example.csi/mydriver",
NodeID: "com.example.csi/foo",
TopologyKeys: []string{"com.example.csi/zone"},
},
},
}
noExistingPodsIndex = cache.NewIndexer(cache.MetaNamespaceKeyFunc, nil) noExistingPodsIndex = cache.NewIndexer(cache.MetaNamespaceKeyFunc, nil)
noExistingPods = corev1lister.NewPodLister(noExistingPodsIndex) noExistingPods = corev1lister.NewPodLister(noExistingPodsIndex)
@ -955,6 +999,49 @@ func Test_nodePlugin_Admit(t *testing.T) {
features: leaseEnabledFeature, features: leaseEnabledFeature,
err: "", err: "",
}, },
// CSINodeInfo
{
name: "disallowed create CSINodeInfo - feature disabled",
attributes: admission.NewAttributesRecord(nodeInfo, nil, csiNodeInfoKind, nodeInfo.Namespace, nodeInfo.Name, csiNodeInfoResource, "", admission.Create, false, mynode),
features: csiNodeInfoDisabledFeature,
err: fmt.Sprintf("forbidden: disabled by feature gates %s and %s", features.KubeletPluginsWatcher, features.CSINodeInfo),
},
{
name: "disallowed create another node's CSINodeInfo - feature enabled",
attributes: admission.NewAttributesRecord(nodeInfoWrongName, nil, csiNodeInfoKind, nodeInfoWrongName.Namespace, nodeInfoWrongName.Name, csiNodeInfoResource, "", admission.Create, false, mynode),
features: csiNodeInfoEnabledFeature,
err: "forbidden: ",
},
{
name: "disallowed update another node's CSINodeInfo - feature enabled",
attributes: admission.NewAttributesRecord(nodeInfoWrongName, nodeInfoWrongName, csiNodeInfoKind, nodeInfoWrongName.Namespace, nodeInfoWrongName.Name, csiNodeInfoResource, "", admission.Update, false, mynode),
features: csiNodeInfoEnabledFeature,
err: "forbidden: ",
},
{
name: "disallowed delete another node's CSINodeInfo - feature enabled",
attributes: admission.NewAttributesRecord(nil, nil, csiNodeInfoKind, nodeInfoWrongName.Namespace, nodeInfoWrongName.Name, csiNodeInfoResource, "", admission.Delete, false, mynode),
features: csiNodeInfoEnabledFeature,
err: "forbidden: ",
},
{
name: "allowed create node CSINodeInfo - feature enabled",
attributes: admission.NewAttributesRecord(nodeInfo, nil, csiNodeInfoKind, nodeInfo.Namespace, nodeInfo.Name, csiNodeInfoResource, "", admission.Create, false, mynode),
features: csiNodeInfoEnabledFeature,
err: "",
},
{
name: "allowed update node CSINodeInfo - feature enabled",
attributes: admission.NewAttributesRecord(nodeInfo, nodeInfo, csiNodeInfoKind, nodeInfo.Namespace, nodeInfo.Name, csiNodeInfoResource, "", admission.Update, false, mynode),
features: csiNodeInfoEnabledFeature,
err: "",
},
{
name: "allowed delete node CSINodeInfo - feature enabled",
attributes: admission.NewAttributesRecord(nil, nil, csiNodeInfoKind, nodeInfo.Namespace, nodeInfo.Name, csiNodeInfoResource, "", admission.Delete, false, mynode),
features: csiNodeInfoEnabledFeature,
err: "",
},
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {

View File

@ -56,6 +56,7 @@ go_library(
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/storage/v1beta1:go_default_library", "//staging/src/k8s.io/client-go/informers/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/apis/csi/v1alpha1:go_default_library",
"//third_party/forked/gonum/graph:go_default_library", "//third_party/forked/gonum/graph:go_default_library",
"//third_party/forked/gonum/graph/simple:go_default_library", "//third_party/forked/gonum/graph/simple:go_default_library",
"//third_party/forked/gonum/graph/traverse:go_default_library", "//third_party/forked/gonum/graph/traverse:go_default_library",

View File

@ -25,6 +25,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/authorization/authorizer" "k8s.io/apiserver/pkg/authorization/authorizer"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
csiv1alpha1 "k8s.io/csi-api/pkg/apis/csi/v1alpha1"
coordapi "k8s.io/kubernetes/pkg/apis/coordination" coordapi "k8s.io/kubernetes/pkg/apis/coordination"
api "k8s.io/kubernetes/pkg/apis/core" api "k8s.io/kubernetes/pkg/apis/core"
storageapi "k8s.io/kubernetes/pkg/apis/storage" storageapi "k8s.io/kubernetes/pkg/apis/storage"
@ -74,6 +75,7 @@ var (
vaResource = storageapi.Resource("volumeattachments") vaResource = storageapi.Resource("volumeattachments")
svcAcctResource = api.Resource("serviceaccounts") svcAcctResource = api.Resource("serviceaccounts")
leaseResource = coordapi.Resource("leases") leaseResource = coordapi.Resource("leases")
csiNodeInfoResource = csiv1alpha1.Resource("csinodeinfos")
) )
func (r *NodeAuthorizer) Authorize(attrs authorizer.Attributes) (authorizer.Decision, string, error) { func (r *NodeAuthorizer) Authorize(attrs authorizer.Attributes) (authorizer.Decision, string, error) {
@ -120,7 +122,13 @@ func (r *NodeAuthorizer) Authorize(attrs authorizer.Attributes) (authorizer.Deci
return r.authorizeLease(nodeName, attrs) return r.authorizeLease(nodeName, attrs)
} }
return authorizer.DecisionNoOpinion, fmt.Sprintf("disabled by feature gate %s", features.NodeLease), nil return authorizer.DecisionNoOpinion, fmt.Sprintf("disabled by feature gate %s", features.NodeLease), nil
case csiNodeInfoResource:
if r.features.Enabled(features.KubeletPluginsWatcher) && r.features.Enabled(features.CSINodeInfo) {
return r.authorizeCSINodeInfo(nodeName, attrs)
} }
return authorizer.DecisionNoOpinion, fmt.Sprintf("disabled by feature gates %s and %s", features.KubeletPluginsWatcher, features.CSINodeInfo), nil
}
} }
// Access to other resources is not subdivided, so just evaluate against the statically defined node rules // Access to other resources is not subdivided, so just evaluate against the statically defined node rules
@ -252,6 +260,35 @@ func (r *NodeAuthorizer) authorizeLease(nodeName string, attrs authorizer.Attrib
return authorizer.DecisionAllow, "", nil return authorizer.DecisionAllow, "", nil
} }
// authorizeCSINodeInfo authorizes node requests to CSINodeInfo csi.storage.k8s.io/csinodeinfos
func (r *NodeAuthorizer) authorizeCSINodeInfo(nodeName string, attrs authorizer.Attributes) (authorizer.Decision, string, error) {
// allowed verbs: get, create, update, patch, delete
verb := attrs.GetVerb()
if verb != "get" &&
verb != "create" &&
verb != "update" &&
verb != "patch" &&
verb != "delete" {
glog.V(2).Infof("NODE DENY: %s %#v", nodeName, attrs)
return authorizer.DecisionNoOpinion, "can only get, create, update, patch, or delete a CSINodeInfo", nil
}
if len(attrs.GetSubresource()) > 0 {
glog.V(2).Infof("NODE DENY: %s %#v", nodeName, attrs)
return authorizer.DecisionNoOpinion, "cannot authorize CSINodeInfo subresources", nil
}
// the request must come from a node with the same name as the CSINodeInfo
// note we skip this check for create, since the authorizer doesn't know the name on create
// the noderestriction admission plugin is capable of performing this check at create time
if verb != "create" && attrs.GetName() != nodeName {
glog.V(2).Infof("NODE DENY: %s %#v", nodeName, attrs)
return authorizer.DecisionNoOpinion, "can only access CSINodeInfo with the same name as the requesting node", nil
}
return authorizer.DecisionAllow, "", nil
}
// hasPathFrom returns true if there is a directed path from the specified type/namespace/name to the specified Node // hasPathFrom returns true if there is a directed path from the specified type/namespace/name to the specified Node
func (r *NodeAuthorizer) hasPathFrom(nodeName string, startingType vertexType, startingNamespace, startingName string) (bool, error) { func (r *NodeAuthorizer) hasPathFrom(nodeName string, startingType vertexType, startingNamespace, startingName string) (bool, error) {
r.graph.lock.RLock() r.graph.lock.RLock()

View File

@ -45,6 +45,8 @@ var (
trDisabledFeature = utilfeature.NewFeatureGate() trDisabledFeature = utilfeature.NewFeatureGate()
leaseEnabledFeature = utilfeature.NewFeatureGate() leaseEnabledFeature = utilfeature.NewFeatureGate()
leaseDisabledFeature = utilfeature.NewFeatureGate() leaseDisabledFeature = utilfeature.NewFeatureGate()
csiNodeInfoEnabledFeature = utilfeature.NewFeatureGate()
csiNodeInfoDisabledFeature = utilfeature.NewFeatureGate()
) )
func init() { func init() {
@ -66,6 +68,18 @@ func init() {
if err := leaseDisabledFeature.Add(map[utilfeature.Feature]utilfeature.FeatureSpec{features.NodeLease: {Default: false}}); err != nil { if err := leaseDisabledFeature.Add(map[utilfeature.Feature]utilfeature.FeatureSpec{features.NodeLease: {Default: false}}); err != nil {
panic(err) panic(err)
} }
if err := csiNodeInfoEnabledFeature.Add(map[utilfeature.Feature]utilfeature.FeatureSpec{features.KubeletPluginsWatcher: {Default: true}}); err != nil {
panic(err)
}
if err := csiNodeInfoEnabledFeature.Add(map[utilfeature.Feature]utilfeature.FeatureSpec{features.CSINodeInfo: {Default: true}}); err != nil {
panic(err)
}
if err := csiNodeInfoDisabledFeature.Add(map[utilfeature.Feature]utilfeature.FeatureSpec{features.KubeletPluginsWatcher: {Default: false}}); err != nil {
panic(err)
}
if err := csiNodeInfoDisabledFeature.Add(map[utilfeature.Feature]utilfeature.FeatureSpec{features.CSINodeInfo: {Default: false}}); err != nil {
panic(err)
}
} }
func TestAuthorizer(t *testing.T) { func TestAuthorizer(t *testing.T) {
@ -338,6 +352,85 @@ func TestAuthorizer(t *testing.T) {
features: leaseEnabledFeature, features: leaseEnabledFeature,
expect: authorizer.DecisionAllow, expect: authorizer.DecisionAllow,
}, },
// CSINodeInfo
{
name: "disallowed CSINodeInfo - feature disabled",
attrs: authorizer.AttributesRecord{User: node0, ResourceRequest: true, Verb: "get", Resource: "csinodeinfos", APIGroup: "csi.storage.k8s.io", Name: "node0"},
features: csiNodeInfoDisabledFeature,
expect: authorizer.DecisionNoOpinion,
},
{
name: "disallowed CSINodeInfo with subresource - feature enabled",
attrs: authorizer.AttributesRecord{User: node0, ResourceRequest: true, Verb: "get", Resource: "csinodeinfos", Subresource: "csiDrivers", APIGroup: "csi.storage.k8s.io", Name: "node0"},
features: csiNodeInfoEnabledFeature,
expect: authorizer.DecisionNoOpinion,
},
{
name: "disallowed get another node's CSINodeInfo - feature enabled",
attrs: authorizer.AttributesRecord{User: node0, ResourceRequest: true, Verb: "get", Resource: "csinodeinfos", APIGroup: "csi.storage.k8s.io", Name: "node1"},
features: csiNodeInfoEnabledFeature,
expect: authorizer.DecisionNoOpinion,
},
{
name: "disallowed update another node's CSINodeInfo - feature enabled",
attrs: authorizer.AttributesRecord{User: node0, ResourceRequest: true, Verb: "update", Resource: "csinodeinfos", APIGroup: "csi.storage.k8s.io", Name: "node1"},
features: csiNodeInfoEnabledFeature,
expect: authorizer.DecisionNoOpinion,
},
{
name: "disallowed patch another node's CSINodeInfo - feature enabled",
attrs: authorizer.AttributesRecord{User: node0, ResourceRequest: true, Verb: "patch", Resource: "csinodeinfos", APIGroup: "csi.storage.k8s.io", Name: "node1"},
features: csiNodeInfoEnabledFeature,
expect: authorizer.DecisionNoOpinion,
},
{
name: "disallowed delete another node's CSINodeInfo - feature enabled",
attrs: authorizer.AttributesRecord{User: node0, ResourceRequest: true, Verb: "delete", Resource: "csinodeinfos", APIGroup: "csi.storage.k8s.io", Name: "node1"},
features: csiNodeInfoEnabledFeature,
expect: authorizer.DecisionNoOpinion,
},
{
name: "disallowed list CSINodeInfos - feature enabled",
attrs: authorizer.AttributesRecord{User: node0, ResourceRequest: true, Verb: "list", Resource: "csinodeinfos", APIGroup: "csi.storage.k8s.io"},
features: csiNodeInfoEnabledFeature,
expect: authorizer.DecisionNoOpinion,
},
{
name: "disallowed watch CSINodeInfos - feature enabled",
attrs: authorizer.AttributesRecord{User: node0, ResourceRequest: true, Verb: "watch", Resource: "csinodeinfos", APIGroup: "csi.storage.k8s.io"},
features: csiNodeInfoEnabledFeature,
expect: authorizer.DecisionNoOpinion,
},
{
name: "allowed get CSINodeInfo - feature enabled",
attrs: authorizer.AttributesRecord{User: node0, ResourceRequest: true, Verb: "get", Resource: "csinodeinfos", APIGroup: "csi.storage.k8s.io", Name: "node0"},
features: csiNodeInfoEnabledFeature,
expect: authorizer.DecisionAllow,
},
{
name: "allowed create CSINodeInfo - feature enabled",
attrs: authorizer.AttributesRecord{User: node0, ResourceRequest: true, Verb: "create", Resource: "csinodeinfos", APIGroup: "csi.storage.k8s.io", Name: "node0"},
features: csiNodeInfoEnabledFeature,
expect: authorizer.DecisionAllow,
},
{
name: "allowed update CSINodeInfo - feature enabled",
attrs: authorizer.AttributesRecord{User: node0, ResourceRequest: true, Verb: "update", Resource: "csinodeinfos", APIGroup: "csi.storage.k8s.io", Name: "node0"},
features: csiNodeInfoEnabledFeature,
expect: authorizer.DecisionAllow,
},
{
name: "allowed patch CSINodeInfo - feature enabled",
attrs: authorizer.AttributesRecord{User: node0, ResourceRequest: true, Verb: "patch", Resource: "csinodeinfos", APIGroup: "csi.storage.k8s.io", Name: "node0"},
features: csiNodeInfoEnabledFeature,
expect: authorizer.DecisionAllow,
},
{
name: "allowed delete CSINodeInfo - feature enabled",
attrs: authorizer.AttributesRecord{User: node0, ResourceRequest: true, Verb: "delete", Resource: "csinodeinfos", APIGroup: "csi.storage.k8s.io", Name: "node0"},
features: csiNodeInfoEnabledFeature,
expect: authorizer.DecisionAllow,
},
} }
for _, tc := range tests { for _, tc := range tests {

View File

@ -164,6 +164,11 @@ func NodeRules() []rbacv1.PolicyRule {
nodePolicyRules = append(nodePolicyRules, csiDriverRule) nodePolicyRules = append(nodePolicyRules, csiDriverRule)
} }
} }
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPluginsWatcher) &&
utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
csiNodeInfoRule := rbacv1helpers.NewRule("get", "create", "update", "patch", "delete").Groups("csi.storage.k8s.io").Resources("csinodeinfos").RuleOrDie()
nodePolicyRules = append(nodePolicyRules, csiNodeInfoRule)
}
// Node leases // Node leases
if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) { if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
@ -452,16 +457,6 @@ func ClusterRoles() []rbacv1.ClusterRole {
eventsRule(), eventsRule(),
}, },
}, },
{
// a role for the csi external provisioner
ObjectMeta: metav1.ObjectMeta{Name: "system:csi-external-provisioner"},
Rules: []rbacv1.PolicyRule{
rbacv1helpers.NewRule("create", "delete", "get", "list", "watch").Groups(legacyGroup).Resources("persistentvolumes").RuleOrDie(),
rbacv1helpers.NewRule("get", "list", "watch", "update", "patch").Groups(legacyGroup).Resources("persistentvolumeclaims").RuleOrDie(),
rbacv1helpers.NewRule("list", "watch").Groups(storageGroup).Resources("storageclasses").RuleOrDie(),
rbacv1helpers.NewRule("get", "list", "watch", "create", "update", "patch").Groups(legacyGroup).Resources("events").RuleOrDie(),
},
},
{ {
// a role for the csi external attacher // a role for the csi external attacher
ObjectMeta: metav1.ObjectMeta{Name: "system:csi-external-attacher"}, ObjectMeta: metav1.ObjectMeta{Name: "system:csi-external-attacher"},
@ -506,6 +501,22 @@ func ClusterRoles() []rbacv1.ClusterRole {
}) })
} }
externalProvisionerRules := []rbacv1.PolicyRule{
rbacv1helpers.NewRule("create", "delete", "get", "list", "watch").Groups(legacyGroup).Resources("persistentvolumes").RuleOrDie(),
rbacv1helpers.NewRule("get", "list", "watch", "update", "patch").Groups(legacyGroup).Resources("persistentvolumeclaims").RuleOrDie(),
rbacv1helpers.NewRule("list", "watch").Groups(storageGroup).Resources("storageclasses").RuleOrDie(),
rbacv1helpers.NewRule("get", "list", "watch", "create", "update", "patch").Groups(legacyGroup).Resources("events").RuleOrDie(),
rbacv1helpers.NewRule("get", "list", "watch").Groups(legacyGroup).Resources("nodes").RuleOrDie(),
}
if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
externalProvisionerRules = append(externalProvisionerRules, rbacv1helpers.NewRule("get", "watch", "list").Groups("csi.storage.k8s.io").Resources("csinodeinfos").RuleOrDie())
}
roles = append(roles, rbacv1.ClusterRole{
// a role for the csi external provisioner
ObjectMeta: metav1.ObjectMeta{Name: "system:csi-external-provisioner"},
Rules: externalProvisionerRules,
})
addClusterRoleLabel(roles) addClusterRoleLabel(roles)
return roles return roles
} }

View File

@ -523,6 +523,14 @@ items:
- patch - patch
- update - update
- watch - watch
- apiGroups:
- ""
resources:
- nodes
verbs:
- get
- list
- watch
- apiVersion: rbac.authorization.k8s.io/v1 - apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole kind: ClusterRole
metadata: metadata:

View File

@ -1,19 +1,5 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library") load("@io_bazel_rules_go//go:def.bzl", "go_library")
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)
go_library( go_library(
name = "go_default_library", name = "go_default_library",
srcs = [ srcs = [
@ -31,3 +17,17 @@ go_library(
"//staging/src/k8s.io/csi-api/pkg/apis/csi/v1alpha1:go_default_library", "//staging/src/k8s.io/csi-api/pkg/apis/csi/v1alpha1:go_default_library",
], ],
) )
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -38,7 +38,7 @@ import (
) )
var csiImageVersions = map[string]string{ var csiImageVersions = map[string]string{
"hostpathplugin": "v0.2.0", "hostpathplugin": "canary", // TODO (verult) update tag once new hostpathplugin release is cut
"csi-attacher": "v0.2.0", "csi-attacher": "v0.2.0",
"csi-provisioner": "v0.2.1", "csi-provisioner": "v0.2.1",
"driver-registrar": "v0.3.0", "driver-registrar": "v0.3.0",

View File

@ -572,6 +572,8 @@ func TestNodeAuthorizer(t *testing.T) {
expectForbidden(t, updateNode1Lease(node2Client)) expectForbidden(t, updateNode1Lease(node2Client))
expectForbidden(t, patchNode1Lease(node2Client)) expectForbidden(t, patchNode1Lease(node2Client))
expectForbidden(t, deleteNode1Lease(node2Client)) expectForbidden(t, deleteNode1Lease(node2Client))
// TODO (verult) CSINodeInfo tests (issue #68254)
} }
// expect executes a function a set number of times until it either returns the // expect executes a function a set number of times until it either returns the