Delete cloud controller

k3s-v1.13.4
Darren Shepherd 2018-10-05 17:55:21 -07:00
parent 020aa7c9da
commit 72d487a7f3
7 changed files with 0 additions and 2830 deletions

View File

@ -1,96 +0,0 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
name = "go_default_library",
srcs = [
"node_controller.go",
"pvlcontroller.go",
],
importpath = "k8s.io/kubernetes/pkg/controller/cloud",
deps = [
"//pkg/api/v1/node:go_default_library",
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/util/node:go_default_library",
"//pkg/features:go_default_library",
"//pkg/kubelet/apis:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/util/node:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/client-go/util/retry:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = [
"main_test.go",
"node_controller_test.go",
"pvlcontroller_test.go",
],
embed = [":go_default_library"],
deps = [
"//pkg/cloudprovider/providers/fake:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/testutil:go_default_library",
"//pkg/features:go_default_library",
"//pkg/kubelet/apis:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//staging/src/k8s.io/client-go/testing:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@ -1,12 +0,0 @@
approvers:
- thockin
- luxas
- wlan0
- andrewsykim
reviewers:
- thockin
- luxas
- wlan0
- andrewsykim
labels:
- sig/cloud-provider

View File

@ -1,29 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cloud
import (
"testing"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
_ "k8s.io/kubernetes/pkg/features"
)
func TestMain(m *testing.M) {
utilfeaturetesting.VerifyFeatureGatesUnchanged(utilfeature.DefaultFeatureGate, m.Run)
}

View File

@ -1,541 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cloud
import (
"context"
"errors"
"fmt"
"time"
"k8s.io/klog"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
clientretry "k8s.io/client-go/util/retry"
cloudprovider "k8s.io/cloud-provider"
nodeutilv1 "k8s.io/kubernetes/pkg/api/v1/node"
"k8s.io/kubernetes/pkg/controller"
nodectrlutil "k8s.io/kubernetes/pkg/controller/util/node"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
nodeutil "k8s.io/kubernetes/pkg/util/node"
)
var UpdateNodeSpecBackoff = wait.Backoff{
Steps: 20,
Duration: 50 * time.Millisecond,
Jitter: 1.0,
}
type CloudNodeController struct {
nodeInformer coreinformers.NodeInformer
kubeClient clientset.Interface
recorder record.EventRecorder
cloud cloudprovider.Interface
// Value controlling NodeController monitoring period, i.e. how often does NodeController
// check node status posted from kubelet. This value should be lower than nodeMonitorGracePeriod
// set in controller-manager
nodeMonitorPeriod time.Duration
nodeStatusUpdateFrequency time.Duration
}
const (
// nodeStatusUpdateRetry controls the number of retries of writing NodeStatus update.
nodeStatusUpdateRetry = 5
// The amount of time the nodecontroller should sleep between retrying NodeStatus updates
retrySleepTime = 20 * time.Millisecond
)
// NewCloudNodeController creates a CloudNodeController object
func NewCloudNodeController(
nodeInformer coreinformers.NodeInformer,
kubeClient clientset.Interface,
cloud cloudprovider.Interface,
nodeMonitorPeriod time.Duration,
nodeStatusUpdateFrequency time.Duration) *CloudNodeController {
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"})
eventBroadcaster.StartLogging(klog.Infof)
if kubeClient != nil {
klog.V(0).Infof("Sending events to api server.")
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
} else {
klog.V(0).Infof("No api server defined - no events will be sent to API server.")
}
cnc := &CloudNodeController{
nodeInformer: nodeInformer,
kubeClient: kubeClient,
recorder: recorder,
cloud: cloud,
nodeMonitorPeriod: nodeMonitorPeriod,
nodeStatusUpdateFrequency: nodeStatusUpdateFrequency,
}
// Use shared informer to listen to add/update of nodes. Note that any nodes
// that exist before node controller starts will show up in the update method
cnc.nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: cnc.AddCloudNode,
UpdateFunc: cnc.UpdateCloudNode,
})
return cnc
}
// This controller deletes a node if kubelet is not reporting
// and the node is gone from the cloud provider.
func (cnc *CloudNodeController) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
// The following loops run communicate with the APIServer with a worst case complexity
// of O(num_nodes) per cycle. These functions are justified here because these events fire
// very infrequently. DO NOT MODIFY this to perform frequent operations.
// Start a loop to periodically update the node addresses obtained from the cloud
go wait.Until(cnc.UpdateNodeStatus, cnc.nodeStatusUpdateFrequency, stopCh)
// Start a loop to periodically check if any nodes have been deleted from cloudprovider
go wait.Until(cnc.MonitorNode, cnc.nodeMonitorPeriod, stopCh)
}
// UpdateNodeStatus updates the node status, such as node addresses
func (cnc *CloudNodeController) UpdateNodeStatus() {
instances, ok := cnc.cloud.Instances()
if !ok {
utilruntime.HandleError(fmt.Errorf("failed to get instances from cloud provider"))
return
}
nodes, err := cnc.kubeClient.CoreV1().Nodes().List(metav1.ListOptions{ResourceVersion: "0"})
if err != nil {
klog.Errorf("Error monitoring node status: %v", err)
return
}
for i := range nodes.Items {
cnc.updateNodeAddress(&nodes.Items[i], instances)
}
}
// UpdateNodeAddress updates the nodeAddress of a single node
func (cnc *CloudNodeController) updateNodeAddress(node *v1.Node, instances cloudprovider.Instances) {
// Do not process nodes that are still tainted
cloudTaint := getCloudTaint(node.Spec.Taints)
if cloudTaint != nil {
klog.V(5).Infof("This node %s is still tainted. Will not process.", node.Name)
return
}
// Node that isn't present according to the cloud provider shouldn't have its address updated
exists, err := ensureNodeExistsByProviderID(instances, node)
if err != nil {
// Continue to update node address when not sure the node is not exists
klog.Errorf("%v", err)
} else if !exists {
klog.V(4).Infof("The node %s is no longer present according to the cloud provider, do not process.", node.Name)
return
}
nodeAddresses, err := getNodeAddressesByProviderIDOrName(instances, node)
if err != nil {
klog.Errorf("%v", err)
return
}
if len(nodeAddresses) == 0 {
klog.V(5).Infof("Skipping node address update for node %q since cloud provider did not return any", node.Name)
return
}
// Check if a hostname address exists in the cloud provided addresses
hostnameExists := false
for i := range nodeAddresses {
if nodeAddresses[i].Type == v1.NodeHostName {
hostnameExists = true
}
}
// If hostname was not present in cloud provided addresses, use the hostname
// from the existing node (populated by kubelet)
if !hostnameExists {
for _, addr := range node.Status.Addresses {
if addr.Type == v1.NodeHostName {
nodeAddresses = append(nodeAddresses, addr)
}
}
}
// If nodeIP was suggested by user, ensure that
// it can be found in the cloud as well (consistent with the behaviour in kubelet)
if nodeIP, ok := ensureNodeProvidedIPExists(node, nodeAddresses); ok {
if nodeIP == nil {
klog.Errorf("Specified Node IP not found in cloudprovider")
return
}
}
newNode := node.DeepCopy()
newNode.Status.Addresses = nodeAddresses
if !nodeAddressesChangeDetected(node.Status.Addresses, newNode.Status.Addresses) {
return
}
_, _, err = nodeutil.PatchNodeStatus(cnc.kubeClient.CoreV1(), types.NodeName(node.Name), node, newNode)
if err != nil {
klog.Errorf("Error patching node with cloud ip addresses = [%v]", err)
}
}
// Monitor node queries the cloudprovider for non-ready nodes and deletes them
// if they cannot be found in the cloud provider
func (cnc *CloudNodeController) MonitorNode() {
instances, ok := cnc.cloud.Instances()
if !ok {
utilruntime.HandleError(fmt.Errorf("failed to get instances from cloud provider"))
return
}
nodes, err := cnc.kubeClient.CoreV1().Nodes().List(metav1.ListOptions{ResourceVersion: "0"})
if err != nil {
klog.Errorf("Error monitoring node status: %v", err)
return
}
for i := range nodes.Items {
var currentReadyCondition *v1.NodeCondition
node := &nodes.Items[i]
// Try to get the current node status
// If node status is empty, then kubelet has not posted ready status yet. In this case, process next node
for rep := 0; rep < nodeStatusUpdateRetry; rep++ {
_, currentReadyCondition = nodeutilv1.GetNodeCondition(&node.Status, v1.NodeReady)
if currentReadyCondition != nil {
break
}
name := node.Name
node, err = cnc.kubeClient.CoreV1().Nodes().Get(name, metav1.GetOptions{})
if err != nil {
klog.Errorf("Failed while getting a Node to retry updating NodeStatus. Probably Node %s was deleted.", name)
break
}
time.Sleep(retrySleepTime)
}
if currentReadyCondition == nil {
klog.Errorf("Update status of Node %v from CloudNodeController exceeds retry count or the Node was deleted.", node.Name)
continue
}
// If the known node status says that Node is NotReady, then check if the node has been removed
// from the cloud provider. If node cannot be found in cloudprovider, then delete the node immediately
if currentReadyCondition != nil {
if currentReadyCondition.Status != v1.ConditionTrue {
// we need to check this first to get taint working in similar in all cloudproviders
// current problem is that shutdown nodes are not working in similar way ie. all cloudproviders
// does not delete node from kubernetes cluster when instance it is shutdown see issue #46442
shutdown, err := nodectrlutil.ShutdownInCloudProvider(context.TODO(), cnc.cloud, node)
if err != nil {
klog.Errorf("Error checking if node %s is shutdown: %v", node.Name, err)
}
if shutdown && err == nil {
// if node is shutdown add shutdown taint
err = controller.AddOrUpdateTaintOnNode(cnc.kubeClient, node.Name, controller.ShutdownTaint)
if err != nil {
klog.Errorf("Error patching node taints: %v", err)
}
// Continue checking the remaining nodes since the current one is shutdown.
continue
}
// Check with the cloud provider to see if the node still exists. If it
// doesn't, delete the node immediately.
exists, err := ensureNodeExistsByProviderID(instances, node)
if err != nil {
klog.Errorf("Error checking if node %s exists: %v", node.Name, err)
continue
}
if exists {
// Continue checking the remaining nodes since the current one is fine.
continue
}
klog.V(2).Infof("Deleting node since it is no longer present in cloud provider: %s", node.Name)
ref := &v1.ObjectReference{
Kind: "Node",
Name: node.Name,
UID: types.UID(node.UID),
Namespace: "",
}
klog.V(2).Infof("Recording %s event message for node %s", "DeletingNode", node.Name)
cnc.recorder.Eventf(ref, v1.EventTypeNormal, fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name), "Node %s event: %s", node.Name, "DeletingNode")
go func(nodeName string) {
defer utilruntime.HandleCrash()
if err := cnc.kubeClient.CoreV1().Nodes().Delete(nodeName, nil); err != nil {
klog.Errorf("unable to delete node %q: %v", nodeName, err)
}
}(node.Name)
} else {
// if taint exist remove taint
err = controller.RemoveTaintOffNode(cnc.kubeClient, node.Name, node, controller.ShutdownTaint)
if err != nil {
klog.Errorf("Error patching node taints: %v", err)
}
}
}
}
}
func (cnc *CloudNodeController) UpdateCloudNode(_, newObj interface{}) {
if _, ok := newObj.(*v1.Node); !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
return
}
cnc.AddCloudNode(newObj)
}
// This processes nodes that were added into the cluster, and cloud initialize them if appropriate
func (cnc *CloudNodeController) AddCloudNode(obj interface{}) {
node := obj.(*v1.Node)
cloudTaint := getCloudTaint(node.Spec.Taints)
if cloudTaint == nil {
klog.V(2).Infof("This node %s is registered without the cloud taint. Will not process.", node.Name)
return
}
instances, ok := cnc.cloud.Instances()
if !ok {
utilruntime.HandleError(fmt.Errorf("failed to get instances from cloud provider"))
return
}
err := clientretry.RetryOnConflict(UpdateNodeSpecBackoff, func() error {
// TODO(wlan0): Move this logic to the route controller using the node taint instead of condition
// Since there are node taints, do we still need this?
// This condition marks the node as unusable until routes are initialized in the cloud provider
if cnc.cloud.ProviderName() == "gce" {
if err := nodeutil.SetNodeCondition(cnc.kubeClient, types.NodeName(node.Name), v1.NodeCondition{
Type: v1.NodeNetworkUnavailable,
Status: v1.ConditionTrue,
Reason: "NoRouteCreated",
Message: "Node created without a route",
LastTransitionTime: metav1.Now(),
}); err != nil {
return err
}
}
curNode, err := cnc.kubeClient.CoreV1().Nodes().Get(node.Name, metav1.GetOptions{})
if err != nil {
return err
}
if curNode.Spec.ProviderID == "" {
providerID, err := cloudprovider.GetInstanceProviderID(context.TODO(), cnc.cloud, types.NodeName(curNode.Name))
if err == nil {
curNode.Spec.ProviderID = providerID
} else {
// we should attempt to set providerID on curNode, but
// we can continue if we fail since we will attempt to set
// node addresses given the node name in getNodeAddressesByProviderIDOrName
klog.Errorf("failed to set node provider id: %v", err)
}
}
nodeAddresses, err := getNodeAddressesByProviderIDOrName(instances, curNode)
if err != nil {
return err
}
// If user provided an IP address, ensure that IP address is found
// in the cloud provider before removing the taint on the node
if nodeIP, ok := ensureNodeProvidedIPExists(curNode, nodeAddresses); ok {
if nodeIP == nil {
return errors.New("failed to find kubelet node IP from cloud provider")
}
}
if instanceType, err := getInstanceTypeByProviderIDOrName(instances, curNode); err != nil {
return err
} else if instanceType != "" {
klog.V(2).Infof("Adding node label from cloud provider: %s=%s", kubeletapis.LabelInstanceType, instanceType)
curNode.ObjectMeta.Labels[kubeletapis.LabelInstanceType] = instanceType
}
if zones, ok := cnc.cloud.Zones(); ok {
zone, err := getZoneByProviderIDOrName(zones, curNode)
if err != nil {
return fmt.Errorf("failed to get zone from cloud provider: %v", err)
}
if zone.FailureDomain != "" {
klog.V(2).Infof("Adding node label from cloud provider: %s=%s", kubeletapis.LabelZoneFailureDomain, zone.FailureDomain)
curNode.ObjectMeta.Labels[kubeletapis.LabelZoneFailureDomain] = zone.FailureDomain
}
if zone.Region != "" {
klog.V(2).Infof("Adding node label from cloud provider: %s=%s", kubeletapis.LabelZoneRegion, zone.Region)
curNode.ObjectMeta.Labels[kubeletapis.LabelZoneRegion] = zone.Region
}
}
curNode.Spec.Taints = excludeTaintFromList(curNode.Spec.Taints, *cloudTaint)
_, err = cnc.kubeClient.CoreV1().Nodes().Update(curNode)
if err != nil {
return err
}
// After adding, call UpdateNodeAddress to set the CloudProvider provided IPAddresses
// So that users do not see any significant delay in IP addresses being filled into the node
cnc.updateNodeAddress(curNode, instances)
return nil
})
if err != nil {
utilruntime.HandleError(err)
return
}
klog.Infof("Successfully initialized node %s with cloud provider", node.Name)
}
func getCloudTaint(taints []v1.Taint) *v1.Taint {
for _, taint := range taints {
if taint.Key == schedulerapi.TaintExternalCloudProvider {
return &taint
}
}
return nil
}
func excludeTaintFromList(taints []v1.Taint, toExclude v1.Taint) []v1.Taint {
newTaints := []v1.Taint{}
for _, taint := range taints {
if toExclude.MatchTaint(&taint) {
continue
}
newTaints = append(newTaints, taint)
}
return newTaints
}
// ensureNodeExistsByProviderID checks if the instance exists by the provider id,
// If provider id in spec is empty it calls instanceId with node name to get provider id
func ensureNodeExistsByProviderID(instances cloudprovider.Instances, node *v1.Node) (bool, error) {
providerID := node.Spec.ProviderID
if providerID == "" {
var err error
providerID, err = instances.InstanceID(context.TODO(), types.NodeName(node.Name))
if err != nil {
if err == cloudprovider.InstanceNotFound {
return false, nil
}
return false, err
}
if providerID == "" {
klog.Warningf("Cannot find valid providerID for node name %q, assuming non existence", node.Name)
return false, nil
}
}
return instances.InstanceExistsByProviderID(context.TODO(), providerID)
}
func getNodeAddressesByProviderIDOrName(instances cloudprovider.Instances, node *v1.Node) ([]v1.NodeAddress, error) {
nodeAddresses, err := instances.NodeAddressesByProviderID(context.TODO(), node.Spec.ProviderID)
if err != nil {
providerIDErr := err
nodeAddresses, err = instances.NodeAddresses(context.TODO(), types.NodeName(node.Name))
if err != nil {
return nil, fmt.Errorf("NodeAddress: Error fetching by providerID: %v Error fetching by NodeName: %v", providerIDErr, err)
}
}
return nodeAddresses, nil
}
func nodeAddressesChangeDetected(addressSet1, addressSet2 []v1.NodeAddress) bool {
if len(addressSet1) != len(addressSet2) {
return true
}
addressMap1 := map[v1.NodeAddressType]string{}
addressMap2 := map[v1.NodeAddressType]string{}
for i := range addressSet1 {
addressMap1[addressSet1[i].Type] = addressSet1[i].Address
addressMap2[addressSet2[i].Type] = addressSet2[i].Address
}
for k, v := range addressMap1 {
if addressMap2[k] != v {
return true
}
}
return false
}
func ensureNodeProvidedIPExists(node *v1.Node, nodeAddresses []v1.NodeAddress) (*v1.NodeAddress, bool) {
var nodeIP *v1.NodeAddress
nodeIPExists := false
if providedIP, ok := node.ObjectMeta.Annotations[kubeletapis.AnnotationProvidedIPAddr]; ok {
nodeIPExists = true
for i := range nodeAddresses {
if nodeAddresses[i].Address == providedIP {
nodeIP = &nodeAddresses[i]
break
}
}
}
return nodeIP, nodeIPExists
}
func getInstanceTypeByProviderIDOrName(instances cloudprovider.Instances, node *v1.Node) (string, error) {
instanceType, err := instances.InstanceTypeByProviderID(context.TODO(), node.Spec.ProviderID)
if err != nil {
providerIDErr := err
instanceType, err = instances.InstanceType(context.TODO(), types.NodeName(node.Name))
if err != nil {
return "", fmt.Errorf("InstanceType: Error fetching by providerID: %v Error fetching by NodeName: %v", providerIDErr, err)
}
}
return instanceType, err
}
// getZoneByProviderIDorName will attempt to get the zone of node using its providerID
// then it's name. If both attempts fail, an error is returned
func getZoneByProviderIDOrName(zones cloudprovider.Zones, node *v1.Node) (cloudprovider.Zone, error) {
zone, err := zones.GetZoneByProviderID(context.TODO(), node.Spec.ProviderID)
if err != nil {
providerIDErr := err
zone, err = zones.GetZoneByNodeName(context.TODO(), types.NodeName(node.Name))
if err != nil {
return cloudprovider.Zone{}, fmt.Errorf("Zone: Error fetching by providerID: %v Error fetching by NodeName: %v", providerIDErr, err)
}
}
return zone, nil
}

