From 72d487a7f3076f7bbaf21f5faf6c3e0580d51611 Mon Sep 17 00:00:00 2001 From: Darren Shepherd Date: Fri, 5 Oct 2018 17:55:21 -0700 Subject: [PATCH] Delete cloud controller --- pkg/controller/cloud/BUILD | 96 -- pkg/controller/cloud/OWNERS | 12 - pkg/controller/cloud/main_test.go | 29 - pkg/controller/cloud/node_controller.go | 541 -------- pkg/controller/cloud/node_controller_test.go | 1251 ------------------ pkg/controller/cloud/pvlcontroller.go | 328 ----- pkg/controller/cloud/pvlcontroller_test.go | 573 -------- 7 files changed, 2830 deletions(-) delete mode 100644 pkg/controller/cloud/BUILD delete mode 100644 pkg/controller/cloud/OWNERS delete mode 100644 pkg/controller/cloud/main_test.go delete mode 100644 pkg/controller/cloud/node_controller.go delete mode 100644 pkg/controller/cloud/node_controller_test.go delete mode 100644 pkg/controller/cloud/pvlcontroller.go delete mode 100644 pkg/controller/cloud/pvlcontroller_test.go diff --git a/pkg/controller/cloud/BUILD b/pkg/controller/cloud/BUILD deleted file mode 100644 index 25e1c6fe6b..0000000000 --- a/pkg/controller/cloud/BUILD +++ /dev/null @@ -1,96 +0,0 @@ -package(default_visibility = ["//visibility:public"]) - -load( - "@io_bazel_rules_go//go:def.bzl", - "go_library", - "go_test", -) - -go_library( - name = "go_default_library", - srcs = [ - "node_controller.go", - "pvlcontroller.go", - ], - importpath = "k8s.io/kubernetes/pkg/controller/cloud", - deps = [ - "//pkg/api/v1/node:go_default_library", - "//pkg/apis/core/v1/helper:go_default_library", - "//pkg/controller:go_default_library", - "//pkg/controller/util/node:go_default_library", - "//pkg/features:go_default_library", - "//pkg/kubelet/apis:go_default_library", - "//pkg/scheduler/api:go_default_library", - "//pkg/util/node:go_default_library", - "//pkg/volume/util:go_default_library", - "//staging/src/k8s.io/api/core/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", - "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", - "//staging/src/k8s.io/client-go/kubernetes:go_default_library", - "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", - "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", - "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", - "//staging/src/k8s.io/client-go/tools/cache:go_default_library", - "//staging/src/k8s.io/client-go/tools/record:go_default_library", - "//staging/src/k8s.io/client-go/util/retry:go_default_library", - "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", - "//staging/src/k8s.io/cloud-provider:go_default_library", - "//vendor/k8s.io/klog:go_default_library", - ], -) - -go_test( - name = "go_default_test", - srcs = [ - "main_test.go", - "node_controller_test.go", - "pvlcontroller_test.go", - ], - embed = [":go_default_library"], - deps = [ - "//pkg/cloudprovider/providers/fake:go_default_library", - "//pkg/controller:go_default_library", - "//pkg/controller/testutil:go_default_library", - "//pkg/features:go_default_library", - "//pkg/kubelet/apis:go_default_library", - "//pkg/scheduler/api:go_default_library", - "//pkg/volume/util:go_default_library", - "//staging/src/k8s.io/api/core/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library", - "//staging/src/k8s.io/client-go/informers:go_default_library", - "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", - "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", - "//staging/src/k8s.io/client-go/testing:go_default_library", - "//staging/src/k8s.io/client-go/tools/record:go_default_library", - "//staging/src/k8s.io/cloud-provider:go_default_library", - "//vendor/github.com/stretchr/testify/assert:go_default_library", - "//vendor/k8s.io/klog:go_default_library", - ], -) - -filegroup( - name = "package-srcs", - srcs = glob(["**"]), - tags = ["automanaged"], - visibility = ["//visibility:private"], -) - -filegroup( - name = "all-srcs", - srcs = [":package-srcs"], - tags = ["automanaged"], -) diff --git a/pkg/controller/cloud/OWNERS b/pkg/controller/cloud/OWNERS deleted file mode 100644 index 789b468a02..0000000000 --- a/pkg/controller/cloud/OWNERS +++ /dev/null @@ -1,12 +0,0 @@ -approvers: -- thockin -- luxas -- wlan0 -- andrewsykim -reviewers: -- thockin -- luxas -- wlan0 -- andrewsykim -labels: -- sig/cloud-provider diff --git a/pkg/controller/cloud/main_test.go b/pkg/controller/cloud/main_test.go deleted file mode 100644 index a2abc54d01..0000000000 --- a/pkg/controller/cloud/main_test.go +++ /dev/null @@ -1,29 +0,0 @@ -/* -Copyright 2018 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package cloud - -import ( - "testing" - - utilfeature "k8s.io/apiserver/pkg/util/feature" - utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing" - _ "k8s.io/kubernetes/pkg/features" -) - -func TestMain(m *testing.M) { - utilfeaturetesting.VerifyFeatureGatesUnchanged(utilfeature.DefaultFeatureGate, m.Run) -} diff --git a/pkg/controller/cloud/node_controller.go b/pkg/controller/cloud/node_controller.go deleted file mode 100644 index bfce4e59d4..0000000000 --- a/pkg/controller/cloud/node_controller.go +++ /dev/null @@ -1,541 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package cloud - -import ( - "context" - "errors" - "fmt" - "time" - - "k8s.io/klog" - - "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" - coreinformers "k8s.io/client-go/informers/core/v1" - clientset "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/scheme" - v1core "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/record" - clientretry "k8s.io/client-go/util/retry" - cloudprovider "k8s.io/cloud-provider" - nodeutilv1 "k8s.io/kubernetes/pkg/api/v1/node" - "k8s.io/kubernetes/pkg/controller" - nodectrlutil "k8s.io/kubernetes/pkg/controller/util/node" - kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" - schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" - nodeutil "k8s.io/kubernetes/pkg/util/node" -) - -var UpdateNodeSpecBackoff = wait.Backoff{ - Steps: 20, - Duration: 50 * time.Millisecond, - Jitter: 1.0, -} - -type CloudNodeController struct { - nodeInformer coreinformers.NodeInformer - kubeClient clientset.Interface - recorder record.EventRecorder - - cloud cloudprovider.Interface - - // Value controlling NodeController monitoring period, i.e. how often does NodeController - // check node status posted from kubelet. This value should be lower than nodeMonitorGracePeriod - // set in controller-manager - nodeMonitorPeriod time.Duration - - nodeStatusUpdateFrequency time.Duration -} - -const ( - // nodeStatusUpdateRetry controls the number of retries of writing NodeStatus update. - nodeStatusUpdateRetry = 5 - - // The amount of time the nodecontroller should sleep between retrying NodeStatus updates - retrySleepTime = 20 * time.Millisecond -) - -// NewCloudNodeController creates a CloudNodeController object -func NewCloudNodeController( - nodeInformer coreinformers.NodeInformer, - kubeClient clientset.Interface, - cloud cloudprovider.Interface, - nodeMonitorPeriod time.Duration, - nodeStatusUpdateFrequency time.Duration) *CloudNodeController { - - eventBroadcaster := record.NewBroadcaster() - recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}) - eventBroadcaster.StartLogging(klog.Infof) - if kubeClient != nil { - klog.V(0).Infof("Sending events to api server.") - eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) - } else { - klog.V(0).Infof("No api server defined - no events will be sent to API server.") - } - - cnc := &CloudNodeController{ - nodeInformer: nodeInformer, - kubeClient: kubeClient, - recorder: recorder, - cloud: cloud, - nodeMonitorPeriod: nodeMonitorPeriod, - nodeStatusUpdateFrequency: nodeStatusUpdateFrequency, - } - - // Use shared informer to listen to add/update of nodes. Note that any nodes - // that exist before node controller starts will show up in the update method - cnc.nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: cnc.AddCloudNode, - UpdateFunc: cnc.UpdateCloudNode, - }) - - return cnc -} - -// This controller deletes a node if kubelet is not reporting -// and the node is gone from the cloud provider. -func (cnc *CloudNodeController) Run(stopCh <-chan struct{}) { - defer utilruntime.HandleCrash() - - // The following loops run communicate with the APIServer with a worst case complexity - // of O(num_nodes) per cycle. These functions are justified here because these events fire - // very infrequently. DO NOT MODIFY this to perform frequent operations. - - // Start a loop to periodically update the node addresses obtained from the cloud - go wait.Until(cnc.UpdateNodeStatus, cnc.nodeStatusUpdateFrequency, stopCh) - - // Start a loop to periodically check if any nodes have been deleted from cloudprovider - go wait.Until(cnc.MonitorNode, cnc.nodeMonitorPeriod, stopCh) -} - -// UpdateNodeStatus updates the node status, such as node addresses -func (cnc *CloudNodeController) UpdateNodeStatus() { - instances, ok := cnc.cloud.Instances() - if !ok { - utilruntime.HandleError(fmt.Errorf("failed to get instances from cloud provider")) - return - } - - nodes, err := cnc.kubeClient.CoreV1().Nodes().List(metav1.ListOptions{ResourceVersion: "0"}) - if err != nil { - klog.Errorf("Error monitoring node status: %v", err) - return - } - - for i := range nodes.Items { - cnc.updateNodeAddress(&nodes.Items[i], instances) - } -} - -// UpdateNodeAddress updates the nodeAddress of a single node -func (cnc *CloudNodeController) updateNodeAddress(node *v1.Node, instances cloudprovider.Instances) { - // Do not process nodes that are still tainted - cloudTaint := getCloudTaint(node.Spec.Taints) - if cloudTaint != nil { - klog.V(5).Infof("This node %s is still tainted. Will not process.", node.Name) - return - } - // Node that isn't present according to the cloud provider shouldn't have its address updated - exists, err := ensureNodeExistsByProviderID(instances, node) - if err != nil { - // Continue to update node address when not sure the node is not exists - klog.Errorf("%v", err) - } else if !exists { - klog.V(4).Infof("The node %s is no longer present according to the cloud provider, do not process.", node.Name) - return - } - - nodeAddresses, err := getNodeAddressesByProviderIDOrName(instances, node) - if err != nil { - klog.Errorf("%v", err) - return - } - - if len(nodeAddresses) == 0 { - klog.V(5).Infof("Skipping node address update for node %q since cloud provider did not return any", node.Name) - return - } - - // Check if a hostname address exists in the cloud provided addresses - hostnameExists := false - for i := range nodeAddresses { - if nodeAddresses[i].Type == v1.NodeHostName { - hostnameExists = true - } - } - // If hostname was not present in cloud provided addresses, use the hostname - // from the existing node (populated by kubelet) - if !hostnameExists { - for _, addr := range node.Status.Addresses { - if addr.Type == v1.NodeHostName { - nodeAddresses = append(nodeAddresses, addr) - } - } - } - // If nodeIP was suggested by user, ensure that - // it can be found in the cloud as well (consistent with the behaviour in kubelet) - if nodeIP, ok := ensureNodeProvidedIPExists(node, nodeAddresses); ok { - if nodeIP == nil { - klog.Errorf("Specified Node IP not found in cloudprovider") - return - } - } - newNode := node.DeepCopy() - newNode.Status.Addresses = nodeAddresses - if !nodeAddressesChangeDetected(node.Status.Addresses, newNode.Status.Addresses) { - return - } - _, _, err = nodeutil.PatchNodeStatus(cnc.kubeClient.CoreV1(), types.NodeName(node.Name), node, newNode) - if err != nil { - klog.Errorf("Error patching node with cloud ip addresses = [%v]", err) - } -} - -// Monitor node queries the cloudprovider for non-ready nodes and deletes them -// if they cannot be found in the cloud provider -func (cnc *CloudNodeController) MonitorNode() { - instances, ok := cnc.cloud.Instances() - if !ok { - utilruntime.HandleError(fmt.Errorf("failed to get instances from cloud provider")) - return - } - - nodes, err := cnc.kubeClient.CoreV1().Nodes().List(metav1.ListOptions{ResourceVersion: "0"}) - if err != nil { - klog.Errorf("Error monitoring node status: %v", err) - return - } - - for i := range nodes.Items { - var currentReadyCondition *v1.NodeCondition - node := &nodes.Items[i] - // Try to get the current node status - // If node status is empty, then kubelet has not posted ready status yet. In this case, process next node - for rep := 0; rep < nodeStatusUpdateRetry; rep++ { - _, currentReadyCondition = nodeutilv1.GetNodeCondition(&node.Status, v1.NodeReady) - if currentReadyCondition != nil { - break - } - name := node.Name - node, err = cnc.kubeClient.CoreV1().Nodes().Get(name, metav1.GetOptions{}) - if err != nil { - klog.Errorf("Failed while getting a Node to retry updating NodeStatus. Probably Node %s was deleted.", name) - break - } - time.Sleep(retrySleepTime) - } - if currentReadyCondition == nil { - klog.Errorf("Update status of Node %v from CloudNodeController exceeds retry count or the Node was deleted.", node.Name) - continue - } - // If the known node status says that Node is NotReady, then check if the node has been removed - // from the cloud provider. If node cannot be found in cloudprovider, then delete the node immediately - if currentReadyCondition != nil { - if currentReadyCondition.Status != v1.ConditionTrue { - // we need to check this first to get taint working in similar in all cloudproviders - // current problem is that shutdown nodes are not working in similar way ie. all cloudproviders - // does not delete node from kubernetes cluster when instance it is shutdown see issue #46442 - shutdown, err := nodectrlutil.ShutdownInCloudProvider(context.TODO(), cnc.cloud, node) - if err != nil { - klog.Errorf("Error checking if node %s is shutdown: %v", node.Name, err) - } - - if shutdown && err == nil { - // if node is shutdown add shutdown taint - err = controller.AddOrUpdateTaintOnNode(cnc.kubeClient, node.Name, controller.ShutdownTaint) - if err != nil { - klog.Errorf("Error patching node taints: %v", err) - } - // Continue checking the remaining nodes since the current one is shutdown. - continue - } - - // Check with the cloud provider to see if the node still exists. If it - // doesn't, delete the node immediately. - exists, err := ensureNodeExistsByProviderID(instances, node) - if err != nil { - klog.Errorf("Error checking if node %s exists: %v", node.Name, err) - continue - } - - if exists { - // Continue checking the remaining nodes since the current one is fine. - continue - } - - klog.V(2).Infof("Deleting node since it is no longer present in cloud provider: %s", node.Name) - - ref := &v1.ObjectReference{ - Kind: "Node", - Name: node.Name, - UID: types.UID(node.UID), - Namespace: "", - } - klog.V(2).Infof("Recording %s event message for node %s", "DeletingNode", node.Name) - - cnc.recorder.Eventf(ref, v1.EventTypeNormal, fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name), "Node %s event: %s", node.Name, "DeletingNode") - - go func(nodeName string) { - defer utilruntime.HandleCrash() - if err := cnc.kubeClient.CoreV1().Nodes().Delete(nodeName, nil); err != nil { - klog.Errorf("unable to delete node %q: %v", nodeName, err) - } - }(node.Name) - - } else { - // if taint exist remove taint - err = controller.RemoveTaintOffNode(cnc.kubeClient, node.Name, node, controller.ShutdownTaint) - if err != nil { - klog.Errorf("Error patching node taints: %v", err) - } - } - } - } -} - -func (cnc *CloudNodeController) UpdateCloudNode(_, newObj interface{}) { - if _, ok := newObj.(*v1.Node); !ok { - utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj)) - return - } - cnc.AddCloudNode(newObj) -} - -// This processes nodes that were added into the cluster, and cloud initialize them if appropriate -func (cnc *CloudNodeController) AddCloudNode(obj interface{}) { - node := obj.(*v1.Node) - - cloudTaint := getCloudTaint(node.Spec.Taints) - if cloudTaint == nil { - klog.V(2).Infof("This node %s is registered without the cloud taint. Will not process.", node.Name) - return - } - - instances, ok := cnc.cloud.Instances() - if !ok { - utilruntime.HandleError(fmt.Errorf("failed to get instances from cloud provider")) - return - } - - err := clientretry.RetryOnConflict(UpdateNodeSpecBackoff, func() error { - // TODO(wlan0): Move this logic to the route controller using the node taint instead of condition - // Since there are node taints, do we still need this? - // This condition marks the node as unusable until routes are initialized in the cloud provider - if cnc.cloud.ProviderName() == "gce" { - if err := nodeutil.SetNodeCondition(cnc.kubeClient, types.NodeName(node.Name), v1.NodeCondition{ - Type: v1.NodeNetworkUnavailable, - Status: v1.ConditionTrue, - Reason: "NoRouteCreated", - Message: "Node created without a route", - LastTransitionTime: metav1.Now(), - }); err != nil { - return err - } - } - - curNode, err := cnc.kubeClient.CoreV1().Nodes().Get(node.Name, metav1.GetOptions{}) - if err != nil { - return err - } - - if curNode.Spec.ProviderID == "" { - providerID, err := cloudprovider.GetInstanceProviderID(context.TODO(), cnc.cloud, types.NodeName(curNode.Name)) - if err == nil { - curNode.Spec.ProviderID = providerID - } else { - // we should attempt to set providerID on curNode, but - // we can continue if we fail since we will attempt to set - // node addresses given the node name in getNodeAddressesByProviderIDOrName - klog.Errorf("failed to set node provider id: %v", err) - } - } - - nodeAddresses, err := getNodeAddressesByProviderIDOrName(instances, curNode) - if err != nil { - return err - } - - // If user provided an IP address, ensure that IP address is found - // in the cloud provider before removing the taint on the node - if nodeIP, ok := ensureNodeProvidedIPExists(curNode, nodeAddresses); ok { - if nodeIP == nil { - return errors.New("failed to find kubelet node IP from cloud provider") - } - } - - if instanceType, err := getInstanceTypeByProviderIDOrName(instances, curNode); err != nil { - return err - } else if instanceType != "" { - klog.V(2).Infof("Adding node label from cloud provider: %s=%s", kubeletapis.LabelInstanceType, instanceType) - curNode.ObjectMeta.Labels[kubeletapis.LabelInstanceType] = instanceType - } - - if zones, ok := cnc.cloud.Zones(); ok { - zone, err := getZoneByProviderIDOrName(zones, curNode) - if err != nil { - return fmt.Errorf("failed to get zone from cloud provider: %v", err) - } - if zone.FailureDomain != "" { - klog.V(2).Infof("Adding node label from cloud provider: %s=%s", kubeletapis.LabelZoneFailureDomain, zone.FailureDomain) - curNode.ObjectMeta.Labels[kubeletapis.LabelZoneFailureDomain] = zone.FailureDomain - } - if zone.Region != "" { - klog.V(2).Infof("Adding node label from cloud provider: %s=%s", kubeletapis.LabelZoneRegion, zone.Region) - curNode.ObjectMeta.Labels[kubeletapis.LabelZoneRegion] = zone.Region - } - } - - curNode.Spec.Taints = excludeTaintFromList(curNode.Spec.Taints, *cloudTaint) - - _, err = cnc.kubeClient.CoreV1().Nodes().Update(curNode) - if err != nil { - return err - } - // After adding, call UpdateNodeAddress to set the CloudProvider provided IPAddresses - // So that users do not see any significant delay in IP addresses being filled into the node - cnc.updateNodeAddress(curNode, instances) - return nil - }) - if err != nil { - utilruntime.HandleError(err) - return - } - - klog.Infof("Successfully initialized node %s with cloud provider", node.Name) -} - -func getCloudTaint(taints []v1.Taint) *v1.Taint { - for _, taint := range taints { - if taint.Key == schedulerapi.TaintExternalCloudProvider { - return &taint - } - } - return nil -} - -func excludeTaintFromList(taints []v1.Taint, toExclude v1.Taint) []v1.Taint { - newTaints := []v1.Taint{} - for _, taint := range taints { - if toExclude.MatchTaint(&taint) { - continue - } - newTaints = append(newTaints, taint) - } - return newTaints -} - -// ensureNodeExistsByProviderID checks if the instance exists by the provider id, -// If provider id in spec is empty it calls instanceId with node name to get provider id -func ensureNodeExistsByProviderID(instances cloudprovider.Instances, node *v1.Node) (bool, error) { - providerID := node.Spec.ProviderID - if providerID == "" { - var err error - providerID, err = instances.InstanceID(context.TODO(), types.NodeName(node.Name)) - if err != nil { - if err == cloudprovider.InstanceNotFound { - return false, nil - } - return false, err - } - - if providerID == "" { - klog.Warningf("Cannot find valid providerID for node name %q, assuming non existence", node.Name) - return false, nil - } - } - - return instances.InstanceExistsByProviderID(context.TODO(), providerID) -} - -func getNodeAddressesByProviderIDOrName(instances cloudprovider.Instances, node *v1.Node) ([]v1.NodeAddress, error) { - nodeAddresses, err := instances.NodeAddressesByProviderID(context.TODO(), node.Spec.ProviderID) - if err != nil { - providerIDErr := err - nodeAddresses, err = instances.NodeAddresses(context.TODO(), types.NodeName(node.Name)) - if err != nil { - return nil, fmt.Errorf("NodeAddress: Error fetching by providerID: %v Error fetching by NodeName: %v", providerIDErr, err) - } - } - return nodeAddresses, nil -} - -func nodeAddressesChangeDetected(addressSet1, addressSet2 []v1.NodeAddress) bool { - if len(addressSet1) != len(addressSet2) { - return true - } - addressMap1 := map[v1.NodeAddressType]string{} - addressMap2 := map[v1.NodeAddressType]string{} - - for i := range addressSet1 { - addressMap1[addressSet1[i].Type] = addressSet1[i].Address - addressMap2[addressSet2[i].Type] = addressSet2[i].Address - } - - for k, v := range addressMap1 { - if addressMap2[k] != v { - return true - } - } - return false -} - -func ensureNodeProvidedIPExists(node *v1.Node, nodeAddresses []v1.NodeAddress) (*v1.NodeAddress, bool) { - var nodeIP *v1.NodeAddress - nodeIPExists := false - if providedIP, ok := node.ObjectMeta.Annotations[kubeletapis.AnnotationProvidedIPAddr]; ok { - nodeIPExists = true - for i := range nodeAddresses { - if nodeAddresses[i].Address == providedIP { - nodeIP = &nodeAddresses[i] - break - } - } - } - return nodeIP, nodeIPExists -} - -func getInstanceTypeByProviderIDOrName(instances cloudprovider.Instances, node *v1.Node) (string, error) { - instanceType, err := instances.InstanceTypeByProviderID(context.TODO(), node.Spec.ProviderID) - if err != nil { - providerIDErr := err - instanceType, err = instances.InstanceType(context.TODO(), types.NodeName(node.Name)) - if err != nil { - return "", fmt.Errorf("InstanceType: Error fetching by providerID: %v Error fetching by NodeName: %v", providerIDErr, err) - } - } - return instanceType, err -} - -// getZoneByProviderIDorName will attempt to get the zone of node using its providerID -// then it's name. If both attempts fail, an error is returned -func getZoneByProviderIDOrName(zones cloudprovider.Zones, node *v1.Node) (cloudprovider.Zone, error) { - zone, err := zones.GetZoneByProviderID(context.TODO(), node.Spec.ProviderID) - if err != nil { - providerIDErr := err - zone, err = zones.GetZoneByNodeName(context.TODO(), types.NodeName(node.Name)) - if err != nil { - return cloudprovider.Zone{}, fmt.Errorf("Zone: Error fetching by providerID: %v Error fetching by NodeName: %v", providerIDErr, err) - } - } - - return zone, nil -} diff --git a/pkg/controller/cloud/node_controller_test.go b/pkg/controller/cloud/node_controller_test.go deleted file mode 100644 index 855594dcc8..0000000000 --- a/pkg/controller/cloud/node_controller_test.go +++ /dev/null @@ -1,1251 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package cloud - -import ( - "errors" - "testing" - "time" - - "k8s.io/api/core/v1" - "k8s.io/client-go/kubernetes/fake" - "k8s.io/client-go/kubernetes/scheme" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/informers" - "k8s.io/client-go/tools/record" - cloudprovider "k8s.io/cloud-provider" - fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake" - "k8s.io/kubernetes/pkg/controller" - "k8s.io/kubernetes/pkg/controller/testutil" - kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" - schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" - - "github.com/stretchr/testify/assert" - "k8s.io/klog" -) - -func TestEnsureNodeExistsByProviderID(t *testing.T) { - - testCases := []struct { - testName string - node *v1.Node - expectedCalls []string - expectedNodeExists bool - hasInstanceID bool - existsByProviderID bool - nodeNameErr error - providerIDErr error - }{ - { - testName: "node exists by provider id", - existsByProviderID: true, - providerIDErr: nil, - hasInstanceID: true, - nodeNameErr: errors.New("unimplemented"), - expectedCalls: []string{"instance-exists-by-provider-id"}, - expectedNodeExists: true, - node: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "node0", - }, - Spec: v1.NodeSpec{ - ProviderID: "node0", - }, - }, - }, - { - testName: "does not exist by provider id", - existsByProviderID: false, - providerIDErr: nil, - hasInstanceID: true, - nodeNameErr: errors.New("unimplemented"), - expectedCalls: []string{"instance-exists-by-provider-id"}, - expectedNodeExists: false, - node: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "node0", - }, - Spec: v1.NodeSpec{ - ProviderID: "node0", - }, - }, - }, - { - testName: "exists by instance id", - existsByProviderID: true, - providerIDErr: nil, - hasInstanceID: true, - nodeNameErr: nil, - expectedCalls: []string{"instance-id", "instance-exists-by-provider-id"}, - expectedNodeExists: true, - node: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "node0", - }, - }, - }, - { - testName: "does not exist by no instance id", - existsByProviderID: true, - providerIDErr: nil, - hasInstanceID: false, - nodeNameErr: cloudprovider.InstanceNotFound, - expectedCalls: []string{"instance-id"}, - expectedNodeExists: false, - node: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "node0", - }, - }, - }, - { - testName: "provider id returns error", - existsByProviderID: false, - providerIDErr: errors.New("unimplemented"), - hasInstanceID: true, - nodeNameErr: cloudprovider.InstanceNotFound, - expectedCalls: []string{"instance-exists-by-provider-id"}, - expectedNodeExists: false, - node: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "node0", - }, - Spec: v1.NodeSpec{ - ProviderID: "node0", - }, - }, - }, - } - - for _, tc := range testCases { - t.Run(tc.testName, func(t *testing.T) { - fc := &fakecloud.FakeCloud{ - ExistsByProviderID: tc.existsByProviderID, - Err: tc.nodeNameErr, - ErrByProviderID: tc.providerIDErr, - } - - if tc.hasInstanceID { - fc.ExtID = map[types.NodeName]string{ - types.NodeName(tc.node.Name): "provider-id://a", - } - } - - instances, _ := fc.Instances() - exists, err := ensureNodeExistsByProviderID(instances, tc.node) - assert.Equal(t, err, tc.providerIDErr) - - assert.EqualValues(t, tc.expectedCalls, fc.Calls, - "expected cloud provider methods `%v` to be called but `%v` was called ", - tc.expectedCalls, fc.Calls) - - assert.Equal(t, tc.expectedNodeExists, exists, - "expected exists to be `%t` but got `%t`", - tc.existsByProviderID, exists) - }) - } - -} - -func TestNodeShutdown(t *testing.T) { - - testCases := []struct { - testName string - node *v1.Node - existsByProviderID bool - shutdown bool - }{ - { - testName: "node shutdowned add taint", - existsByProviderID: true, - shutdown: true, - node: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "node0", - CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - }, - Spec: v1.NodeSpec{ - ProviderID: "node0", - }, - Status: v1.NodeStatus{ - Conditions: []v1.NodeCondition{ - { - Type: v1.NodeReady, - Status: v1.ConditionUnknown, - LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), - LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), - }, - }, - }, - }, - }, - { - testName: "node started after shutdown remove taint", - existsByProviderID: true, - shutdown: false, - node: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "node0", - CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - }, - Spec: v1.NodeSpec{ - ProviderID: "node0", - Taints: []v1.Taint{ - { - Key: schedulerapi.TaintNodeShutdown, - Effect: v1.TaintEffectNoSchedule, - }, - }, - }, - Status: v1.NodeStatus{ - Conditions: []v1.NodeCondition{ - { - Type: v1.NodeReady, - Status: v1.ConditionTrue, - LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), - LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), - }, - }, - }, - }, - }, - } - for _, tc := range testCases { - t.Run(tc.testName, func(t *testing.T) { - fc := &fakecloud.FakeCloud{ - ExistsByProviderID: tc.existsByProviderID, - NodeShutdown: tc.shutdown, - } - fnh := &testutil.FakeNodeHandler{ - Existing: []*v1.Node{tc.node}, - Clientset: fake.NewSimpleClientset(), - PatchWaitChan: make(chan struct{}), - } - - factory := informers.NewSharedInformerFactory(fnh, controller.NoResyncPeriodFunc()) - - eventBroadcaster := record.NewBroadcaster() - cloudNodeController := &CloudNodeController{ - kubeClient: fnh, - nodeInformer: factory.Core().V1().Nodes(), - cloud: fc, - nodeMonitorPeriod: 1 * time.Second, - recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}), - nodeStatusUpdateFrequency: 1 * time.Second, - } - eventBroadcaster.StartLogging(klog.Infof) - - cloudNodeController.Run(wait.NeverStop) - - select { - case <-fnh.PatchWaitChan: - case <-time.After(1 * time.Second): - t.Errorf("Timed out waiting %v for node to be updated", wait.ForeverTestTimeout) - } - - assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated") - if tc.shutdown { - assert.Equal(t, 1, len(fnh.UpdatedNodes[0].Spec.Taints), "Node Taint was not added") - assert.Equal(t, "node.cloudprovider.kubernetes.io/shutdown", fnh.UpdatedNodes[0].Spec.Taints[0].Key, "Node Taint key is not correct") - } else { - assert.Equal(t, 0, len(fnh.UpdatedNodes[0].Spec.Taints), "Node Taint was not removed after node is back in ready state") - } - - }) - } - -} - -// This test checks that the node is deleted when kubelet stops reporting -// and cloud provider says node is gone -func TestNodeDeleted(t *testing.T) { - pod0 := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: "pod0", - }, - Spec: v1.PodSpec{ - NodeName: "node0", - }, - Status: v1.PodStatus{ - Conditions: []v1.PodCondition{ - { - Type: v1.PodReady, - Status: v1.ConditionTrue, - }, - }, - }, - } - - pod1 := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: "pod1", - }, - Spec: v1.PodSpec{ - NodeName: "node0", - }, - Status: v1.PodStatus{ - Conditions: []v1.PodCondition{ - { - Type: v1.PodReady, - Status: v1.ConditionTrue, - }, - }, - }, - } - - fnh := &testutil.FakeNodeHandler{ - Existing: []*v1.Node{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "node0", - CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - }, - Status: v1.NodeStatus{ - Conditions: []v1.NodeCondition{ - { - Type: v1.NodeReady, - Status: v1.ConditionUnknown, - LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), - LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), - }, - }, - }, - }, - }, - Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*pod0, *pod1}}), - DeleteWaitChan: make(chan struct{}), - } - - factory := informers.NewSharedInformerFactory(fnh, controller.NoResyncPeriodFunc()) - - eventBroadcaster := record.NewBroadcaster() - cloudNodeController := &CloudNodeController{ - kubeClient: fnh, - nodeInformer: factory.Core().V1().Nodes(), - cloud: &fakecloud.FakeCloud{ - ExistsByProviderID: false, - Err: nil, - }, - nodeMonitorPeriod: 1 * time.Second, - recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}), - nodeStatusUpdateFrequency: 1 * time.Second, - } - eventBroadcaster.StartLogging(klog.Infof) - - cloudNodeController.Run(wait.NeverStop) - - select { - case <-fnh.DeleteWaitChan: - case <-time.After(wait.ForeverTestTimeout): - t.Errorf("Timed out waiting %v for node to be deleted", wait.ForeverTestTimeout) - } - - assert.Equal(t, 1, len(fnh.DeletedNodes), "Node was not deleted") - assert.Equal(t, "node0", fnh.DeletedNodes[0].Name, "Node was not deleted") -} - -// This test checks that a node with the external cloud provider taint is cloudprovider initialized -func TestNodeInitialized(t *testing.T) { - fnh := &testutil.FakeNodeHandler{ - Existing: []*v1.Node{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "node0", - CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - }, - Status: v1.NodeStatus{ - Conditions: []v1.NodeCondition{ - { - Type: v1.NodeReady, - Status: v1.ConditionUnknown, - LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), - LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), - }, - }, - }, - Spec: v1.NodeSpec{ - Taints: []v1.Taint{ - { - Key: schedulerapi.TaintExternalCloudProvider, - Value: "true", - Effect: v1.TaintEffectNoSchedule, - }, - }, - }, - }, - }, - Clientset: fake.NewSimpleClientset(&v1.PodList{}), - DeleteWaitChan: make(chan struct{}), - } - - factory := informers.NewSharedInformerFactory(fnh, controller.NoResyncPeriodFunc()) - - fakeCloud := &fakecloud.FakeCloud{ - InstanceTypes: map[types.NodeName]string{ - types.NodeName("node0"): "t1.micro", - }, - Addresses: []v1.NodeAddress{ - { - Type: v1.NodeHostName, - Address: "node0.cloud.internal", - }, - { - Type: v1.NodeInternalIP, - Address: "10.0.0.1", - }, - { - Type: v1.NodeExternalIP, - Address: "132.143.154.163", - }, - }, - Err: nil, - } - - eventBroadcaster := record.NewBroadcaster() - cloudNodeController := &CloudNodeController{ - kubeClient: fnh, - nodeInformer: factory.Core().V1().Nodes(), - cloud: fakeCloud, - nodeMonitorPeriod: 1 * time.Second, - recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}), - nodeStatusUpdateFrequency: 1 * time.Second, - } - eventBroadcaster.StartLogging(klog.Infof) - - cloudNodeController.AddCloudNode(fnh.Existing[0]) - - assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated") - assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated") - assert.Equal(t, 0, len(fnh.UpdatedNodes[0].Spec.Taints), "Node Taint was not removed after cloud init") -} - -// This test checks that a node without the external cloud provider taint are NOT cloudprovider initialized -func TestNodeIgnored(t *testing.T) { - fnh := &testutil.FakeNodeHandler{ - Existing: []*v1.Node{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "node0", - CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - }, - Status: v1.NodeStatus{ - Conditions: []v1.NodeCondition{ - { - Type: v1.NodeReady, - Status: v1.ConditionUnknown, - LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), - LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), - }, - }, - }, - }, - }, - Clientset: fake.NewSimpleClientset(&v1.PodList{}), - DeleteWaitChan: make(chan struct{}), - } - - factory := informers.NewSharedInformerFactory(fnh, controller.NoResyncPeriodFunc()) - - fakeCloud := &fakecloud.FakeCloud{ - InstanceTypes: map[types.NodeName]string{ - types.NodeName("node0"): "t1.micro", - }, - Addresses: []v1.NodeAddress{ - { - Type: v1.NodeHostName, - Address: "node0.cloud.internal", - }, - { - Type: v1.NodeInternalIP, - Address: "10.0.0.1", - }, - { - Type: v1.NodeExternalIP, - Address: "132.143.154.163", - }, - }, - Err: nil, - } - - eventBroadcaster := record.NewBroadcaster() - cloudNodeController := &CloudNodeController{ - kubeClient: fnh, - nodeInformer: factory.Core().V1().Nodes(), - cloud: fakeCloud, - nodeMonitorPeriod: 5 * time.Second, - recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}), - } - eventBroadcaster.StartLogging(klog.Infof) - - cloudNodeController.AddCloudNode(fnh.Existing[0]) - assert.Equal(t, 0, len(fnh.UpdatedNodes), "Node was wrongly updated") - -} - -// This test checks that a node with the external cloud provider taint is cloudprovider initialized and -// the GCE route condition is added if cloudprovider is GCE -func TestGCECondition(t *testing.T) { - fnh := &testutil.FakeNodeHandler{ - Existing: []*v1.Node{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "node0", - CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - }, - Status: v1.NodeStatus{ - Conditions: []v1.NodeCondition{ - { - Type: v1.NodeReady, - Status: v1.ConditionUnknown, - LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), - LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), - }, - }, - }, - Spec: v1.NodeSpec{ - Taints: []v1.Taint{ - { - Key: schedulerapi.TaintExternalCloudProvider, - Value: "true", - Effect: v1.TaintEffectNoSchedule, - }, - }, - }, - }, - }, - Clientset: fake.NewSimpleClientset(&v1.PodList{}), - DeleteWaitChan: make(chan struct{}), - } - - factory := informers.NewSharedInformerFactory(fnh, controller.NoResyncPeriodFunc()) - - fakeCloud := &fakecloud.FakeCloud{ - InstanceTypes: map[types.NodeName]string{ - types.NodeName("node0"): "t1.micro", - }, - Addresses: []v1.NodeAddress{ - { - Type: v1.NodeHostName, - Address: "node0.cloud.internal", - }, - { - Type: v1.NodeInternalIP, - Address: "10.0.0.1", - }, - { - Type: v1.NodeExternalIP, - Address: "132.143.154.163", - }, - }, - Provider: "gce", - Err: nil, - } - - eventBroadcaster := record.NewBroadcaster() - cloudNodeController := &CloudNodeController{ - kubeClient: fnh, - nodeInformer: factory.Core().V1().Nodes(), - cloud: fakeCloud, - nodeMonitorPeriod: 1 * time.Second, - recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}), - } - eventBroadcaster.StartLogging(klog.Infof) - - cloudNodeController.AddCloudNode(fnh.Existing[0]) - - assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated") - assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated") - assert.Equal(t, 2, len(fnh.UpdatedNodes[0].Status.Conditions), "No new conditions were added for GCE") - - conditionAdded := false - for _, cond := range fnh.UpdatedNodes[0].Status.Conditions { - if cond.Status == "True" && cond.Type == "NetworkUnavailable" && cond.Reason == "NoRouteCreated" { - conditionAdded = true - } - } - - assert.True(t, conditionAdded, "Network Route Condition for GCE not added by external cloud initializer") -} - -// This test checks that a node with the external cloud provider taint is cloudprovider initialized and -// and that zone labels are added correctly -func TestZoneInitialized(t *testing.T) { - fnh := &testutil.FakeNodeHandler{ - Existing: []*v1.Node{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "node0", - CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - Labels: map[string]string{}, - }, - Status: v1.NodeStatus{ - Conditions: []v1.NodeCondition{ - { - Type: v1.NodeReady, - Status: v1.ConditionUnknown, - LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), - LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), - }, - }, - }, - Spec: v1.NodeSpec{ - Taints: []v1.Taint{ - { - Key: schedulerapi.TaintExternalCloudProvider, - Value: "true", - Effect: v1.TaintEffectNoSchedule, - }, - }, - }, - }, - }, - Clientset: fake.NewSimpleClientset(&v1.PodList{}), - DeleteWaitChan: make(chan struct{}), - } - - factory := informers.NewSharedInformerFactory(fnh, controller.NoResyncPeriodFunc()) - - fakeCloud := &fakecloud.FakeCloud{ - InstanceTypes: map[types.NodeName]string{ - types.NodeName("node0"): "t1.micro", - }, - Addresses: []v1.NodeAddress{ - { - Type: v1.NodeHostName, - Address: "node0.cloud.internal", - }, - { - Type: v1.NodeInternalIP, - Address: "10.0.0.1", - }, - { - Type: v1.NodeExternalIP, - Address: "132.143.154.163", - }, - }, - Provider: "aws", - Zone: cloudprovider.Zone{ - FailureDomain: "us-west-1a", - Region: "us-west", - }, - Err: nil, - } - - eventBroadcaster := record.NewBroadcaster() - cloudNodeController := &CloudNodeController{ - kubeClient: fnh, - nodeInformer: factory.Core().V1().Nodes(), - cloud: fakeCloud, - nodeMonitorPeriod: 5 * time.Second, - recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}), - } - eventBroadcaster.StartLogging(klog.Infof) - - cloudNodeController.AddCloudNode(fnh.Existing[0]) - - assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated") - assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated") - assert.Equal(t, 2, len(fnh.UpdatedNodes[0].ObjectMeta.Labels), - "Node label for Region and Zone were not set") - assert.Equal(t, "us-west", fnh.UpdatedNodes[0].ObjectMeta.Labels[kubeletapis.LabelZoneRegion], - "Node Region not correctly updated") - assert.Equal(t, "us-west-1a", fnh.UpdatedNodes[0].ObjectMeta.Labels[kubeletapis.LabelZoneFailureDomain], - "Node FailureDomain not correctly updated") -} - -// This test checks that a node with the external cloud provider taint is cloudprovider initialized and -// and nodeAddresses are updated from the cloudprovider -func TestNodeAddresses(t *testing.T) { - fnh := &testutil.FakeNodeHandler{ - Existing: []*v1.Node{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "node0", - CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - Labels: map[string]string{}, - }, - Status: v1.NodeStatus{ - Conditions: []v1.NodeCondition{ - { - Type: v1.NodeReady, - Status: v1.ConditionUnknown, - LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), - LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), - }, - }, - }, - Spec: v1.NodeSpec{ - Taints: []v1.Taint{ - { - Key: "ImproveCoverageTaint", - Value: "true", - Effect: v1.TaintEffectNoSchedule, - }, - { - Key: schedulerapi.TaintExternalCloudProvider, - Value: "true", - Effect: v1.TaintEffectNoSchedule, - }, - }, - }, - }, - }, - Clientset: fake.NewSimpleClientset(&v1.PodList{}), - DeleteWaitChan: make(chan struct{}), - } - - factory := informers.NewSharedInformerFactory(fnh, controller.NoResyncPeriodFunc()) - - fakeCloud := &fakecloud.FakeCloud{ - InstanceTypes: map[types.NodeName]string{}, - Addresses: []v1.NodeAddress{ - { - Type: v1.NodeHostName, - Address: "node0.cloud.internal", - }, - { - Type: v1.NodeInternalIP, - Address: "10.0.0.1", - }, - { - Type: v1.NodeExternalIP, - Address: "132.143.154.163", - }, - }, - Provider: "aws", - Zone: cloudprovider.Zone{ - FailureDomain: "us-west-1a", - Region: "us-west", - }, - ExistsByProviderID: true, - Err: nil, - } - - eventBroadcaster := record.NewBroadcaster() - cloudNodeController := &CloudNodeController{ - kubeClient: fnh, - nodeInformer: factory.Core().V1().Nodes(), - cloud: fakeCloud, - nodeMonitorPeriod: 5 * time.Second, - nodeStatusUpdateFrequency: 1 * time.Second, - recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}), - } - eventBroadcaster.StartLogging(klog.Infof) - - cloudNodeController.AddCloudNode(fnh.Existing[0]) - - assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated") - assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated") - assert.Equal(t, 3, len(fnh.UpdatedNodes[0].Status.Addresses), "Node status not updated") - - fakeCloud.Addresses = []v1.NodeAddress{ - { - Type: v1.NodeHostName, - Address: "node0.cloud.internal", - }, - { - Type: v1.NodeInternalIP, - Address: "10.0.0.1", - }, - } - - cloudNodeController.Run(wait.NeverStop) - - <-time.After(2 * time.Second) - - updatedNodes := fnh.GetUpdatedNodesCopy() - - assert.Equal(t, 2, len(updatedNodes[0].Status.Addresses), "Node Addresses not correctly updated") - -} - -// This test checks that a node with the external cloud provider taint is cloudprovider initialized and -// and the provided node ip is validated with the cloudprovider and nodeAddresses are updated from the cloudprovider -func TestNodeProvidedIPAddresses(t *testing.T) { - fnh := &testutil.FakeNodeHandler{ - Existing: []*v1.Node{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "node0", - CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - Labels: map[string]string{}, - Annotations: map[string]string{ - kubeletapis.AnnotationProvidedIPAddr: "10.0.0.1", - }, - }, - Status: v1.NodeStatus{ - Conditions: []v1.NodeCondition{ - { - Type: v1.NodeReady, - Status: v1.ConditionUnknown, - LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), - LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), - }, - }, - Addresses: []v1.NodeAddress{ - { - Type: v1.NodeHostName, - Address: "node0.cloud.internal", - }, - }, - }, - Spec: v1.NodeSpec{ - Taints: []v1.Taint{ - { - Key: "ImproveCoverageTaint", - Value: "true", - Effect: v1.TaintEffectNoSchedule, - }, - { - Key: schedulerapi.TaintExternalCloudProvider, - Value: "true", - Effect: v1.TaintEffectNoSchedule, - }, - }, - ProviderID: "node0.aws.12345", - }, - }, - }, - Clientset: fake.NewSimpleClientset(&v1.PodList{}), - DeleteWaitChan: make(chan struct{}), - } - - factory := informers.NewSharedInformerFactory(fnh, controller.NoResyncPeriodFunc()) - - fakeCloud := &fakecloud.FakeCloud{ - InstanceTypes: map[types.NodeName]string{ - types.NodeName("node0"): "t1.micro", - types.NodeName("node0.aws.12345"): "t2.macro", - }, - Addresses: []v1.NodeAddress{ - { - Type: v1.NodeInternalIP, - Address: "10.0.0.1", - }, - { - Type: v1.NodeExternalIP, - Address: "132.143.154.163", - }, - }, - Provider: "aws", - Zone: cloudprovider.Zone{ - FailureDomain: "us-west-1a", - Region: "us-west", - }, - ExistsByProviderID: true, - Err: nil, - } - - eventBroadcaster := record.NewBroadcaster() - cloudNodeController := &CloudNodeController{ - kubeClient: fnh, - nodeInformer: factory.Core().V1().Nodes(), - cloud: fakeCloud, - nodeMonitorPeriod: 5 * time.Second, - nodeStatusUpdateFrequency: 1 * time.Second, - recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}), - } - eventBroadcaster.StartLogging(klog.Infof) - - cloudNodeController.AddCloudNode(fnh.Existing[0]) - - assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated") - assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated") - assert.Equal(t, 3, len(fnh.UpdatedNodes[0].Status.Addresses), "Node status unexpectedly updated") - - cloudNodeController.Run(wait.NeverStop) - - <-time.After(2 * time.Second) - - updatedNodes := fnh.GetUpdatedNodesCopy() - - assert.Equal(t, 3, len(updatedNodes[0].Status.Addresses), "Node Addresses not correctly updated") - assert.Equal(t, "10.0.0.1", updatedNodes[0].Status.Addresses[0].Address, "Node Addresses not correctly updated") -} - -// Tests that node address changes are detected correctly -func TestNodeAddressesChangeDetected(t *testing.T) { - addressSet1 := []v1.NodeAddress{ - { - Type: v1.NodeInternalIP, - Address: "10.0.0.1", - }, - { - Type: v1.NodeExternalIP, - Address: "132.143.154.163", - }, - } - addressSet2 := []v1.NodeAddress{ - { - Type: v1.NodeInternalIP, - Address: "10.0.0.1", - }, - { - Type: v1.NodeExternalIP, - Address: "132.143.154.163", - }, - } - - assert.False(t, nodeAddressesChangeDetected(addressSet1, addressSet2), - "Node address changes are not detected correctly") - - addressSet1 = []v1.NodeAddress{ - { - Type: v1.NodeInternalIP, - Address: "10.0.0.1", - }, - { - Type: v1.NodeExternalIP, - Address: "132.143.154.164", - }, - } - addressSet2 = []v1.NodeAddress{ - { - Type: v1.NodeInternalIP, - Address: "10.0.0.1", - }, - { - Type: v1.NodeExternalIP, - Address: "132.143.154.163", - }, - } - - assert.True(t, nodeAddressesChangeDetected(addressSet1, addressSet2), - "Node address changes are not detected correctly") - - addressSet1 = []v1.NodeAddress{ - { - Type: v1.NodeInternalIP, - Address: "10.0.0.1", - }, - { - Type: v1.NodeExternalIP, - Address: "132.143.154.164", - }, - { - Type: v1.NodeHostName, - Address: "hostname.zone.region.aws.test", - }, - } - addressSet2 = []v1.NodeAddress{ - { - Type: v1.NodeInternalIP, - Address: "10.0.0.1", - }, - { - Type: v1.NodeExternalIP, - Address: "132.143.154.164", - }, - } - - assert.True(t, nodeAddressesChangeDetected(addressSet1, addressSet2), - "Node address changes are not detected correctly") - - addressSet1 = []v1.NodeAddress{ - { - Type: v1.NodeInternalIP, - Address: "10.0.0.1", - }, - { - Type: v1.NodeExternalIP, - Address: "132.143.154.164", - }, - } - addressSet2 = []v1.NodeAddress{ - { - Type: v1.NodeInternalIP, - Address: "10.0.0.1", - }, - { - Type: v1.NodeExternalIP, - Address: "132.143.154.164", - }, - { - Type: v1.NodeHostName, - Address: "hostname.zone.region.aws.test", - }, - } - - assert.True(t, nodeAddressesChangeDetected(addressSet1, addressSet2), - "Node address changes are not detected correctly") - - addressSet1 = []v1.NodeAddress{ - { - Type: v1.NodeExternalIP, - Address: "10.0.0.1", - }, - { - Type: v1.NodeInternalIP, - Address: "132.143.154.163", - }, - } - addressSet2 = []v1.NodeAddress{ - { - Type: v1.NodeInternalIP, - Address: "10.0.0.1", - }, - { - Type: v1.NodeExternalIP, - Address: "132.143.154.163", - }, - } - - assert.True(t, nodeAddressesChangeDetected(addressSet1, addressSet2), - "Node address changes are not detected correctly") -} - -// This test checks that a node with the external cloud provider taint is cloudprovider initialized and -// and node addresses will not be updated when node isn't present according to the cloudprovider -func TestNodeAddressesNotUpdate(t *testing.T) { - fnh := &testutil.FakeNodeHandler{ - Existing: []*v1.Node{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "node0", - CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - Labels: map[string]string{}, - }, - Status: v1.NodeStatus{ - Conditions: []v1.NodeCondition{ - { - Type: v1.NodeReady, - Status: v1.ConditionUnknown, - LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), - LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), - }, - }, - }, - Spec: v1.NodeSpec{ - Taints: []v1.Taint{ - { - Key: "ImproveCoverageTaint", - Value: "true", - Effect: v1.TaintEffectNoSchedule, - }, - }, - }, - }, - }, - } - - factory := informers.NewSharedInformerFactory(fnh, controller.NoResyncPeriodFunc()) - - fakeCloud := &fakecloud.FakeCloud{ - InstanceTypes: map[types.NodeName]string{}, - Addresses: []v1.NodeAddress{ - { - Type: v1.NodeHostName, - Address: "node0.cloud.internal", - }, - { - Type: v1.NodeInternalIP, - Address: "10.0.0.1", - }, - { - Type: v1.NodeExternalIP, - Address: "132.143.154.163", - }, - }, - ExistsByProviderID: false, - Err: nil, - } - - cloudNodeController := &CloudNodeController{ - kubeClient: fnh, - nodeInformer: factory.Core().V1().Nodes(), - cloud: fakeCloud, - } - - cloudNodeController.updateNodeAddress(fnh.Existing[0], fakeCloud) - - if len(fnh.UpdatedNodes) != 0 { - t.Errorf("Node was not correctly updated, the updated len(nodes) got: %v, wanted=0", len(fnh.UpdatedNodes)) - } -} - -// This test checks that a node is set with the correct providerID -func TestNodeProviderID(t *testing.T) { - fnh := &testutil.FakeNodeHandler{ - Existing: []*v1.Node{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "node0", - CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - Labels: map[string]string{}, - }, - Status: v1.NodeStatus{ - Conditions: []v1.NodeCondition{ - { - Type: v1.NodeReady, - Status: v1.ConditionUnknown, - LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), - LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), - }, - }, - }, - Spec: v1.NodeSpec{ - Taints: []v1.Taint{ - { - Key: "ImproveCoverageTaint", - Value: "true", - Effect: v1.TaintEffectNoSchedule, - }, - { - Key: schedulerapi.TaintExternalCloudProvider, - Value: "true", - Effect: v1.TaintEffectNoSchedule, - }, - }, - }, - }, - }, - Clientset: fake.NewSimpleClientset(&v1.PodList{}), - DeleteWaitChan: make(chan struct{}), - } - - factory := informers.NewSharedInformerFactory(fnh, controller.NoResyncPeriodFunc()) - - fakeCloud := &fakecloud.FakeCloud{ - InstanceTypes: map[types.NodeName]string{}, - Addresses: []v1.NodeAddress{ - { - Type: v1.NodeHostName, - Address: "node0.cloud.internal", - }, - { - Type: v1.NodeInternalIP, - Address: "10.0.0.1", - }, - { - Type: v1.NodeExternalIP, - Address: "132.143.154.163", - }, - }, - Provider: "test", - ExtID: map[types.NodeName]string{ - types.NodeName("node0"): "12345", - }, - Err: nil, - } - - eventBroadcaster := record.NewBroadcaster() - cloudNodeController := &CloudNodeController{ - kubeClient: fnh, - nodeInformer: factory.Core().V1().Nodes(), - cloud: fakeCloud, - nodeMonitorPeriod: 5 * time.Second, - nodeStatusUpdateFrequency: 1 * time.Second, - recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}), - } - eventBroadcaster.StartLogging(klog.Infof) - - cloudNodeController.AddCloudNode(fnh.Existing[0]) - - assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated") - assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated") - assert.Equal(t, "test://12345", fnh.UpdatedNodes[0].Spec.ProviderID, "Node ProviderID not set correctly") -} - -// This test checks that a node's provider ID will not be overwritten -func TestNodeProviderIDAlreadySet(t *testing.T) { - fnh := &testutil.FakeNodeHandler{ - Existing: []*v1.Node{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "node0", - CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - Labels: map[string]string{}, - }, - Status: v1.NodeStatus{ - Conditions: []v1.NodeCondition{ - { - Type: v1.NodeReady, - Status: v1.ConditionUnknown, - LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), - LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), - }, - }, - }, - Spec: v1.NodeSpec{ - ProviderID: "test-provider-id", - Taints: []v1.Taint{ - { - Key: "ImproveCoverageTaint", - Value: "true", - Effect: v1.TaintEffectNoSchedule, - }, - { - Key: schedulerapi.TaintExternalCloudProvider, - Value: "true", - Effect: v1.TaintEffectNoSchedule, - }, - }, - }, - }, - }, - Clientset: fake.NewSimpleClientset(&v1.PodList{}), - DeleteWaitChan: make(chan struct{}), - } - - factory := informers.NewSharedInformerFactory(fnh, controller.NoResyncPeriodFunc()) - - fakeCloud := &fakecloud.FakeCloud{ - InstanceTypes: map[types.NodeName]string{}, - Addresses: []v1.NodeAddress{ - { - Type: v1.NodeHostName, - Address: "node0.cloud.internal", - }, - { - Type: v1.NodeInternalIP, - Address: "10.0.0.1", - }, - { - Type: v1.NodeExternalIP, - Address: "132.143.154.163", - }, - }, - Provider: "test", - ExtID: map[types.NodeName]string{ - types.NodeName("node0"): "12345", - }, - Err: nil, - } - - eventBroadcaster := record.NewBroadcaster() - cloudNodeController := &CloudNodeController{ - kubeClient: fnh, - nodeInformer: factory.Core().V1().Nodes(), - cloud: fakeCloud, - nodeMonitorPeriod: 5 * time.Second, - nodeStatusUpdateFrequency: 1 * time.Second, - recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}), - } - eventBroadcaster.StartLogging(klog.Infof) - - cloudNodeController.AddCloudNode(fnh.Existing[0]) - - assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated") - assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated") - // CCM node controller should not overwrite provider if it's already set - assert.Equal(t, "test-provider-id", fnh.UpdatedNodes[0].Spec.ProviderID, "Node ProviderID not set correctly") -} diff --git a/pkg/controller/cloud/pvlcontroller.go b/pkg/controller/cloud/pvlcontroller.go deleted file mode 100644 index 5cce125a6a..0000000000 --- a/pkg/controller/cloud/pvlcontroller.go +++ /dev/null @@ -1,328 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package cloud - -import ( - "context" - "encoding/json" - "fmt" - "time" - - "k8s.io/klog" - - "k8s.io/api/core/v1" - - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/strategicpatch" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/watch" - utilfeature "k8s.io/apiserver/pkg/util/feature" - v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" - "k8s.io/kubernetes/pkg/features" - kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" - volumeutil "k8s.io/kubernetes/pkg/volume/util" - - "k8s.io/client-go/kubernetes" - corelisters "k8s.io/client-go/listers/core/v1" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/util/workqueue" - - cloudprovider "k8s.io/cloud-provider" - "k8s.io/kubernetes/pkg/controller" -) - -const initializerName = "pvlabel.kubernetes.io" - -// PersistentVolumeLabelController handles adding labels to persistent volumes when they are created -type PersistentVolumeLabelController struct { - cloud cloudprovider.Interface - kubeClient kubernetes.Interface - pvlController cache.Controller - pvlIndexer cache.Indexer - volumeLister corelisters.PersistentVolumeLister - - syncHandler func(key string) error - - // queue is where incoming work is placed to de-dup and to allow "easy" rate limited requeues on errors - queue workqueue.RateLimitingInterface -} - -// NewPersistentVolumeLabelController creates a PersistentVolumeLabelController object -func NewPersistentVolumeLabelController( - kubeClient kubernetes.Interface, - cloud cloudprovider.Interface) *PersistentVolumeLabelController { - - pvlc := &PersistentVolumeLabelController{ - cloud: cloud, - kubeClient: kubeClient, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvLabels"), - } - pvlc.syncHandler = pvlc.addLabelsAndAffinity - pvlc.pvlIndexer, pvlc.pvlController = cache.NewIndexerInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - options.IncludeUninitialized = true - return kubeClient.CoreV1().PersistentVolumes().List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - options.IncludeUninitialized = true - return kubeClient.CoreV1().PersistentVolumes().Watch(options) - }, - }, - &v1.PersistentVolume{}, - 0, - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - key, err := cache.MetaNamespaceKeyFunc(obj) - if err == nil { - pvlc.queue.Add(key) - } - }, - }, - cache.Indexers{}, - ) - pvlc.volumeLister = corelisters.NewPersistentVolumeLister(pvlc.pvlIndexer) - - return pvlc -} - -// Run starts a controller that adds labels to persistent volumes -func (pvlc *PersistentVolumeLabelController) Run(threadiness int, stopCh <-chan struct{}) { - defer utilruntime.HandleCrash() - defer pvlc.queue.ShutDown() - - klog.Infof("Starting PersistentVolumeLabelController") - defer klog.Infof("Shutting down PersistentVolumeLabelController") - - go pvlc.pvlController.Run(stopCh) - - if !controller.WaitForCacheSync("persistent volume label", stopCh, pvlc.pvlController.HasSynced) { - return - } - - // start up your worker threads based on threadiness. Some controllers have multiple kinds of workers - for i := 0; i < threadiness; i++ { - // runWorker will loop until "something bad" happens. The .Until will then rekick the worker - // after one second - go wait.Until(pvlc.runWorker, time.Second, stopCh) - } - - // wait until we're told to stop - <-stopCh -} - -func (pvlc *PersistentVolumeLabelController) runWorker() { - // hot loop until we're told to stop. processNextWorkItem will automatically wait until there's work - // available, so we don't worry about secondary waits - for pvlc.processNextWorkItem() { - } -} - -// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit. -func (pvlc *PersistentVolumeLabelController) processNextWorkItem() bool { - // pull the next work item from queue. It should be a key we use to lookup something in a cache - keyObj, quit := pvlc.queue.Get() - if quit { - return false - } - // you always have to indicate to the queue that you've completed a piece of work - defer pvlc.queue.Done(keyObj) - - key := keyObj.(string) - // do your work on the key. This method will contains your "do stuff" logic - err := pvlc.syncHandler(key) - if err == nil { - // if you had no error, tell the queue to stop tracking history for your key. This will - // reset things like failure counts for per-item rate limiting - pvlc.queue.Forget(key) - return true - } - - // there was a failure so be sure to report it. This method allows for pluggable error handling - // which can be used for things like cluster-monitoring - utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err)) - - // since we failed, we should requeue the item to work on later. This method will add a backoff - // to avoid hotlooping on particular items (they're probably still not going to work right away) - // and overall controller protection (everything I've done is broken, this controller needs to - // calm down or it can starve other useful work) cases. - pvlc.queue.AddRateLimited(key) - - return true -} - -// AddLabels adds appropriate labels to persistent volumes and sets the -// volume as available if successful. -func (pvlc *PersistentVolumeLabelController) addLabelsAndAffinity(key string) error { - _, name, err := cache.SplitMetaNamespaceKey(key) - if err != nil { - return fmt.Errorf("error getting name of volume %q to get volume from informer: %v", key, err) - } - volume, err := pvlc.volumeLister.Get(name) - if errors.IsNotFound(err) { - return nil - } else if err != nil { - return fmt.Errorf("error getting volume %s from informer: %v", name, err) - } - - return pvlc.addLabelsAndAffinityToVolume(volume) -} - -func (pvlc *PersistentVolumeLabelController) addLabelsAndAffinityToVolume(vol *v1.PersistentVolume) error { - var volumeLabels map[string]string - // Only add labels if the next pending initializer. - if needsInitialization(vol.Initializers, initializerName) { - if labeler, ok := (pvlc.cloud).(cloudprovider.PVLabeler); ok { - labels, err := labeler.GetLabelsForVolume(context.TODO(), vol) - if err != nil { - return fmt.Errorf("error querying volume %v: %v", vol.Spec, err) - } - volumeLabels = labels - } else { - klog.V(4).Info("cloud provider does not support PVLabeler") - } - return pvlc.updateVolume(vol, volumeLabels) - } - return nil -} - -func (pvlc *PersistentVolumeLabelController) createPatch(vol *v1.PersistentVolume, volLabels map[string]string) ([]byte, error) { - volName := vol.Name - newVolume := vol.DeepCopyObject().(*v1.PersistentVolume) - populateAffinity := utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) && len(volLabels) != 0 - - if newVolume.Labels == nil { - newVolume.Labels = make(map[string]string) - } - - requirements := make([]v1.NodeSelectorRequirement, 0) - for k, v := range volLabels { - newVolume.Labels[k] = v - // Set NodeSelectorRequirements based on the labels - if populateAffinity { - var values []string - if k == kubeletapis.LabelZoneFailureDomain { - zones, err := volumeutil.LabelZonesToSet(v) - if err != nil { - return nil, fmt.Errorf("failed to convert label string for Zone: %s to a Set", v) - } - values = zones.List() - } else { - values = []string{v} - } - requirements = append(requirements, v1.NodeSelectorRequirement{Key: k, Operator: v1.NodeSelectorOpIn, Values: values}) - } - } - if populateAffinity { - if newVolume.Spec.NodeAffinity == nil { - newVolume.Spec.NodeAffinity = new(v1.VolumeNodeAffinity) - } - if newVolume.Spec.NodeAffinity.Required == nil { - newVolume.Spec.NodeAffinity.Required = new(v1.NodeSelector) - } - if len(newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms) == 0 { - // Need at least one term pre-allocated whose MatchExpressions can be appended to - newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms = make([]v1.NodeSelectorTerm, 1) - } - // Populate NodeAffinity with requirements if there are no conflicting keys found - if v1helper.NodeSelectorRequirementKeysExistInNodeSelectorTerms(requirements, newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms) { - klog.V(4).Infof("NodeSelectorRequirements for cloud labels %v conflict with existing NodeAffinity %v. Skipping addition of NodeSelectorRequirements for cloud labels.", - requirements, newVolume.Spec.NodeAffinity) - } else { - for _, req := range requirements { - for i := range newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms { - newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms[i].MatchExpressions = append(newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms[i].MatchExpressions, req) - } - } - } - } - newVolume.Initializers = removeInitializer(newVolume.Initializers, initializerName) - klog.V(4).Infof("removed initializer on PersistentVolume %s", newVolume.Name) - - oldData, err := json.Marshal(vol) - if err != nil { - return nil, fmt.Errorf("failed to marshal old persistentvolume %#v for persistentvolume %q: %v", vol, volName, err) - } - - newData, err := json.Marshal(newVolume) - if err != nil { - return nil, fmt.Errorf("failed to marshal new persistentvolume %#v for persistentvolume %q: %v", newVolume, volName, err) - } - - patch, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.PersistentVolume{}) - if err != nil { - return nil, fmt.Errorf("failed to create patch for persistentvolume %q: %v", volName, err) - } - return patch, nil -} - -func (pvlc *PersistentVolumeLabelController) updateVolume(vol *v1.PersistentVolume, volLabels map[string]string) error { - volName := vol.Name - klog.V(4).Infof("updating PersistentVolume %s", volName) - patchBytes, err := pvlc.createPatch(vol, volLabels) - if err != nil { - return err - } - - _, err = pvlc.kubeClient.CoreV1().PersistentVolumes().Patch(string(volName), types.StrategicMergePatchType, patchBytes) - if err != nil { - return fmt.Errorf("failed to update PersistentVolume %s: %v", volName, err) - } - klog.V(4).Infof("updated PersistentVolume %s", volName) - - return nil -} - -func removeInitializer(initializers *metav1.Initializers, name string) *metav1.Initializers { - if initializers == nil { - return nil - } - - var updated []metav1.Initializer - for _, pending := range initializers.Pending { - if pending.Name != name { - updated = append(updated, pending) - } - } - if len(updated) == len(initializers.Pending) { - return initializers - } - if len(updated) == 0 { - return nil - } - - return &metav1.Initializers{Pending: updated} -} - -// needsInitialization checks whether or not the PVL is the next pending initializer. -func needsInitialization(initializers *metav1.Initializers, name string) bool { - if initializers == nil { - return false - } - - if len(initializers.Pending) == 0 { - return false - } - - // There is at least one initializer still pending so check to - // see if the PVL is the next in line. - return initializers.Pending[0].Name == name -} diff --git a/pkg/controller/cloud/pvlcontroller_test.go b/pkg/controller/cloud/pvlcontroller_test.go deleted file mode 100644 index 1931540d1e..0000000000 --- a/pkg/controller/cloud/pvlcontroller_test.go +++ /dev/null @@ -1,573 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package cloud - -import ( - "encoding/json" - "testing" - "time" - - "k8s.io/api/core/v1" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - - sets "k8s.io/apimachinery/pkg/util/sets" - utilfeature "k8s.io/apiserver/pkg/util/feature" - utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing" - "k8s.io/client-go/kubernetes/fake" - core "k8s.io/client-go/testing" - "k8s.io/kubernetes/pkg/features" - kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" - volumeutil "k8s.io/kubernetes/pkg/volume/util" - - fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake" -) - -func nodeSelectorRequirementsEqual(r1, r2 v1.NodeSelectorRequirement) bool { - if r1.Key != r2.Key { - return false - } - if r1.Operator != r2.Operator { - return false - } - vals1 := sets.NewString(r1.Values...) - vals2 := sets.NewString(r2.Values...) - if vals1.Equal(vals2) { - return true - } - return false -} - -func nodeSelectorTermsEqual(t1, t2 v1.NodeSelectorTerm) bool { - exprs1 := t1.MatchExpressions - exprs2 := t2.MatchExpressions - fields1 := t1.MatchFields - fields2 := t2.MatchFields - if len(exprs1) != len(exprs2) { - return false - } - if len(fields1) != len(fields2) { - return false - } - match := func(reqs1, reqs2 []v1.NodeSelectorRequirement) bool { - for _, req1 := range reqs1 { - reqMatched := false - for _, req2 := range reqs2 { - if nodeSelectorRequirementsEqual(req1, req2) { - reqMatched = true - break - } - } - if !reqMatched { - return false - } - } - return true - } - return match(exprs1, exprs2) && match(exprs2, exprs1) && match(fields1, fields2) && match(fields2, fields1) -} - -// volumeNodeAffinitiesEqual performs a highly semantic comparison of two VolumeNodeAffinity data structures -// It ignores ordering of instances of NodeSelectorRequirements in a VolumeNodeAffinity's NodeSelectorTerms as well as -// orderding of strings in Values of NodeSelectorRequirements when matching two VolumeNodeAffinity structures. -// Note that in most equality functions, Go considers two slices to be not equal if the order of elements in a slice do not -// match - so reflect.DeepEqual as well as Semantic.DeepEqual do not work for comparing VolumeNodeAffinity semantically. -// e.g. these two NodeSelectorTerms are considered semantically equal by volumeNodeAffinitiesEqual -// &VolumeNodeAffinity{Required:&NodeSelector{NodeSelectorTerms:[{[{a In [1]} {b In [2 3]}] []}],},} -// &VolumeNodeAffinity{Required:&NodeSelector{NodeSelectorTerms:[{[{b In [3 2]} {a In [1]}] []}],},} -// TODO: move volumeNodeAffinitiesEqual to utils so other can use it too -func volumeNodeAffinitiesEqual(n1, n2 *v1.VolumeNodeAffinity) bool { - if (n1 == nil) != (n2 == nil) { - return false - } - if n1 == nil || n2 == nil { - return true - } - ns1 := n1.Required - ns2 := n2.Required - - if (ns1 == nil) != (ns2 == nil) { - return false - } - if (ns1 == nil) && (ns2 == nil) { - return true - } - if len(ns1.NodeSelectorTerms) != len(ns1.NodeSelectorTerms) { - return false - } - match := func(terms1, terms2 []v1.NodeSelectorTerm) bool { - for _, term1 := range terms1 { - termMatched := false - for _, term2 := range terms2 { - if nodeSelectorTermsEqual(term1, term2) { - termMatched = true - break - } - } - if !termMatched { - return false - } - } - return true - } - return match(ns1.NodeSelectorTerms, ns2.NodeSelectorTerms) && match(ns2.NodeSelectorTerms, ns1.NodeSelectorTerms) -} - -func TestCreatePatch(t *testing.T) { - ignoredPV := v1.PersistentVolume{ - ObjectMeta: metav1.ObjectMeta{ - Name: "noncloud", - Initializers: &metav1.Initializers{ - Pending: []metav1.Initializer{ - { - Name: initializerName, - }, - }, - }, - }, - Spec: v1.PersistentVolumeSpec{ - PersistentVolumeSource: v1.PersistentVolumeSource{ - HostPath: &v1.HostPathVolumeSource{ - Path: "/", - }, - }, - }, - } - awsPV := v1.PersistentVolume{ - ObjectMeta: metav1.ObjectMeta{ - Name: "awsPV", - Initializers: &metav1.Initializers{ - Pending: []metav1.Initializer{ - { - Name: initializerName, - }, - }, - }, - }, - Spec: v1.PersistentVolumeSpec{ - PersistentVolumeSource: v1.PersistentVolumeSource{ - AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{ - VolumeID: "123", - }, - }, - }, - } - expectedAffinitya1b2MergedWithAWSPV := v1.VolumeNodeAffinity{ - Required: &v1.NodeSelector{ - NodeSelectorTerms: []v1.NodeSelectorTerm{ - { - MatchExpressions: []v1.NodeSelectorRequirement{ - { - Key: "a", - Operator: v1.NodeSelectorOpIn, - Values: []string{"1"}, - }, - { - Key: "b", - Operator: v1.NodeSelectorOpIn, - Values: []string{"2"}, - }, - }, - }, - }, - }, - } - expectedAffinityZone1MergedWithAWSPV := v1.VolumeNodeAffinity{ - Required: &v1.NodeSelector{ - NodeSelectorTerms: []v1.NodeSelectorTerm{ - { - MatchExpressions: []v1.NodeSelectorRequirement{ - { - Key: kubeletapis.LabelZoneFailureDomain, - Operator: v1.NodeSelectorOpIn, - Values: []string{"1"}, - }, - }, - }, - }, - }, - } - expectedAffinityZonesMergedWithAWSPV := v1.VolumeNodeAffinity{ - Required: &v1.NodeSelector{ - NodeSelectorTerms: []v1.NodeSelectorTerm{ - { - MatchExpressions: []v1.NodeSelectorRequirement{ - { - Key: kubeletapis.LabelZoneFailureDomain, - Operator: v1.NodeSelectorOpIn, - Values: []string{"1", "2", "3"}, - }, - }, - }, - }, - }, - } - awsPVWithAffinity := v1.PersistentVolume{ - ObjectMeta: metav1.ObjectMeta{ - Name: "awsPV", - Initializers: &metav1.Initializers{ - Pending: []metav1.Initializer{ - { - Name: initializerName, - }, - }, - }, - }, - Spec: v1.PersistentVolumeSpec{ - PersistentVolumeSource: v1.PersistentVolumeSource{ - AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{ - VolumeID: "123", - }, - }, - NodeAffinity: &v1.VolumeNodeAffinity{ - Required: &v1.NodeSelector{ - NodeSelectorTerms: []v1.NodeSelectorTerm{ - { - MatchExpressions: []v1.NodeSelectorRequirement{ - { - Key: "c", - Operator: v1.NodeSelectorOpIn, - Values: []string{"val1", "val2"}, - }, - { - Key: "d", - Operator: v1.NodeSelectorOpIn, - Values: []string{"val3"}, - }, - }, - }, - { - MatchExpressions: []v1.NodeSelectorRequirement{ - { - Key: "e", - Operator: v1.NodeSelectorOpIn, - Values: []string{"val4", "val5"}, - }, - }, - }, - }, - }, - }, - }, - } - expectedAffinitya1b2MergedWithAWSPVWithAffinity := v1.VolumeNodeAffinity{ - Required: &v1.NodeSelector{ - NodeSelectorTerms: []v1.NodeSelectorTerm{ - { - MatchExpressions: []v1.NodeSelectorRequirement{ - { - Key: "c", - Operator: v1.NodeSelectorOpIn, - Values: []string{"val1", "val2"}, - }, - { - Key: "d", - Operator: v1.NodeSelectorOpIn, - Values: []string{"val3"}, - }, - { - Key: "a", - Operator: v1.NodeSelectorOpIn, - Values: []string{"1"}, - }, - { - Key: "b", - Operator: v1.NodeSelectorOpIn, - Values: []string{"2"}, - }, - }, - }, - { - MatchExpressions: []v1.NodeSelectorRequirement{ - { - Key: "e", - Operator: v1.NodeSelectorOpIn, - Values: []string{"val4", "val5"}, - }, - { - Key: "a", - Operator: v1.NodeSelectorOpIn, - Values: []string{"1"}, - }, - { - Key: "b", - Operator: v1.NodeSelectorOpIn, - Values: []string{"2"}, - }, - }, - }, - }, - }, - } - expectedAffinityZone1MergedWithAWSPVWithAffinity := v1.VolumeNodeAffinity{ - Required: &v1.NodeSelector{ - NodeSelectorTerms: []v1.NodeSelectorTerm{ - { - MatchExpressions: []v1.NodeSelectorRequirement{ - { - Key: "c", - Operator: v1.NodeSelectorOpIn, - Values: []string{"val1", "val2"}, - }, - { - Key: "d", - Operator: v1.NodeSelectorOpIn, - Values: []string{"val3"}, - }, - { - Key: kubeletapis.LabelZoneFailureDomain, - Operator: v1.NodeSelectorOpIn, - Values: []string{"1"}, - }, - }, - }, - { - MatchExpressions: []v1.NodeSelectorRequirement{ - { - Key: "e", - Operator: v1.NodeSelectorOpIn, - Values: []string{"val4", "val5"}, - }, - { - Key: kubeletapis.LabelZoneFailureDomain, - Operator: v1.NodeSelectorOpIn, - Values: []string{"1"}, - }, - }, - }, - }, - }, - } - expectedAffinityZonesMergedWithAWSPVWithAffinity := v1.VolumeNodeAffinity{ - Required: &v1.NodeSelector{ - NodeSelectorTerms: []v1.NodeSelectorTerm{ - { - MatchExpressions: []v1.NodeSelectorRequirement{ - { - Key: "c", - Operator: v1.NodeSelectorOpIn, - Values: []string{"val1", "val2"}, - }, - { - Key: "d", - Operator: v1.NodeSelectorOpIn, - Values: []string{"val3"}, - }, - { - Key: kubeletapis.LabelZoneFailureDomain, - Operator: v1.NodeSelectorOpIn, - Values: []string{"1", "2", "3"}, - }, - }, - }, - { - MatchExpressions: []v1.NodeSelectorRequirement{ - { - Key: "e", - Operator: v1.NodeSelectorOpIn, - Values: []string{"val5", "val4"}, - }, - { - Key: kubeletapis.LabelZoneFailureDomain, - Operator: v1.NodeSelectorOpIn, - Values: []string{"3", "2", "1"}, - }, - }, - }, - }, - }, - } - - zones, _ := volumeutil.ZonesToSet("1,2,3") - testCases := map[string]struct { - vol v1.PersistentVolume - labels map[string]string - expectedAffinity *v1.VolumeNodeAffinity - }{ - "non-cloud PV": { - vol: ignoredPV, - labels: nil, - expectedAffinity: nil, - }, - "no labels": { - vol: awsPV, - labels: nil, - expectedAffinity: nil, - }, - "cloudprovider returns nil, nil": { - vol: awsPV, - labels: nil, - expectedAffinity: nil, - }, - "cloudprovider labels": { - vol: awsPV, - labels: map[string]string{"a": "1", "b": "2"}, - expectedAffinity: &expectedAffinitya1b2MergedWithAWSPV, - }, - "cloudprovider labels pre-existing affinity non-conflicting": { - vol: awsPVWithAffinity, - labels: map[string]string{"a": "1", "b": "2"}, - expectedAffinity: &expectedAffinitya1b2MergedWithAWSPVWithAffinity, - }, - "cloudprovider labels pre-existing affinity conflicting": { - vol: awsPVWithAffinity, - labels: map[string]string{"a": "1", "c": "2"}, - expectedAffinity: nil, - }, - "cloudprovider singlezone": { - vol: awsPV, - labels: map[string]string{kubeletapis.LabelZoneFailureDomain: "1"}, - expectedAffinity: &expectedAffinityZone1MergedWithAWSPV, - }, - "cloudprovider singlezone pre-existing affinity non-conflicting": { - vol: awsPVWithAffinity, - labels: map[string]string{kubeletapis.LabelZoneFailureDomain: "1"}, - expectedAffinity: &expectedAffinityZone1MergedWithAWSPVWithAffinity, - }, - "cloudprovider multizone": { - vol: awsPV, - labels: map[string]string{kubeletapis.LabelZoneFailureDomain: volumeutil.ZonesSetToLabelValue(zones)}, - expectedAffinity: &expectedAffinityZonesMergedWithAWSPV, - }, - "cloudprovider multizone pre-existing affinity non-conflicting": { - vol: awsPVWithAffinity, - labels: map[string]string{kubeletapis.LabelZoneFailureDomain: volumeutil.ZonesSetToLabelValue(zones)}, - expectedAffinity: &expectedAffinityZonesMergedWithAWSPVWithAffinity, - }, - } - - defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.VolumeScheduling, true)() - for d, tc := range testCases { - cloud := &fakecloud.FakeCloud{} - client := fake.NewSimpleClientset() - pvlController := NewPersistentVolumeLabelController(client, cloud) - patch, err := pvlController.createPatch(&tc.vol, tc.labels) - if err != nil { - t.Errorf("%s: createPatch returned err: %v", d, err) - } - obj := &v1.PersistentVolume{} - json.Unmarshal(patch, obj) - if obj.ObjectMeta.Initializers != nil { - t.Errorf("%s: initializer wasn't removed: %v", d, obj.ObjectMeta.Initializers) - } - if tc.labels == nil { - continue - } - for k, v := range tc.labels { - if obj.ObjectMeta.Labels[k] != v { - t.Errorf("%s: label %s expected %s got %s", d, k, v, obj.ObjectMeta.Labels[k]) - } - } - if !volumeNodeAffinitiesEqual(tc.expectedAffinity, obj.Spec.NodeAffinity) { - t.Errorf("Expected affinity %v does not match target affinity %v", tc.expectedAffinity, obj.Spec.NodeAffinity) - } - } -} - -func TestAddLabelsToVolume(t *testing.T) { - pv := v1.PersistentVolume{ - ObjectMeta: metav1.ObjectMeta{ - Name: "awsPV", - }, - Spec: v1.PersistentVolumeSpec{ - PersistentVolumeSource: v1.PersistentVolumeSource{ - AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{ - VolumeID: "123", - }, - }, - }, - } - - testCases := map[string]struct { - vol v1.PersistentVolume - initializers *metav1.Initializers - shouldLabelAndSetAffinity bool - }{ - "PV without initializer": { - vol: pv, - initializers: nil, - shouldLabelAndSetAffinity: false, - }, - "PV with initializer to remove": { - vol: pv, - initializers: &metav1.Initializers{Pending: []metav1.Initializer{{Name: initializerName}}}, - shouldLabelAndSetAffinity: true, - }, - "PV with other initializers only": { - vol: pv, - initializers: &metav1.Initializers{Pending: []metav1.Initializer{{Name: "OtherInit"}}}, - shouldLabelAndSetAffinity: false, - }, - "PV with other initializers first": { - vol: pv, - initializers: &metav1.Initializers{Pending: []metav1.Initializer{{Name: "OtherInit"}, {Name: initializerName}}}, - shouldLabelAndSetAffinity: false, - }, - } - - defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.VolumeScheduling, true)() - - for d, tc := range testCases { - labeledCh := make(chan bool, 1) - client := fake.NewSimpleClientset() - client.PrependReactor("patch", "persistentvolumes", func(action core.Action) (handled bool, ret runtime.Object, err error) { - patch := action.(core.PatchActionImpl).GetPatch() - obj := &v1.PersistentVolume{} - json.Unmarshal(patch, obj) - if obj.ObjectMeta.Labels["a"] != "1" { - return false, nil, nil - } - if obj.Spec.NodeAffinity == nil { - return false, nil, nil - } - if obj.Spec.NodeAffinity.Required == nil { - return false, nil, nil - } - if len(obj.Spec.NodeAffinity.Required.NodeSelectorTerms) == 0 { - return false, nil, nil - } - reqs := obj.Spec.NodeAffinity.Required.NodeSelectorTerms[0].MatchExpressions - if len(reqs) != 1 { - return false, nil, nil - } - if reqs[0].Key != "a" || reqs[0].Values[0] != "1" || reqs[0].Operator != v1.NodeSelectorOpIn { - return false, nil, nil - } - labeledCh <- true - return true, nil, nil - }) - - fakeCloud := &fakecloud.FakeCloud{ - VolumeLabelMap: map[string]map[string]string{"awsPV": {"a": "1"}}, - } - pvlController := &PersistentVolumeLabelController{kubeClient: client, cloud: fakeCloud} - tc.vol.ObjectMeta.Initializers = tc.initializers - pvlController.addLabelsAndAffinityToVolume(&tc.vol) - - select { - case l := <-labeledCh: - if l != tc.shouldLabelAndSetAffinity { - t.Errorf("%s: label and affinity setting of pv failed. expected %t got %t", d, tc.shouldLabelAndSetAffinity, l) - } - case <-time.After(500 * time.Millisecond): - if tc.shouldLabelAndSetAffinity != false { - t.Errorf("%s: timed out waiting for label and affinity setting notification", d) - } - } - } -}