mirror of https://github.com/k3s-io/k3s
Merge pull request #65730 from ddebroy/ebs-affinity1
Automatic merge from submit-queue (batch tested with PRs 65730, 66615, 66684, 66519, 66510). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. Add DynamicProvisioningScheduling support for EBS **What this PR does / why we need it**: This PR adds support for the DynamicProvisioningScheduling feature in EBS. With this in place, if VolumeBindingMode: WaitForFirstConsumer is specified in a EBS storageclass and DynamicProvisioningScheduling is enabled, EBS provisioner will use the selected node's LabelZoneFailureDomain as the zone to provision the EBS volume in. **Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*: Fixes # **Special notes for your reviewer**: Related to #63232 Sample `describe pv` output with NodeAffinity populated: ``` ~$ kubectl describe pv pvc-f9d2138b-7e3e-11e8-a4ea-064124617820 Name: pvc-f9d2138b-7e3e-11e8-a4ea-064124617820 Labels: failure-domain.beta.kubernetes.io/region=us-west-2 failure-domain.beta.kubernetes.io/zone=us-west-2a Annotations: kubernetes.io/createdby=aws-ebs-dynamic-provisioner pv.kubernetes.io/bound-by-controller=yes pv.kubernetes.io/provisioned-by=kubernetes.io/aws-ebs Finalizers: [kubernetes.io/pv-protection] StorageClass: slow3 Status: Bound Claim: default/pvc3 Reclaim Policy: Delete Access Modes: RWO Capacity: 6Gi Node Affinity: Required Terms: Term 0: failure-domain.beta.kubernetes.io/zone in [us-west-2a] failure-domain.beta.kubernetes.io/region in [us-west-2] Message: Source: Type: AWSElasticBlockStore (a Persistent Disk resource in AWS) VolumeID: aws://us-west-2a/vol-0fc1cdae7d10860f6 FSType: ext4 Partition: 0 ReadOnly: false Events: <none> ``` **Release note**: ```release-note none ``` /sig storage /assign @msau42 @jsafranepull/8/head
commit
22d0ef2a8e
|
@ -413,14 +413,10 @@ const (
|
|||
|
||||
// VolumeOptions specifies capacity and tags for a volume.
|
||||
type VolumeOptions struct {
|
||||
CapacityGB int
|
||||
Tags map[string]string
|
||||
PVCName string
|
||||
VolumeType string
|
||||
ZonePresent bool
|
||||
ZonesPresent bool
|
||||
AvailabilityZone string
|
||||
AvailabilityZones string
|
||||
CapacityGB int
|
||||
Tags map[string]string
|
||||
VolumeType string
|
||||
AvailabilityZone string
|
||||
// IOPSPerGB x CapacityGB will give total IOPS of the volume to create.
|
||||
// Calculated total IOPS will be capped at MaxTotalIOPS.
|
||||
IOPSPerGB int
|
||||
|
@ -1431,9 +1427,9 @@ func (c *Cloud) InstanceType(ctx context.Context, nodeName types.NodeName) (stri
|
|||
return aws.StringValue(inst.InstanceType), nil
|
||||
}
|
||||
|
||||
// getCandidateZonesForDynamicVolume retrieves a list of all the zones in which nodes are running
|
||||
// GetCandidateZonesForDynamicVolume retrieves a list of all the zones in which nodes are running
|
||||
// It currently involves querying all instances
|
||||
func (c *Cloud) getCandidateZonesForDynamicVolume() (sets.String, error) {
|
||||
func (c *Cloud) GetCandidateZonesForDynamicVolume() (sets.String, error) {
|
||||
// We don't currently cache this; it is currently used only in volume
|
||||
// creation which is expected to be a comparatively rare occurrence.
|
||||
|
||||
|
@ -2159,29 +2155,6 @@ func (c *Cloud) DetachDisk(diskName KubernetesVolumeID, nodeName types.NodeName)
|
|||
|
||||
// CreateDisk implements Volumes.CreateDisk
|
||||
func (c *Cloud) CreateDisk(volumeOptions *VolumeOptions) (KubernetesVolumeID, error) {
|
||||
var createAZ string
|
||||
if !volumeOptions.ZonePresent && !volumeOptions.ZonesPresent {
|
||||
// querry for candidate zones only if zone parameters absent
|
||||
allZones, err := c.getCandidateZonesForDynamicVolume()
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("error querying for all zones: %v", err)
|
||||
}
|
||||
createAZ = volumeutil.ChooseZoneForVolume(allZones, volumeOptions.PVCName)
|
||||
}
|
||||
if !volumeOptions.ZonePresent && volumeOptions.ZonesPresent {
|
||||
if adminSetOfZones, err := volumeutil.ZonesToSet(volumeOptions.AvailabilityZones); err != nil {
|
||||
return "", err
|
||||
} else {
|
||||
createAZ = volumeutil.ChooseZoneForVolume(adminSetOfZones, volumeOptions.PVCName)
|
||||
}
|
||||
}
|
||||
if volumeOptions.ZonePresent && !volumeOptions.ZonesPresent {
|
||||
if err := volumeutil.ValidateZone(volumeOptions.AvailabilityZone); err != nil {
|
||||
return "", err
|
||||
}
|
||||
createAZ = volumeOptions.AvailabilityZone
|
||||
}
|
||||
|
||||
var createType string
|
||||
var iops int64
|
||||
switch volumeOptions.VolumeType {
|
||||
|
@ -2213,7 +2186,7 @@ func (c *Cloud) CreateDisk(volumeOptions *VolumeOptions) (KubernetesVolumeID, er
|
|||
|
||||
// TODO: Should we tag this with the cluster id (so it gets deleted when the cluster does?)
|
||||
request := &ec2.CreateVolumeInput{}
|
||||
request.AvailabilityZone = aws.String(createAZ)
|
||||
request.AvailabilityZone = aws.String(volumeOptions.AvailabilityZone)
|
||||
request.Size = aws.Int64(int64(volumeOptions.CapacityGB))
|
||||
request.VolumeType = aws.String(createType)
|
||||
request.Encrypted = aws.Bool(volumeOptions.Encrypted)
|
||||
|
|
|
@ -29,6 +29,7 @@ go_library(
|
|||
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
],
|
||||
|
|
|
@ -314,7 +314,7 @@ var _ volume.VolumePluginWithAttachLimits = &awsElasticBlockStorePlugin{}
|
|||
|
||||
// Abstract interface to PD operations.
|
||||
type ebsManager interface {
|
||||
CreateVolume(provisioner *awsElasticBlockStoreProvisioner) (volumeID aws.KubernetesVolumeID, volumeSizeGB int, labels map[string]string, fstype string, err error)
|
||||
CreateVolume(provisioner *awsElasticBlockStoreProvisioner, node *v1.Node, allowedTopologies []v1.TopologySelectorTerm) (volumeID aws.KubernetesVolumeID, volumeSizeGB int, labels map[string]string, fstype string, err error)
|
||||
// Deletes a volume
|
||||
DeleteVolume(deleter *awsElasticBlockStoreDeleter) error
|
||||
}
|
||||
|
@ -504,7 +504,7 @@ func (c *awsElasticBlockStoreProvisioner) Provision(selectedNode *v1.Node, allow
|
|||
return nil, fmt.Errorf("invalid AccessModes %v: only AccessModes %v are supported", c.options.PVC.Spec.AccessModes, c.plugin.GetAccessModes())
|
||||
}
|
||||
|
||||
volumeID, sizeGB, labels, fstype, err := c.manager.CreateVolume(c)
|
||||
volumeID, sizeGB, labels, fstype, err := c.manager.CreateVolume(c, selectedNode, allowedTopologies)
|
||||
if err != nil {
|
||||
glog.Errorf("Provision failed: %v", err)
|
||||
return nil, err
|
||||
|
@ -554,14 +554,23 @@ func (c *awsElasticBlockStoreProvisioner) Provision(selectedNode *v1.Node, allow
|
|||
pv.Spec.AccessModes = c.plugin.GetAccessModes()
|
||||
}
|
||||
|
||||
requirements := make([]v1.NodeSelectorRequirement, 0)
|
||||
if len(labels) != 0 {
|
||||
if pv.Labels == nil {
|
||||
pv.Labels = make(map[string]string)
|
||||
}
|
||||
for k, v := range labels {
|
||||
pv.Labels[k] = v
|
||||
requirements = append(requirements, v1.NodeSelectorRequirement{Key: k, Operator: v1.NodeSelectorOpIn, Values: []string{v}})
|
||||
}
|
||||
}
|
||||
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
|
||||
pv.Spec.NodeAffinity = new(v1.VolumeNodeAffinity)
|
||||
pv.Spec.NodeAffinity.Required = new(v1.NodeSelector)
|
||||
pv.Spec.NodeAffinity.Required.NodeSelectorTerms = make([]v1.NodeSelectorTerm, 1)
|
||||
pv.Spec.NodeAffinity.Required.NodeSelectorTerms[0].MatchExpressions = requirements
|
||||
}
|
||||
|
||||
return pv, nil
|
||||
}
|
||||
|
|
|
@ -84,7 +84,7 @@ type fakePDManager struct {
|
|||
|
||||
// TODO(jonesdl) To fully test this, we could create a loopback device
|
||||
// and mount that instead.
|
||||
func (fake *fakePDManager) CreateVolume(c *awsElasticBlockStoreProvisioner) (volumeID aws.KubernetesVolumeID, volumeSizeGB int, labels map[string]string, fstype string, err error) {
|
||||
func (fake *fakePDManager) CreateVolume(c *awsElasticBlockStoreProvisioner, node *v1.Node, allowedTopologies []v1.TopologySelectorTerm) (volumeID aws.KubernetesVolumeID, volumeSizeGB int, labels map[string]string, fstype string, err error) {
|
||||
labels = make(map[string]string)
|
||||
labels["fakepdmanager"] = "yes"
|
||||
return "test-aws-volume-name", 100, labels, "", nil
|
||||
|
@ -173,6 +173,7 @@ func TestPlugin(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Errorf("Error creating new provisioner:%v", err)
|
||||
}
|
||||
|
||||
persistentSpec, err := provisioner.Provision(nil, nil)
|
||||
if err != nil {
|
||||
t.Errorf("Provision() failed: %v", err)
|
||||
|
@ -191,6 +192,37 @@ func TestPlugin(t *testing.T) {
|
|||
t.Errorf("Provision() returned unexpected labels: %v", persistentSpec.Labels)
|
||||
}
|
||||
|
||||
// check nodeaffinity members
|
||||
if persistentSpec.Spec.NodeAffinity == nil {
|
||||
t.Errorf("Provision() returned unexpected nil NodeAffinity")
|
||||
}
|
||||
|
||||
if persistentSpec.Spec.NodeAffinity.Required == nil {
|
||||
t.Errorf("Provision() returned unexpected nil NodeAffinity.Required")
|
||||
}
|
||||
|
||||
n := len(persistentSpec.Spec.NodeAffinity.Required.NodeSelectorTerms)
|
||||
if n != 1 {
|
||||
t.Errorf("Provision() returned unexpected number of NodeSelectorTerms %d. Expected %d", n, 1)
|
||||
}
|
||||
|
||||
n = len(persistentSpec.Spec.NodeAffinity.Required.NodeSelectorTerms[0].MatchExpressions)
|
||||
if n != 1 {
|
||||
t.Errorf("Provision() returned unexpected number of MatchExpressions %d. Expected %d", n, 1)
|
||||
}
|
||||
|
||||
req := persistentSpec.Spec.NodeAffinity.Required.NodeSelectorTerms[0].MatchExpressions[0]
|
||||
if req.Key != "fakepdmanager" {
|
||||
t.Errorf("Provision() returned unexpected requirement key in NodeAffinity %v", req.Key)
|
||||
}
|
||||
|
||||
if req.Operator != v1.NodeSelectorOpIn {
|
||||
t.Errorf("Provision() returned unexpected requirement operator in NodeAffinity %v", req.Operator)
|
||||
}
|
||||
|
||||
if len(req.Values) != 1 || req.Values[0] != "yes" {
|
||||
t.Errorf("Provision() returned unexpected requirement value in NodeAffinity %v", req.Values)
|
||||
}
|
||||
// Test Deleter
|
||||
volSpec := &volume.Spec{
|
||||
PersistentVolume: persistentSpec,
|
||||
|
|
|
@ -26,6 +26,8 @@ import (
|
|||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
|
@ -40,6 +42,7 @@ const (
|
|||
maxRetries = 10
|
||||
checkSleepDuration = time.Second
|
||||
errorSleepDuration = 5 * time.Second
|
||||
ebsMaxReplicasInAZ = 1
|
||||
)
|
||||
|
||||
type AWSDiskUtil struct{}
|
||||
|
@ -67,7 +70,7 @@ func (util *AWSDiskUtil) DeleteVolume(d *awsElasticBlockStoreDeleter) error {
|
|||
|
||||
// CreateVolume creates an AWS EBS volume.
|
||||
// Returns: volumeID, volumeSizeGB, labels, error
|
||||
func (util *AWSDiskUtil) CreateVolume(c *awsElasticBlockStoreProvisioner) (aws.KubernetesVolumeID, int, map[string]string, string, error) {
|
||||
func (util *AWSDiskUtil) CreateVolume(c *awsElasticBlockStoreProvisioner, node *v1.Node, allowedTopologies []v1.TopologySelectorTerm) (aws.KubernetesVolumeID, int, map[string]string, string, error) {
|
||||
cloud, err := getCloudProvider(c.awsElasticBlockStore.plugin.host.GetCloudProvider())
|
||||
if err != nil {
|
||||
return "", 0, nil, "", err
|
||||
|
@ -83,52 +86,16 @@ func (util *AWSDiskUtil) CreateVolume(c *awsElasticBlockStoreProvisioner) (aws.K
|
|||
tags["Name"] = volumeutil.GenerateVolumeName(c.options.ClusterName, c.options.PVName, 255) // AWS tags can have 255 characters
|
||||
|
||||
capacity := c.options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
|
||||
// AWS works with gigabytes, convert to GiB with rounding up
|
||||
requestGiB, err := volumeutil.RoundUpToGiBInt(capacity)
|
||||
|
||||
zonesWithNodes, err := cloud.GetCandidateZonesForDynamicVolume()
|
||||
if err != nil {
|
||||
return "", 0, nil, "", err
|
||||
}
|
||||
volumeOptions := &aws.VolumeOptions{
|
||||
CapacityGB: requestGiB,
|
||||
Tags: tags,
|
||||
PVCName: c.options.PVC.Name,
|
||||
}
|
||||
fstype := ""
|
||||
// Apply Parameters (case-insensitive). We leave validation of
|
||||
// the values to the cloud provider.
|
||||
volumeOptions.ZonePresent = false
|
||||
volumeOptions.ZonesPresent = false
|
||||
for k, v := range c.options.Parameters {
|
||||
switch strings.ToLower(k) {
|
||||
case "type":
|
||||
volumeOptions.VolumeType = v
|
||||
case "zone":
|
||||
volumeOptions.ZonePresent = true
|
||||
volumeOptions.AvailabilityZone = v
|
||||
case "zones":
|
||||
volumeOptions.ZonesPresent = true
|
||||
volumeOptions.AvailabilityZones = v
|
||||
case "iopspergb":
|
||||
volumeOptions.IOPSPerGB, err = strconv.Atoi(v)
|
||||
if err != nil {
|
||||
return "", 0, nil, "", fmt.Errorf("invalid iopsPerGB value %q, must be integer between 1 and 30: %v", v, err)
|
||||
}
|
||||
case "encrypted":
|
||||
volumeOptions.Encrypted, err = strconv.ParseBool(v)
|
||||
if err != nil {
|
||||
return "", 0, nil, "", fmt.Errorf("invalid encrypted boolean value %q, must be true or false: %v", v, err)
|
||||
}
|
||||
case "kmskeyid":
|
||||
volumeOptions.KmsKeyId = v
|
||||
case volume.VolumeParameterFSType:
|
||||
fstype = v
|
||||
default:
|
||||
return "", 0, nil, "", fmt.Errorf("invalid option %q for volume plugin %s", k, c.plugin.GetPluginName())
|
||||
}
|
||||
return "", 0, nil, "", fmt.Errorf("error querying for all zones: %v", err)
|
||||
}
|
||||
|
||||
if volumeOptions.ZonePresent && volumeOptions.ZonesPresent {
|
||||
return "", 0, nil, "", fmt.Errorf("both zone and zones StorageClass parameters must not be used at the same time")
|
||||
volumeOptions, err := populateVolumeOptions(c.plugin.GetPluginName(), c.options.PVC.Name, capacity, tags, c.options.Parameters, node, allowedTopologies, zonesWithNodes)
|
||||
if err != nil {
|
||||
glog.V(2).Infof("Error populating EBS options: %v", err)
|
||||
return "", 0, nil, "", err
|
||||
}
|
||||
|
||||
// TODO: implement PVC.Selector parsing
|
||||
|
@ -149,7 +116,67 @@ func (util *AWSDiskUtil) CreateVolume(c *awsElasticBlockStoreProvisioner) (aws.K
|
|||
glog.Errorf("error building labels for new EBS volume %q: %v", name, err)
|
||||
}
|
||||
|
||||
return name, requestGiB, labels, fstype, nil
|
||||
fstype := ""
|
||||
if v, ok := c.options.Parameters[volume.VolumeParameterFSType]; ok {
|
||||
fstype = v
|
||||
}
|
||||
|
||||
return name, volumeOptions.CapacityGB, labels, fstype, nil
|
||||
}
|
||||
|
||||
// returns volumeOptions for EBS based on storageclass parameters and node configuration
|
||||
func populateVolumeOptions(pluginName, pvcName string, capacityGB resource.Quantity, tags map[string]string, storageParams map[string]string, node *v1.Node, allowedTopologies []v1.TopologySelectorTerm, zonesWithNodes sets.String) (*aws.VolumeOptions, error) {
|
||||
requestGiB, err := volumeutil.RoundUpToGiBInt(capacityGB)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
volumeOptions := &aws.VolumeOptions{
|
||||
CapacityGB: requestGiB,
|
||||
Tags: tags,
|
||||
}
|
||||
|
||||
// Apply Parameters (case-insensitive). We leave validation of
|
||||
// the values to the cloud provider.
|
||||
zonePresent := false
|
||||
zonesPresent := false
|
||||
var zone string
|
||||
var zones sets.String
|
||||
for k, v := range storageParams {
|
||||
switch strings.ToLower(k) {
|
||||
case "type":
|
||||
volumeOptions.VolumeType = v
|
||||
case "zone":
|
||||
zonePresent = true
|
||||
zone = v
|
||||
case "zones":
|
||||
zonesPresent = true
|
||||
zones, err = volumeutil.ZonesToSet(v)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error parsing zones %s, must be strings separated by commas: %v", zones, err)
|
||||
}
|
||||
case "iopspergb":
|
||||
volumeOptions.IOPSPerGB, err = strconv.Atoi(v)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid iopsPerGB value %q, must be integer between 1 and 30: %v", v, err)
|
||||
}
|
||||
case "encrypted":
|
||||
volumeOptions.Encrypted, err = strconv.ParseBool(v)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid encrypted boolean value %q, must be true or false: %v", v, err)
|
||||
}
|
||||
case "kmskeyid":
|
||||
volumeOptions.KmsKeyId = v
|
||||
default:
|
||||
return nil, fmt.Errorf("invalid option %q for volume plugin %s", k, pluginName)
|
||||
}
|
||||
}
|
||||
|
||||
volumeOptions.AvailabilityZone, err = volumeutil.SelectZoneForVolume(zonePresent, zonesPresent, zone, zones, zonesWithNodes, node, allowedTopologies, pvcName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return volumeOptions, nil
|
||||
}
|
||||
|
||||
// Returns the first path that exists, or empty string if none exist.
|
||||
|
|
|
@ -56,6 +56,7 @@ go_test(
|
|||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
"//pkg/apis/core/install:go_default_library",
|
||||
"//pkg/kubelet/apis:go_default_library",
|
||||
"//pkg/util/mount:go_default_library",
|
||||
"//pkg/util/slice:go_default_library",
|
||||
"//pkg/volume:go_default_library",
|
||||
|
@ -64,6 +65,7 @@ go_test(
|
|||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/util/testing:go_default_library",
|
||||
],
|
||||
)
|
||||
|
|
|
@ -79,6 +79,16 @@ const (
|
|||
VolumeDynamicallyCreatedByKey = "kubernetes.io/createdby"
|
||||
)
|
||||
|
||||
// VolumeZoneConfig contains config information about zonal volume.
|
||||
type VolumeZoneConfig struct {
|
||||
ZonePresent bool
|
||||
ZonesPresent bool
|
||||
ReplicaZoneFromNodePresent bool
|
||||
Zone string
|
||||
Zones string
|
||||
ReplicaZoneFromNode string
|
||||
}
|
||||
|
||||
// IsReady checks for the existence of a regular file
|
||||
// called 'ready' in the given directory and returns
|
||||
// true if that file exists.
|
||||
|
@ -308,6 +318,77 @@ func LoadPodFromFile(filePath string) (*v1.Pod, error) {
|
|||
return pod, nil
|
||||
}
|
||||
|
||||
// SelectZone selects a zone for a volume based on several factors:
|
||||
// node.zone, allowedTopologies, zone/zones parameters from storageclass
|
||||
// and zones with active nodes from the cluster
|
||||
func SelectZoneForVolume(zoneParameterPresent, zonesParameterPresent bool, zoneParameter string, zonesParameter, zonesWithNodes sets.String, node *v1.Node, allowedTopologies []v1.TopologySelectorTerm, pvcName string) (string, error) {
|
||||
if zoneParameterPresent && zonesParameterPresent {
|
||||
return "", fmt.Errorf("both zone and zones StorageClass parameters must not be used at the same time")
|
||||
}
|
||||
|
||||
// pick zone from node if present
|
||||
if node != nil {
|
||||
// DynamicProvisioningScheduling implicit since node is not nil
|
||||
if zoneParameterPresent || zonesParameterPresent {
|
||||
return "", fmt.Errorf("zone[s] cannot be specified in StorageClass if VolumeBindingMode is set to WaitForFirstConsumer. Please specify allowedTopologies in StorageClass for constraining zones.")
|
||||
}
|
||||
|
||||
// pick node's zone and ignore any allowedTopologies (since scheduler is assumed to have accounted for it already)
|
||||
z, ok := node.ObjectMeta.Labels[kubeletapis.LabelZoneFailureDomain]
|
||||
if !ok {
|
||||
return "", fmt.Errorf("%s Label for node missing", kubeletapis.LabelZoneFailureDomain)
|
||||
}
|
||||
return z, nil
|
||||
}
|
||||
|
||||
// pick zone from allowedZones if specified
|
||||
allowedZones, err := ZonesFromAllowedTopologies(allowedTopologies)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if (len(allowedTopologies) > 0) && (allowedZones.Len() == 0) {
|
||||
return "", fmt.Errorf("no matchLabelExpressions with %s key found in allowedTopologies. Please specify matchLabelExpressions with %s key", kubeletapis.LabelZoneFailureDomain, kubeletapis.LabelZoneFailureDomain)
|
||||
}
|
||||
|
||||
if allowedZones.Len() > 0 {
|
||||
// DynamicProvisioningScheduling implicit since allowedZones present
|
||||
if zoneParameterPresent || zonesParameterPresent {
|
||||
return "", fmt.Errorf("zone[s] cannot be specified in StorageClass if allowedTopologies specified")
|
||||
}
|
||||
return ChooseZoneForVolume(*allowedZones, pvcName), nil
|
||||
}
|
||||
|
||||
// pick zone from parameters if present
|
||||
if zoneParameterPresent {
|
||||
return zoneParameter, nil
|
||||
}
|
||||
|
||||
if zonesParameterPresent {
|
||||
return ChooseZoneForVolume(zonesParameter, pvcName), nil
|
||||
}
|
||||
|
||||
// pick zone from zones with nodes
|
||||
return ChooseZoneForVolume(zonesWithNodes, pvcName), nil
|
||||
}
|
||||
|
||||
// ZonesFromAllowedTopologies returns a list of zones specified in allowedTopologies
|
||||
func ZonesFromAllowedTopologies(allowedTopologies []v1.TopologySelectorTerm) (*sets.String, error) {
|
||||
zones := make(sets.String)
|
||||
for _, term := range allowedTopologies {
|
||||
for _, exp := range term.MatchLabelExpressions {
|
||||
if exp.Key == kubeletapis.LabelZoneFailureDomain {
|
||||
for _, value := range exp.Values {
|
||||
zones.Insert(value)
|
||||
}
|
||||
} else {
|
||||
return nil, fmt.Errorf("unsupported key found in matchLabelExpressions: %s", exp.Key)
|
||||
}
|
||||
}
|
||||
}
|
||||
return &zones, nil
|
||||
}
|
||||
|
||||
func ZonesSetToLabelValue(strSet sets.String) string {
|
||||
return strings.Join(strSet.UnsortedList(), kubeletapis.LabelMultiZoneDelimiter)
|
||||
}
|
||||
|
|
|
@ -37,6 +37,8 @@ import (
|
|||
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
|
||||
|
||||
"k8s.io/kubernetes/pkg/util/slice"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
|
@ -1021,6 +1023,443 @@ func TestValidateZone(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestSelectZoneForVolume(t *testing.T) {
|
||||
|
||||
nodeWithZoneLabels := &v1.Node{}
|
||||
nodeWithZoneLabels.Labels = map[string]string{kubeletapis.LabelZoneFailureDomain: "zoneX"}
|
||||
|
||||
nodeWithNoLabels := &v1.Node{}
|
||||
|
||||
tests := []struct {
|
||||
// Parameters passed by test to SelectZoneForVolume
|
||||
Name string
|
||||
ZonePresent bool
|
||||
Zone string
|
||||
ZonesPresent bool
|
||||
Zones string
|
||||
ZonesWithNodes string
|
||||
Node *v1.Node
|
||||
AllowedTopologies []v1.TopologySelectorTerm
|
||||
DynamicProvisioningScheduling bool
|
||||
// Expectations around returned zone from SelectZoneForVolume
|
||||
Reject bool // expect error due to validation failing
|
||||
ExpectSpecificZone bool // expect returned zone to specifically match a single zone (rather than one from a set)
|
||||
ExpectedZone string // single zone that should perfectly match returned zone (requires ExpectSpecificZone to be true)
|
||||
ExpectedZones string // set of zones one of whose members should match returned zone (requires ExpectSpecificZone to be false)
|
||||
}{
|
||||
// NEGATIVE TESTS
|
||||
|
||||
// Zone and Zones are both specified [Fail]
|
||||
// [1] Node irrelevant
|
||||
// [2] Zone and Zones parameters presents
|
||||
// [3] AllowedTopologies irrelevant
|
||||
// [4] DynamicProvisioningScheduling irrelevant
|
||||
{
|
||||
Name: "Nil_Node_with_Zone_Zones_parameters_present",
|
||||
ZonePresent: true,
|
||||
Zone: "zoneX",
|
||||
ZonesPresent: true,
|
||||
Zones: "zoneX,zoneY",
|
||||
Reject: true,
|
||||
},
|
||||
|
||||
// Node has no zone labels [Fail]
|
||||
// [1] Node with no zone labels
|
||||
// [2] Zone/Zones parameter irrelevant
|
||||
// [3] AllowedTopologies irrelevant
|
||||
// [4] DynamicProvisioningScheduling enabled
|
||||
{
|
||||
Name: "Node_with_no_Zone_labels",
|
||||
Node: nodeWithNoLabels,
|
||||
DynamicProvisioningScheduling: true,
|
||||
Reject: true,
|
||||
},
|
||||
|
||||
// Node with Zone labels as well as Zone parameter specified [Fail]
|
||||
// [1] Node with zone labels
|
||||
// [2] Zone parameter specified
|
||||
// [3] AllowedTopologies irrelevant
|
||||
// [4] DynamicProvisioningScheduling enabled
|
||||
{
|
||||
Name: "Node_with_Zone_labels_and_Zone_parameter_present",
|
||||
Node: nodeWithZoneLabels,
|
||||
ZonePresent: true,
|
||||
Zone: "zoneX",
|
||||
DynamicProvisioningScheduling: true,
|
||||
Reject: true,
|
||||
},
|
||||
|
||||
// Node with Zone labels as well as Zones parameter specified [Fail]
|
||||
// [1] Node with zone labels
|
||||
// [2] Zones parameter specified
|
||||
// [3] AllowedTopologies irrelevant
|
||||
// [4] DynamicProvisioningScheduling enabled
|
||||
{
|
||||
Name: "Node_with_Zone_labels_and_Zones_parameter_present",
|
||||
Node: nodeWithZoneLabels,
|
||||
ZonesPresent: true,
|
||||
Zones: "zoneX,zoneY",
|
||||
DynamicProvisioningScheduling: true,
|
||||
Reject: true,
|
||||
},
|
||||
|
||||
// Zone parameter as well as AllowedTopologies specified [Fail]
|
||||
// [1] nil Node
|
||||
// [2] Zone parameter specified
|
||||
// [3] AllowedTopologies specified
|
||||
// [4] DynamicProvisioningScheduling enabled
|
||||
{
|
||||
Name: "Nil_Node_and_Zone_parameter_and_Allowed_Topology_term",
|
||||
Node: nil,
|
||||
ZonePresent: true,
|
||||
Zone: "zoneX",
|
||||
DynamicProvisioningScheduling: true,
|
||||
AllowedTopologies: []v1.TopologySelectorTerm{
|
||||
{
|
||||
MatchLabelExpressions: []v1.TopologySelectorLabelRequirement{
|
||||
{
|
||||
Key: kubeletapis.LabelZoneFailureDomain,
|
||||
Values: []string{"zoneX"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Reject: true,
|
||||
},
|
||||
|
||||
// Zones parameter as well as AllowedTopologies specified [Fail]
|
||||
// [1] nil Node
|
||||
// [2] Zones parameter specified
|
||||
// [3] AllowedTopologies specified
|
||||
// [4] DynamicProvisioningScheduling enabled
|
||||
{
|
||||
Name: "Nil_Node_and_Zones_parameter_and_Allowed_Topology_term",
|
||||
Node: nil,
|
||||
ZonesPresent: true,
|
||||
Zones: "zoneX,zoneY",
|
||||
DynamicProvisioningScheduling: true,
|
||||
AllowedTopologies: []v1.TopologySelectorTerm{
|
||||
{
|
||||
MatchLabelExpressions: []v1.TopologySelectorLabelRequirement{
|
||||
{
|
||||
Key: kubeletapis.LabelZoneFailureDomain,
|
||||
Values: []string{"zoneX"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Reject: true,
|
||||
},
|
||||
|
||||
// Key specified in AllowedTopologies is not LabelZoneFailureDomain [Fail]
|
||||
// [1] nil Node
|
||||
// [2] no Zone/Zones parameter
|
||||
// [3] AllowedTopologies with invalid key specified
|
||||
// [4] DynamicProvisioningScheduling enabled
|
||||
{
|
||||
Name: "Nil_Node_and_Invalid_Allowed_Topology_Key",
|
||||
Node: nil,
|
||||
DynamicProvisioningScheduling: true,
|
||||
AllowedTopologies: []v1.TopologySelectorTerm{
|
||||
{
|
||||
MatchLabelExpressions: []v1.TopologySelectorLabelRequirement{
|
||||
{
|
||||
Key: "invalid_key",
|
||||
Values: []string{"zoneX"},
|
||||
},
|
||||
{
|
||||
Key: kubeletapis.LabelZoneFailureDomain,
|
||||
Values: []string{"zoneY"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Reject: true,
|
||||
},
|
||||
|
||||
// AllowedTopologies without keys specifying LabelZoneFailureDomain [Fail]
|
||||
// [1] nil Node
|
||||
// [2] no Zone/Zones parameter
|
||||
// [3] Invalid AllowedTopologies
|
||||
// [4] DynamicProvisioningScheduling enabled
|
||||
{
|
||||
Name: "Nil_Node_and_Invalid_AllowedTopologies",
|
||||
Node: nil,
|
||||
DynamicProvisioningScheduling: true,
|
||||
AllowedTopologies: []v1.TopologySelectorTerm{
|
||||
{
|
||||
MatchLabelExpressions: []v1.TopologySelectorLabelRequirement{},
|
||||
},
|
||||
},
|
||||
Reject: true,
|
||||
},
|
||||
|
||||
// POSITIVE TESTS WITH DynamicProvisioningScheduling DISABLED
|
||||
|
||||
// Select zone from active zones [Pass]
|
||||
// [1] nil Node (Node irrelevant)
|
||||
// [2] no Zone parameter
|
||||
// [3] no AllowedTopologies
|
||||
// [4] DynamicProvisioningScheduling disabled
|
||||
{
|
||||
Name: "No_Zone_Zones_parameter_and_DynamicProvisioningScheduling_disabled",
|
||||
ZonesWithNodes: "zoneX,zoneY",
|
||||
DynamicProvisioningScheduling: false,
|
||||
Reject: false,
|
||||
ExpectedZones: "zoneX,zoneY",
|
||||
},
|
||||
|
||||
// Select zone from single zone parameter [Pass]
|
||||
// [1] nil Node (Node irrelevant)
|
||||
// [2] Zone parameter specified
|
||||
// [3] no AllowedTopologies
|
||||
// [4] DynamicProvisioningScheduling disabled
|
||||
{
|
||||
Name: "Zone_parameter_present_and_DynamicProvisioningScheduling_disabled",
|
||||
ZonePresent: true,
|
||||
Zone: "zoneX",
|
||||
DynamicProvisioningScheduling: false,
|
||||
Reject: false,
|
||||
ExpectSpecificZone: true,
|
||||
ExpectedZone: "zoneX",
|
||||
},
|
||||
|
||||
// Select zone from zones parameter [Pass]
|
||||
// [1] nil Node (Node irrelevant)
|
||||
// [2] Zones parameter specified
|
||||
// [3] no AllowedTopologies
|
||||
// [4] DynamicProvisioningScheduling disabled
|
||||
{
|
||||
Name: "Zones_parameter_present_and_DynamicProvisioningScheduling_disabled",
|
||||
ZonesPresent: true,
|
||||
Zones: "zoneX,zoneY",
|
||||
DynamicProvisioningScheduling: false,
|
||||
Reject: false,
|
||||
ExpectedZones: "zoneX,zoneY",
|
||||
},
|
||||
|
||||
// POSITIVE TESTS WITH DynamicProvisioningScheduling ENABLED
|
||||
|
||||
// Select zone from active zones [Pass]
|
||||
// [1] nil Node
|
||||
// [2] no Zone parameter specified
|
||||
// [3] no AllowedTopologies
|
||||
// [4] DynamicProvisioningScheduling enabled
|
||||
{
|
||||
Name: "Nil_Node_and_No_Zone_Zones_parameter_and_no_Allowed_topologies_and_DynamicProvisioningScheduling_enabled",
|
||||
Node: nil,
|
||||
ZonesWithNodes: "zoneX,zoneY",
|
||||
DynamicProvisioningScheduling: true,
|
||||
Reject: false,
|
||||
ExpectedZones: "zoneX,zoneY",
|
||||
},
|
||||
|
||||
// Select zone from single zone parameter [Pass]
|
||||
// [1] nil Node
|
||||
// [2] Zone parameter specified
|
||||
// [3] no AllowedTopology specified
|
||||
// [4] DynamicSchedulingEnabled enabled
|
||||
{
|
||||
Name: "Nil_Node_and_Zone_parameter_present_and_DynamicProvisioningScheduling_enabled",
|
||||
ZonePresent: true,
|
||||
Zone: "zoneX",
|
||||
Node: nil,
|
||||
DynamicProvisioningScheduling: true,
|
||||
Reject: false,
|
||||
ExpectSpecificZone: true,
|
||||
ExpectedZone: "zoneX",
|
||||
},
|
||||
|
||||
// Select zone from zones parameter [Pass]
|
||||
// [1] nil Node
|
||||
// [2] Zones parameter specified
|
||||
// [3] no AllowedTopology
|
||||
// [4] DynamicSchedulingEnabled enabled
|
||||
{
|
||||
Name: "Nil_Node_and_Zones_parameter_present_and_DynamicProvisioningScheduling_enabled",
|
||||
ZonesPresent: true,
|
||||
Zones: "zoneX,zoneY",
|
||||
Node: nil,
|
||||
DynamicProvisioningScheduling: true,
|
||||
Reject: false,
|
||||
ExpectedZones: "zoneX,zoneY",
|
||||
},
|
||||
|
||||
// Select zone from node label [Pass]
|
||||
// [1] Node with zone labels
|
||||
// [2] no zone/zones parameters
|
||||
// [3] no AllowedTopology
|
||||
// [4] DynamicProvisioningScheduling enabled
|
||||
{
|
||||
Name: "Node_with_Zone_labels_and_DynamicProvisioningScheduling_enabled",
|
||||
Node: nodeWithZoneLabels,
|
||||
DynamicProvisioningScheduling: true,
|
||||
Reject: false,
|
||||
ExpectSpecificZone: true,
|
||||
ExpectedZone: "zoneX",
|
||||
},
|
||||
|
||||
// Select zone from node label [Pass]
|
||||
// [1] Node with zone labels
|
||||
// [2] no Zone/Zones parameters
|
||||
// [3] AllowedTopology with single term with multiple values specified (ignored)
|
||||
// [4] DynamicProvisioningScheduling enabled
|
||||
{
|
||||
Name: "Node_with_Zone_labels_and_Multiple_Allowed_Topology_values_and_DynamicProvisioningScheduling_enabled",
|
||||
Node: nodeWithZoneLabels,
|
||||
DynamicProvisioningScheduling: true,
|
||||
AllowedTopologies: []v1.TopologySelectorTerm{
|
||||
{
|
||||
MatchLabelExpressions: []v1.TopologySelectorLabelRequirement{
|
||||
{
|
||||
Key: kubeletapis.LabelZoneFailureDomain,
|
||||
Values: []string{"zoneZ", "zoneY"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Reject: false,
|
||||
ExpectSpecificZone: true,
|
||||
ExpectedZone: "zoneX",
|
||||
},
|
||||
|
||||
// Select Zone from AllowedTopologies [Pass]
|
||||
// [1] nil Node
|
||||
// [2] no Zone/Zones parametes specified
|
||||
// [3] AllowedTopologies with single term with multiple values specified
|
||||
// [4] DynamicProvisioningScheduling enabled
|
||||
{
|
||||
Name: "Nil_Node_with_Multiple_Allowed_Topology_values_and_DynamicProvisioningScheduling_enabled",
|
||||
Node: nil,
|
||||
DynamicProvisioningScheduling: true,
|
||||
AllowedTopologies: []v1.TopologySelectorTerm{
|
||||
{
|
||||
MatchLabelExpressions: []v1.TopologySelectorLabelRequirement{
|
||||
{
|
||||
Key: kubeletapis.LabelZoneFailureDomain,
|
||||
Values: []string{"zoneX", "zoneY"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Reject: false,
|
||||
ExpectedZones: "zoneX,zoneY",
|
||||
},
|
||||
|
||||
// Select zone from AllowedTopologies [Pass]
|
||||
// [1] nil Node
|
||||
// [2] no Zone/Zones parametes specified
|
||||
// [3] AllowedTopologies with multiple terms specified
|
||||
// [4] DynamicProvisioningScheduling enabled
|
||||
{
|
||||
Name: "Nil_Node_and_Multiple_Allowed_Topology_terms_and_DynamicProvisioningScheduling_enabled",
|
||||
Node: nil,
|
||||
DynamicProvisioningScheduling: true,
|
||||
AllowedTopologies: []v1.TopologySelectorTerm{
|
||||
{
|
||||
MatchLabelExpressions: []v1.TopologySelectorLabelRequirement{
|
||||
{
|
||||
Key: kubeletapis.LabelZoneFailureDomain,
|
||||
Values: []string{"zoneX"},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
MatchLabelExpressions: []v1.TopologySelectorLabelRequirement{
|
||||
{
|
||||
Key: kubeletapis.LabelZoneFailureDomain,
|
||||
Values: []string{"zoneY"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Reject: false,
|
||||
ExpectedZones: "zoneX,zoneY",
|
||||
},
|
||||
|
||||
// Select Zone from AllowedTopologies [Pass]
|
||||
// Note: Dual replica with same AllowedTopologies will fail: Nil_Node_and_Single_Allowed_Topology_term_value_and_Dual_replicas
|
||||
// [1] nil Node
|
||||
// [2] no Zone/Zones parametes specified
|
||||
// [3] AllowedTopologies with single term and value specified
|
||||
// [4] DynamicProvisioningScheduling enabled
|
||||
{
|
||||
Name: "Nil_Node_and_Single_Allowed_Topology_term_value_and_DynamicProvisioningScheduling_enabled",
|
||||
Node: nil,
|
||||
DynamicProvisioningScheduling: true,
|
||||
AllowedTopologies: []v1.TopologySelectorTerm{
|
||||
{
|
||||
MatchLabelExpressions: []v1.TopologySelectorLabelRequirement{
|
||||
{
|
||||
Key: kubeletapis.LabelZoneFailureDomain,
|
||||
Values: []string{"zoneX"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Reject: false,
|
||||
ExpectSpecificZone: true,
|
||||
ExpectedZone: "zoneX",
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
utilfeature.DefaultFeatureGate.Set("DynamicProvisioningScheduling=false")
|
||||
if test.DynamicProvisioningScheduling {
|
||||
utilfeature.DefaultFeatureGate.Set("DynamicProvisioningScheduling=true")
|
||||
}
|
||||
|
||||
var zonesParameter, zonesWithNodes sets.String
|
||||
var err error
|
||||
|
||||
if test.Zones != "" {
|
||||
zonesParameter, err = ZonesToSet(test.Zones)
|
||||
if err != nil {
|
||||
t.Errorf("Could not convert Zones to a set: %s. This is a test error %s", test.Zones, test.Name)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
if test.ZonesWithNodes != "" {
|
||||
zonesWithNodes, err = ZonesToSet(test.ZonesWithNodes)
|
||||
if err != nil {
|
||||
t.Errorf("Could not convert specified ZonesWithNodes to a set: %s. This is a test error %s", test.ZonesWithNodes, test.Name)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
zone, err := SelectZoneForVolume(test.ZonePresent, test.ZonesPresent, test.Zone, zonesParameter, zonesWithNodes, test.Node, test.AllowedTopologies, test.Name)
|
||||
|
||||
if test.Reject && err == nil {
|
||||
t.Errorf("Unexpected zone from SelectZoneForVolume for %s", zone)
|
||||
continue
|
||||
}
|
||||
|
||||
if !test.Reject {
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error from SelectZoneForVolume for %s; Error: %v", test.Name, err)
|
||||
continue
|
||||
}
|
||||
|
||||
if test.ExpectSpecificZone == true {
|
||||
if zone != test.ExpectedZone {
|
||||
t.Errorf("Expected zone %v does not match obtained zone %v for %s", test.ExpectedZone, zone, test.Name)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
expectedZones, err := ZonesToSet(test.ExpectedZones)
|
||||
if err != nil {
|
||||
t.Errorf("Could not convert ExpectedZones to a set: %s. This is a test error", test.ExpectedZones)
|
||||
continue
|
||||
}
|
||||
if !expectedZones.Has(zone) {
|
||||
t.Errorf("Obtained zone %s not member of expectedZones %s", zone, expectedZones)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetWindowsPath(t *testing.T) {
|
||||
tests := []struct {
|
||||
path string
|
||||
|
|
Loading…
Reference in New Issue