File diff suppressed because it is too large Load Diff

View File

@ -1,328 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cloud
import (
"context"
"encoding/json"
"fmt"
"time"
"k8s.io/klog"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
utilfeature "k8s.io/apiserver/pkg/util/feature"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/features"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
"k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/kubernetes/pkg/controller"
)
const initializerName = "pvlabel.kubernetes.io"
// PersistentVolumeLabelController handles adding labels to persistent volumes when they are created
type PersistentVolumeLabelController struct {
cloud cloudprovider.Interface
kubeClient kubernetes.Interface
pvlController cache.Controller
pvlIndexer cache.Indexer
volumeLister corelisters.PersistentVolumeLister
syncHandler func(key string) error
// queue is where incoming work is placed to de-dup and to allow "easy" rate limited requeues on errors
queue workqueue.RateLimitingInterface
}
// NewPersistentVolumeLabelController creates a PersistentVolumeLabelController object
func NewPersistentVolumeLabelController(
kubeClient kubernetes.Interface,
cloud cloudprovider.Interface) *PersistentVolumeLabelController {
pvlc := &PersistentVolumeLabelController{
cloud: cloud,
kubeClient: kubeClient,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvLabels"),
}
pvlc.syncHandler = pvlc.addLabelsAndAffinity
pvlc.pvlIndexer, pvlc.pvlController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.IncludeUninitialized = true
return kubeClient.CoreV1().PersistentVolumes().List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.IncludeUninitialized = true
return kubeClient.CoreV1().PersistentVolumes().Watch(options)
},
},
&v1.PersistentVolume{},
0,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
pvlc.queue.Add(key)
}
},
},
cache.Indexers{},
)
pvlc.volumeLister = corelisters.NewPersistentVolumeLister(pvlc.pvlIndexer)
return pvlc
}
// Run starts a controller that adds labels to persistent volumes
func (pvlc *PersistentVolumeLabelController) Run(threadiness int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer pvlc.queue.ShutDown()
klog.Infof("Starting PersistentVolumeLabelController")
defer klog.Infof("Shutting down PersistentVolumeLabelController")
go pvlc.pvlController.Run(stopCh)
if !controller.WaitForCacheSync("persistent volume label", stopCh, pvlc.pvlController.HasSynced) {
return
}
// start up your worker threads based on threadiness. Some controllers have multiple kinds of workers
for i := 0; i < threadiness; i++ {
// runWorker will loop until "something bad" happens. The .Until will then rekick the worker
// after one second
go wait.Until(pvlc.runWorker, time.Second, stopCh)
}
// wait until we're told to stop
<-stopCh
}
func (pvlc *PersistentVolumeLabelController) runWorker() {
// hot loop until we're told to stop. processNextWorkItem will automatically wait until there's work
// available, so we don't worry about secondary waits
for pvlc.processNextWorkItem() {
}
}
// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
func (pvlc *PersistentVolumeLabelController) processNextWorkItem() bool {
// pull the next work item from queue. It should be a key we use to lookup something in a cache
keyObj, quit := pvlc.queue.Get()
if quit {
return false
}
// you always have to indicate to the queue that you've completed a piece of work
defer pvlc.queue.Done(keyObj)
key := keyObj.(string)
// do your work on the key. This method will contains your "do stuff" logic
err := pvlc.syncHandler(key)
if err == nil {
// if you had no error, tell the queue to stop tracking history for your key. This will
// reset things like failure counts for per-item rate limiting
pvlc.queue.Forget(key)
return true
}
// there was a failure so be sure to report it. This method allows for pluggable error handling
// which can be used for things like cluster-monitoring
utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err))
// since we failed, we should requeue the item to work on later. This method will add a backoff
// to avoid hotlooping on particular items (they're probably still not going to work right away)
// and overall controller protection (everything I've done is broken, this controller needs to
// calm down or it can starve other useful work) cases.
pvlc.queue.AddRateLimited(key)
return true
}
// AddLabels adds appropriate labels to persistent volumes and sets the
// volume as available if successful.
func (pvlc *PersistentVolumeLabelController) addLabelsAndAffinity(key string) error {
_, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return fmt.Errorf("error getting name of volume %q to get volume from informer: %v", key, err)
}
volume, err := pvlc.volumeLister.Get(name)
if errors.IsNotFound(err) {
return nil
} else if err != nil {
return fmt.Errorf("error getting volume %s from informer: %v", name, err)
}
return pvlc.addLabelsAndAffinityToVolume(volume)
}
func (pvlc *PersistentVolumeLabelController) addLabelsAndAffinityToVolume(vol *v1.PersistentVolume) error {
var volumeLabels map[string]string
// Only add labels if the next pending initializer.
if needsInitialization(vol.Initializers, initializerName) {
if labeler, ok := (pvlc.cloud).(cloudprovider.PVLabeler); ok {
labels, err := labeler.GetLabelsForVolume(context.TODO(), vol)
if err != nil {
return fmt.Errorf("error querying volume %v: %v", vol.Spec, err)
}
volumeLabels = labels
} else {
klog.V(4).Info("cloud provider does not support PVLabeler")
}
return pvlc.updateVolume(vol, volumeLabels)
}
return nil
}
func (pvlc *PersistentVolumeLabelController) createPatch(vol *v1.PersistentVolume, volLabels map[string]string) ([]byte, error) {
volName := vol.Name
newVolume := vol.DeepCopyObject().(*v1.PersistentVolume)
populateAffinity := utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) && len(volLabels) != 0
if newVolume.Labels == nil {
newVolume.Labels = make(map[string]string)
}
requirements := make([]v1.NodeSelectorRequirement, 0)
for k, v := range volLabels {
newVolume.Labels[k] = v
// Set NodeSelectorRequirements based on the labels
if populateAffinity {
var values []string
if k == kubeletapis.LabelZoneFailureDomain {
zones, err := volumeutil.LabelZonesToSet(v)
if err != nil {
return nil, fmt.Errorf("failed to convert label string for Zone: %s to a Set", v)
}
values = zones.List()
} else {
values = []string{v}
}
requirements = append(requirements, v1.NodeSelectorRequirement{Key: k, Operator: v1.NodeSelectorOpIn, Values: values})
}
}
if populateAffinity {
if newVolume.Spec.NodeAffinity == nil {
newVolume.Spec.NodeAffinity = new(v1.VolumeNodeAffinity)
}
if newVolume.Spec.NodeAffinity.Required == nil {
newVolume.Spec.NodeAffinity.Required = new(v1.NodeSelector)
}
if len(newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms) == 0 {
// Need at least one term pre-allocated whose MatchExpressions can be appended to
newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms = make([]v1.NodeSelectorTerm, 1)
}
// Populate NodeAffinity with requirements if there are no conflicting keys found
if v1helper.NodeSelectorRequirementKeysExistInNodeSelectorTerms(requirements, newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms) {
klog.V(4).Infof("NodeSelectorRequirements for cloud labels %v conflict with existing NodeAffinity %v. Skipping addition of NodeSelectorRequirements for cloud labels.",
requirements, newVolume.Spec.NodeAffinity)
} else {
for _, req := range requirements {
for i := range newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms {
newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms[i].MatchExpressions = append(newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms[i].MatchExpressions, req)
}
}
}
}
newVolume.Initializers = removeInitializer(newVolume.Initializers, initializerName)
klog.V(4).Infof("removed initializer on PersistentVolume %s", newVolume.Name)
oldData, err := json.Marshal(vol)
if err != nil {
return nil, fmt.Errorf("failed to marshal old persistentvolume %#v for persistentvolume %q: %v", vol, volName, err)
}
newData, err := json.Marshal(newVolume)
if err != nil {
return nil, fmt.Errorf("failed to marshal new persistentvolume %#v for persistentvolume %q: %v", newVolume, volName, err)
}
patch, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.PersistentVolume{})
if err != nil {
return nil, fmt.Errorf("failed to create patch for persistentvolume %q: %v", volName, err)
}
return patch, nil
}
func (pvlc *PersistentVolumeLabelController) updateVolume(vol *v1.PersistentVolume, volLabels map[string]string) error {
volName := vol.Name
klog.V(4).Infof("updating PersistentVolume %s", volName)
patchBytes, err := pvlc.createPatch(vol, volLabels)
if err != nil {
return err
}
_, err = pvlc.kubeClient.CoreV1().PersistentVolumes().Patch(string(volName), types.StrategicMergePatchType, patchBytes)
if err != nil {
return fmt.Errorf("failed to update PersistentVolume %s: %v", volName, err)
}
klog.V(4).Infof("updated PersistentVolume %s", volName)
return nil
}
func removeInitializer(initializers *metav1.Initializers, name string) *metav1.Initializers {
if initializers == nil {
return nil
}
var updated []metav1.Initializer
for _, pending := range initializers.Pending {
if pending.Name != name {
updated = append(updated, pending)
}
}
if len(updated) == len(initializers.Pending) {
return initializers
}
if len(updated) == 0 {
return nil
}
return &metav1.Initializers{Pending: updated}
}
// needsInitialization checks whether or not the PVL is the next pending initializer.
func needsInitialization(initializers *metav1.Initializers, name string) bool {
if initializers == nil {
return false
}
if len(initializers.Pending) == 0 {
return false
}
// There is at least one initializer still pending so check to
// see if the PVL is the next in line.
return initializers.Pending[0].Name == name
}

