From ea474cd99cb4eb769bf8d80885edd817e2fe739f Mon Sep 17 00:00:00 2001 From: Serguei Bezverkhi Date: Mon, 4 Jun 2018 16:54:25 -0400 Subject: [PATCH] New labelmanager package --- pkg/volume/csi/labelmanager/BUILD | 30 +++ pkg/volume/csi/labelmanager/labelmanager.go | 251 ++++++++++++++++++++ 2 files changed, 281 insertions(+) create mode 100644 pkg/volume/csi/labelmanager/BUILD create mode 100644 pkg/volume/csi/labelmanager/labelmanager.go diff --git a/pkg/volume/csi/labelmanager/BUILD b/pkg/volume/csi/labelmanager/BUILD new file mode 100644 index 0000000000..1fed833564 --- /dev/null +++ b/pkg/volume/csi/labelmanager/BUILD @@ -0,0 +1,30 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["labelmanager.go"], + importpath = "k8s.io/kubernetes/pkg/volume/csi/labelmanager", + visibility = ["//visibility:public"], + deps = [ + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", + "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", + "//vendor/k8s.io/client-go/util/retry:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/volume/csi/labelmanager/labelmanager.go b/pkg/volume/csi/labelmanager/labelmanager.go new file mode 100644 index 0000000000..162d7d3dc8 --- /dev/null +++ b/pkg/volume/csi/labelmanager/labelmanager.go @@ -0,0 +1,251 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package labelmanager includes internal functions used to add/delete labels to +// kubernetes nodes for corresponding CSI drivers +package labelmanager // import "k8s.io/kubernetes/pkg/volume/csi/labelmanager" + +import ( + "encoding/json" + "fmt" + + "github.com/golang/glog" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" + corev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/util/retry" +) + +const ( + // Name of node annotation that contains JSON map of driver names to node + // names + annotationKey = "csi.volume.kubernetes.io/nodeid" + csiPluginName = "kubernetes.io/csi" +) + +// labelManagementStruct is struct of channels used for communication between the driver registration +// code and the go routine responsible for managing the node's labels +type labelManagerStruct struct { + nodeName types.NodeName + k8s kubernetes.Interface +} + +// Interface implements an interface for managing labels of a node +type Interface interface { + AddLabels(driverName string) error +} + +// NewLabelManager initializes labelManagerStruct and returns available interfaces +func NewLabelManager(nodeName types.NodeName, kubeClient kubernetes.Interface) Interface { + return labelManagerStruct{ + nodeName: nodeName, + k8s: kubeClient, + } +} + +// nodeLabelManager waits for labeling requests initiated by the driver's registration +// process. +func (lm labelManagerStruct) AddLabels(driverName string) error { + err := verifyAndAddNodeId(string(lm.nodeName), lm.k8s.CoreV1().Nodes(), driverName, string(lm.nodeName)) + if err != nil { + return fmt.Errorf("failed to update node %s's annotation with error: %+v", lm.nodeName, err) + } + return nil +} + +// Clones the given map and returns a new map with the given key and value added. +// Returns the given map, if annotationKey is empty. +func cloneAndAddAnnotation( + annotations map[string]string, + annotationKey, + annotationValue string) map[string]string { + if annotationKey == "" { + // Don't need to add an annotation. + return annotations + } + // Clone. + newAnnotations := map[string]string{} + for key, value := range annotations { + newAnnotations[key] = value + } + newAnnotations[annotationKey] = annotationValue + return newAnnotations +} + +func verifyAndAddNodeId( + k8sNodeName string, + k8sNodesClient corev1.NodeInterface, + csiDriverName string, + csiDriverNodeId string) error { + // Add or update annotation on Node object + retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { + // Retrieve the latest version of Node before attempting update, so that + // existing changes are not overwritten. RetryOnConflict uses + // exponential backoff to avoid exhausting the apiserver. + result, getErr := k8sNodesClient.Get(k8sNodeName, metav1.GetOptions{}) + if getErr != nil { + glog.Errorf("Failed to get latest version of Node: %v", getErr) + return getErr // do not wrap error + } + + var previousAnnotationValue string + if result.ObjectMeta.Annotations != nil { + previousAnnotationValue = + result.ObjectMeta.Annotations[annotationKey] + glog.V(3).Infof( + "previousAnnotationValue=%q", previousAnnotationValue) + } + + existingDriverMap := map[string]string{} + if previousAnnotationValue != "" { + // Parse previousAnnotationValue as JSON + if err := json.Unmarshal([]byte(previousAnnotationValue), &existingDriverMap); err != nil { + return fmt.Errorf( + "failed to parse node's %q annotation value (%q) err=%v", + annotationKey, + previousAnnotationValue, + err) + } + } + + if val, ok := existingDriverMap[csiDriverName]; ok { + if val == csiDriverNodeId { + // Value already exists in node annotation, nothing more to do + glog.V(1).Infof( + "The key value {%q: %q} alredy eixst in node %q annotation, no need to update: %v", + csiDriverName, + csiDriverNodeId, + annotationKey, + previousAnnotationValue) + return nil + } + } + + // Add/update annotation value + existingDriverMap[csiDriverName] = csiDriverNodeId + jsonObj, err := json.Marshal(existingDriverMap) + if err != nil { + return fmt.Errorf( + "failed while trying to add key value {%q: %q} to node %q annotation. Existing value: %v", + csiDriverName, + csiDriverNodeId, + annotationKey, + previousAnnotationValue) + } + + result.ObjectMeta.Annotations = cloneAndAddAnnotation( + result.ObjectMeta.Annotations, + annotationKey, + string(jsonObj)) + _, updateErr := k8sNodesClient.Update(result) + if updateErr == nil { + fmt.Printf( + "Updated node %q successfully for CSI driver %q and CSI node name %q", + k8sNodeName, + csiDriverName, + csiDriverNodeId) + } + return updateErr // do not wrap error + }) + if retryErr != nil { + return fmt.Errorf("node update failed: %v", retryErr) + } + return nil +} + +// Fetches Kubernetes node API object corresponding to k8sNodeName. +// If the csiDriverName is present in the node annotation, it is removed. +func verifyAndDeleteNodeId( + k8sNodeName string, + k8sNodesClient corev1.NodeInterface, + csiDriverName string) error { + retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { + // Retrieve the latest version of Node before attempting update, so that + // existing changes are not overwritten. RetryOnConflict uses + // exponential backoff to avoid exhausting the apiserver. + result, getErr := k8sNodesClient.Get(k8sNodeName, metav1.GetOptions{}) + if getErr != nil { + glog.Errorf("failed to get latest version of Node: %v", getErr) + return getErr // do not wrap error + } + + var previousAnnotationValue string + if result.ObjectMeta.Annotations != nil { + previousAnnotationValue = + result.ObjectMeta.Annotations[annotationKey] + glog.V(3).Infof( + "previousAnnotationValue=%q", previousAnnotationValue) + } + + existingDriverMap := map[string]string{} + if previousAnnotationValue == "" { + // Value already exists in node annotation, nothing more to do + glog.V(1).Infof( + "The key %q does not exist in node %q annotation, no need to cleanup.", + csiDriverName, + annotationKey) + return nil + } + + // Parse previousAnnotationValue as JSON + if err := json.Unmarshal([]byte(previousAnnotationValue), &existingDriverMap); err != nil { + return fmt.Errorf( + "failed to parse node's %q annotation value (%q) err=%v", + annotationKey, + previousAnnotationValue, + err) + } + + if _, ok := existingDriverMap[csiDriverName]; !ok { + // Value already exists in node annotation, nothing more to do + glog.V(1).Infof( + "The key %q does not eixst in node %q annotation, no need to cleanup: %v", + csiDriverName, + annotationKey, + previousAnnotationValue) + return nil + } + + // Add/update annotation value + delete(existingDriverMap, csiDriverName) + jsonObj, err := json.Marshal(existingDriverMap) + if err != nil { + return fmt.Errorf( + "failed while trying to remove key %q from node %q annotation. Existing data: %v", + csiDriverName, + annotationKey, + previousAnnotationValue) + } + + result.ObjectMeta.Annotations = cloneAndAddAnnotation( + result.ObjectMeta.Annotations, + annotationKey, + string(jsonObj)) + _, updateErr := k8sNodesClient.Update(result) + if updateErr == nil { + fmt.Printf( + "Updated node %q annotation to remove CSI driver %q.", + k8sNodeName, + csiDriverName) + } + return updateErr // do not wrap error + }) + if retryErr != nil { + return fmt.Errorf("node update failed: %v", retryErr) + } + return nil +}