View File

@ -1,573 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cloud
import (
"encoding/json"
"testing"
"time"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
sets "k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
"k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"k8s.io/kubernetes/pkg/features"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
)
func nodeSelectorRequirementsEqual(r1, r2 v1.NodeSelectorRequirement) bool {
if r1.Key != r2.Key {
return false
}
if r1.Operator != r2.Operator {
return false
}
vals1 := sets.NewString(r1.Values...)
vals2 := sets.NewString(r2.Values...)
if vals1.Equal(vals2) {
return true
}
return false
}
func nodeSelectorTermsEqual(t1, t2 v1.NodeSelectorTerm) bool {
exprs1 := t1.MatchExpressions
exprs2 := t2.MatchExpressions
fields1 := t1.MatchFields
fields2 := t2.MatchFields
if len(exprs1) != len(exprs2) {
return false
}
if len(fields1) != len(fields2) {
return false
}
match := func(reqs1, reqs2 []v1.NodeSelectorRequirement) bool {
for _, req1 := range reqs1 {
reqMatched := false
for _, req2 := range reqs2 {
if nodeSelectorRequirementsEqual(req1, req2) {
reqMatched = true
break
}
}
if !reqMatched {
return false
}
}
return true
}
return match(exprs1, exprs2) && match(exprs2, exprs1) && match(fields1, fields2) && match(fields2, fields1)
}
// volumeNodeAffinitiesEqual performs a highly semantic comparison of two VolumeNodeAffinity data structures
// It ignores ordering of instances of NodeSelectorRequirements in a VolumeNodeAffinity's NodeSelectorTerms as well as
// orderding of strings in Values of NodeSelectorRequirements when matching two VolumeNodeAffinity structures.
// Note that in most equality functions, Go considers two slices to be not equal if the order of elements in a slice do not
// match - so reflect.DeepEqual as well as Semantic.DeepEqual do not work for comparing VolumeNodeAffinity semantically.
// e.g. these two NodeSelectorTerms are considered semantically equal by volumeNodeAffinitiesEqual
// &VolumeNodeAffinity{Required:&NodeSelector{NodeSelectorTerms:[{[{a In [1]} {b In [2 3]}] []}],},}
// &VolumeNodeAffinity{Required:&NodeSelector{NodeSelectorTerms:[{[{b In [3 2]} {a In [1]}] []}],},}
// TODO: move volumeNodeAffinitiesEqual to utils so other can use it too
func volumeNodeAffinitiesEqual(n1, n2 *v1.VolumeNodeAffinity) bool {
if (n1 == nil) != (n2 == nil) {
return false
}
if n1 == nil || n2 == nil {
return true
}
ns1 := n1.Required
ns2 := n2.Required
if (ns1 == nil) != (ns2 == nil) {
return false
}
if (ns1 == nil) && (ns2 == nil) {
return true
}
if len(ns1.NodeSelectorTerms) != len(ns1.NodeSelectorTerms) {
return false
}
match := func(terms1, terms2 []v1.NodeSelectorTerm) bool {
for _, term1 := range terms1 {
termMatched := false
for _, term2 := range terms2 {
if nodeSelectorTermsEqual(term1, term2) {
termMatched = true
break
}
}
if !termMatched {
return false
}
}
return true
}
return match(ns1.NodeSelectorTerms, ns2.NodeSelectorTerms) && match(ns2.NodeSelectorTerms, ns1.NodeSelectorTerms)
}
func TestCreatePatch(t *testing.T) {
ignoredPV := v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: "noncloud",
Initializers: &metav1.Initializers{
Pending: []metav1.Initializer{
{
Name: initializerName,
},
},
},
},
Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{
HostPath: &v1.HostPathVolumeSource{
Path: "/",
},
},
},
}
awsPV := v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: "awsPV",
Initializers: &metav1.Initializers{
Pending: []metav1.Initializer{
{
Name: initializerName,
},
},
},
},
Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{
AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{
VolumeID: "123",
},
},
},
}
expectedAffinitya1b2MergedWithAWSPV := v1.VolumeNodeAffinity{
Required: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "a",
Operator: v1.NodeSelectorOpIn,
Values: []string{"1"},
},
{
Key: "b",
Operator: v1.NodeSelectorOpIn,
Values: []string{"2"},
},
},
},
},
},
}
expectedAffinityZone1MergedWithAWSPV := v1.VolumeNodeAffinity{
Required: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: kubeletapis.LabelZoneFailureDomain,
Operator: v1.NodeSelectorOpIn,
Values: []string{"1"},
},
},
},
},
},
}
expectedAffinityZonesMergedWithAWSPV := v1.VolumeNodeAffinity{
Required: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: kubeletapis.LabelZoneFailureDomain,
Operator: v1.NodeSelectorOpIn,
Values: []string{"1", "2", "3"},
},
},
},
},
},
}
awsPVWithAffinity := v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: "awsPV",
Initializers: &metav1.Initializers{
Pending: []metav1.Initializer{
{
Name: initializerName,
},
},
},
},
Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{
AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{
VolumeID: "123",
},
},
NodeAffinity: &v1.VolumeNodeAffinity{
Required: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "c",
Operator: v1.NodeSelectorOpIn,
Values: []string{"val1", "val2"},
},
{
Key: "d",
Operator: v1.NodeSelectorOpIn,
Values: []string{"val3"},
},
},
},
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "e",
Operator: v1.NodeSelectorOpIn,
Values: []string{"val4", "val5"},
},
},
},
},
},
},
},
}
expectedAffinitya1b2MergedWithAWSPVWithAffinity := v1.VolumeNodeAffinity{
Required: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "c",
Operator: v1.NodeSelectorOpIn,
Values: []string{"val1", "val2"},
},
{
Key: "d",
Operator: v1.NodeSelectorOpIn,
Values: []string{"val3"},
},
{
Key: "a",
Operator: v1.NodeSelectorOpIn,
Values: []string{"1"},
},
{
Key: "b",
Operator: v1.NodeSelectorOpIn,
Values: []string{"2"},
},
},
},
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "e",
Operator: v1.NodeSelectorOpIn,
Values: []string{"val4", "val5"},
},
{
Key: "a",
Operator: v1.NodeSelectorOpIn,
Values: []string{"1"},
},
{
Key: "b",
Operator: v1.NodeSelectorOpIn,
Values: []string{"2"},
},
},
},
},
},
}
expectedAffinityZone1MergedWithAWSPVWithAffinity := v1.VolumeNodeAffinity{
Required: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "c",
Operator: v1.NodeSelectorOpIn,
Values: []string{"val1", "val2"},
},
{
Key: "d",
Operator: v1.NodeSelectorOpIn,
Values: []string{"val3"},
},
{
Key: kubeletapis.LabelZoneFailureDomain,
Operator: v1.NodeSelectorOpIn,
Values: []string{"1"},
},
},
},
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "e",
Operator: v1.NodeSelectorOpIn,
Values: []string{"val4", "val5"},
},
{
Key: kubeletapis.LabelZoneFailureDomain,
Operator: v1.NodeSelectorOpIn,
Values: []string{"1"},
},
},
},
},
},
}
expectedAffinityZonesMergedWithAWSPVWithAffinity := v1.VolumeNodeAffinity{
Required: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "c",
Operator: v1.NodeSelectorOpIn,
Values: []string{"val1", "val2"},
},
{
Key: "d",
Operator: v1.NodeSelectorOpIn,
Values: []string{"val3"},
},
{
Key: kubeletapis.LabelZoneFailureDomain,
Operator: v1.NodeSelectorOpIn,
Values: []string{"1", "2", "3"},
},
},
},
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "e",
Operator: v1.NodeSelectorOpIn,
Values: []string{"val5", "val4"},
},
{
Key: kubeletapis.LabelZoneFailureDomain,
Operator: v1.NodeSelectorOpIn,
Values: []string{"3", "2", "1"},
},
},
},
},
},
}
zones, _ := volumeutil.ZonesToSet("1,2,3")
testCases := map[string]struct {
vol v1.PersistentVolume
labels map[string]string
expectedAffinity *v1.VolumeNodeAffinity
}{
"non-cloud PV": {
vol: ignoredPV,
labels: nil,
expectedAffinity: nil,
},
"no labels": {
vol: awsPV,
labels: nil,
expectedAffinity: nil,
},
"cloudprovider returns nil, nil": {
vol: awsPV,
labels: nil,
expectedAffinity: nil,
},
"cloudprovider labels": {
vol: awsPV,
labels: map[string]string{"a": "1", "b": "2"},
expectedAffinity: &expectedAffinitya1b2MergedWithAWSPV,
},
"cloudprovider labels pre-existing affinity non-conflicting": {
vol: awsPVWithAffinity,
labels: map[string]string{"a": "1", "b": "2"},
expectedAffinity: &expectedAffinitya1b2MergedWithAWSPVWithAffinity,
},
"cloudprovider labels pre-existing affinity conflicting": {
vol: awsPVWithAffinity,
labels: map[string]string{"a": "1", "c": "2"},
expectedAffinity: nil,
},
"cloudprovider singlezone": {
vol: awsPV,
labels: map[string]string{kubeletapis.LabelZoneFailureDomain: "1"},
expectedAffinity: &expectedAffinityZone1MergedWithAWSPV,
},
"cloudprovider singlezone pre-existing affinity non-conflicting": {
vol: awsPVWithAffinity,
labels: map[string]string{kubeletapis.LabelZoneFailureDomain: "1"},
expectedAffinity: &expectedAffinityZone1MergedWithAWSPVWithAffinity,
},
"cloudprovider multizone": {
vol: awsPV,
labels: map[string]string{kubeletapis.LabelZoneFailureDomain: volumeutil.ZonesSetToLabelValue(zones)},
expectedAffinity: &expectedAffinityZonesMergedWithAWSPV,
},
"cloudprovider multizone pre-existing affinity non-conflicting": {
vol: awsPVWithAffinity,
labels: map[string]string{kubeletapis.LabelZoneFailureDomain: volumeutil.ZonesSetToLabelValue(zones)},
expectedAffinity: &expectedAffinityZonesMergedWithAWSPVWithAffinity,
},
}
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.VolumeScheduling, true)()
for d, tc := range testCases {
cloud := &fakecloud.FakeCloud{}
client := fake.NewSimpleClientset()
pvlController := NewPersistentVolumeLabelController(client, cloud)
patch, err := pvlController.createPatch(&tc.vol, tc.labels)
if err != nil {
t.Errorf("%s: createPatch returned err: %v", d, err)
}
obj := &v1.PersistentVolume{}
json.Unmarshal(patch, obj)
if obj.ObjectMeta.Initializers != nil {
t.Errorf("%s: initializer wasn't removed: %v", d, obj.ObjectMeta.Initializers)
}
if tc.labels == nil {
continue
}
for k, v := range tc.labels {
if obj.ObjectMeta.Labels[k] != v {
t.Errorf("%s: label %s expected %s got %s", d, k, v, obj.ObjectMeta.Labels[k])
}
}
if !volumeNodeAffinitiesEqual(tc.expectedAffinity, obj.Spec.NodeAffinity) {
t.Errorf("Expected affinity %v does not match target affinity %v", tc.expectedAffinity, obj.Spec.NodeAffinity)
}
}
}
func TestAddLabelsToVolume(t *testing.T) {
pv := v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: "awsPV",
},
Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{
AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{
VolumeID: "123",
},
},
},
}
testCases := map[string]struct {
vol v1.PersistentVolume
initializers *metav1.Initializers
shouldLabelAndSetAffinity bool
}{
"PV without initializer": {
vol: pv,
initializers: nil,
shouldLabelAndSetAffinity: false,
},
"PV with initializer to remove": {
vol: pv,
initializers: &metav1.Initializers{Pending: []metav1.Initializer{{Name: initializerName}}},
shouldLabelAndSetAffinity: true,
},
"PV with other initializers only": {
vol: pv,
initializers: &metav1.Initializers{Pending: []metav1.Initializer{{Name: "OtherInit"}}},
shouldLabelAndSetAffinity: false,
},
"PV with other initializers first": {
vol: pv,
initializers: &metav1.Initializers{Pending: []metav1.Initializer{{Name: "OtherInit"}, {Name: initializerName}}},
shouldLabelAndSetAffinity: false,
},
}
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.VolumeScheduling, true)()
for d, tc := range testCases {
labeledCh := make(chan bool, 1)
client := fake.NewSimpleClientset()
client.PrependReactor("patch", "persistentvolumes", func(action core.Action) (handled bool, ret runtime.Object, err error) {
patch := action.(core.PatchActionImpl).GetPatch()
obj := &v1.PersistentVolume{}
json.Unmarshal(patch, obj)
if obj.ObjectMeta.Labels["a"] != "1" {
return false, nil, nil
}
if obj.Spec.NodeAffinity == nil {
return false, nil, nil
}
if obj.Spec.NodeAffinity.Required == nil {
return false, nil, nil
}
if len(obj.Spec.NodeAffinity.Required.NodeSelectorTerms) == 0 {
return false, nil, nil
}
reqs := obj.Spec.NodeAffinity.Required.NodeSelectorTerms[0].MatchExpressions
if len(reqs) != 1 {
return false, nil, nil
}
if reqs[0].Key != "a" || reqs[0].Values[0] != "1" || reqs[0].Operator != v1.NodeSelectorOpIn {
return false, nil, nil
}
labeledCh <- true
return true, nil, nil
})
fakeCloud := &fakecloud.FakeCloud{
VolumeLabelMap: map[string]map[string]string{"awsPV": {"a": "1"}},
}
pvlController := &PersistentVolumeLabelController{kubeClient: client, cloud: fakeCloud}
tc.vol.ObjectMeta.Initializers = tc.initializers
pvlController.addLabelsAndAffinityToVolume(&tc.vol)
select {
case l := <-labeledCh:
if l != tc.shouldLabelAndSetAffinity {
t.Errorf("%s: label and affinity setting of pv failed. expected %t got %t", d, tc.shouldLabelAndSetAffinity, l)
}
case <-time.After(500 * time.Millisecond):
if tc.shouldLabelAndSetAffinity != false {
t.Errorf("%s: timed out waiting for label and affinity setting notification", d)
}
}
}
}