Merge pull request #66553 from feiskyer/azure-disk-availablity-zone

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Add availability zones support to Azure managed disks

**What this PR does / why we need it**:

Continue of [Azure Availability Zone feature](https://github.com/kubernetes/features/issues/586).

This PR adds availability zone support for Azure managed disks and its storage class. Zoned managed disks is enabled by default if there are zoned nodes in the cluster.

The zone could also be customized by `zone` or `zones` parameter, e.g.

```yaml
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  annotations:
  name: managed-disk-zone-1
parameters:
  zone: "southeastasia-1"
  # zones: "southeastasia-1,"southeastasia-2"
  cachingmode: None
  kind: Managed
  storageaccounttype: Standard_LRS
provisioner: kubernetes.io/azure-disk
reclaimPolicy: Delete
volumeBindingMode: Immediate
```

All zoned AzureDisk PV will also be labeled with its availability zone, e.g.

```sh
$ kubectl get pvc pvc-azuredisk-az-1
NAME                 STATUS    VOLUME                                     CAPACITY   ACCESS MODES   STORAGECLASS          AGE
pvc-azuredisk-az-1   Bound     pvc-5ad0c7b8-8f0b-11e8-94f2-000d3a07de8c   5Gi        RWO            managed-disk-zone-1   2h

$ kubectl get pv pvc-5ad0c7b8-8f0b-11e8-94f2-000d3a07de8c --show-labels
NAME                                       CAPACITY   ACCESS MODES   RECLAIM POLICY   STATUS    CLAIM                        STORAGECLASS          REASON    AGE       LABELS
pvc-5ad0c7b8-8f0b-11e8-94f2-000d3a07de8c   5Gi        RWO            Delete           Bound     default/pvc-azuredisk-az-1   managed-disk-zone-1             2h        failure-domain.beta.kubernetes.io/region=southeastasia,failure-domain.beta.kubernetes.io/zone=southeastasia-1
```

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #

**Special notes for your reviewer**:

See also the [KEP](https://github.com/kubernetes/community/pull/2364).

DynamicProvisioningScheduling feature would be added in a following PR.

**Release note**:

```release-note
Azure managed disks now support availability zones and new parameters `zoned`, `zone` and `zones` are added for AzureDisk storage class.
```

/kind feature
/sig azure
/assign @brendandburns @khenidak @andyzhangx
pull/8/head
Kubernetes Submit Queue 2018-08-06 02:02:54 -07:00 committed by GitHub
commit 14bcbb9450
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 568 additions and 21 deletions

View File

@ -40,6 +40,7 @@ go_library(
"//pkg/cloudprovider:go_default_library",
"//pkg/cloudprovider/providers/azure/auth:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/kubelet/apis:go_default_library",
"//pkg/version:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util:go_default_library",
@ -50,6 +51,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library",
"//vendor/github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-04-01/compute:go_default_library",
@ -82,6 +84,7 @@ go_test(
"azure_vmss_cache_test.go",
"azure_vmss_test.go",
"azure_wrap_test.go",
"azure_zones_test.go",
],
embed = [":go_default_library"],
deps = [
@ -92,6 +95,7 @@ go_test(
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-04-01/compute:go_default_library",
"//vendor/github.com/Azure/azure-sdk-for-go/services/network/mgmt/2017-09-01/network:go_default_library",
"//vendor/github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2017-10-01/storage:go_default_library",

View File

@ -21,13 +21,19 @@ import (
"io"
"io/ioutil"
"strings"
"sync"
"time"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/cloudprovider/providers/azure/auth"
"k8s.io/kubernetes/pkg/controller"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
"k8s.io/kubernetes/pkg/version"
"github.com/Azure/go-autorest/autorest"
@ -59,6 +65,9 @@ var (
defaultExcludeMasterFromStandardLB = true
)
// Azure implements PVLabeler.
var _ cloudprovider.PVLabeler = (*Cloud)(nil)
// Config holds the configuration parsed from the --cloud-config flag
// All fields are required unless otherwise specified
type Config struct {
@ -147,6 +156,13 @@ type Cloud struct {
metadata *InstanceMetadata
vmSet VMSet
// Lock for access to nodeZones
nodeZonesLock sync.Mutex
// nodeZones is a mapping from Zone to a sets.String of Node's names in the Zone
// it is updated by the nodeInformer
nodeZones map[string]sets.String
nodeInformerSynced cache.InformerSynced
// Clients for vmss.
VirtualMachineScaleSetsClient VirtualMachineScaleSetsClient
VirtualMachineScaleSetVMsClient VirtualMachineScaleSetVMsClient
@ -240,6 +256,7 @@ func NewCloud(configReader io.Reader) (cloudprovider.Interface, error) {
az := Cloud{
Config: *config,
Environment: *env,
nodeZones: map[string]sets.String{},
DisksClient: newAzDisksClient(azClientConfig),
RoutesClient: newAzRoutesClient(azClientConfig),
@ -424,3 +441,87 @@ func initDiskControllers(az *Cloud) error {
return nil
}
// SetInformers sets informers for Azure cloud provider.
func (az *Cloud) SetInformers(informerFactory informers.SharedInformerFactory) {
glog.Infof("Setting up informers for Azure cloud provider")
nodeInformer := informerFactory.Core().V1().Nodes().Informer()
nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
node := obj.(*v1.Node)
az.updateNodeZones(nil, node)
},
UpdateFunc: func(prev, obj interface{}) {
prevNode := prev.(*v1.Node)
newNode := obj.(*v1.Node)
if newNode.Labels[kubeletapis.LabelZoneFailureDomain] ==
prevNode.Labels[kubeletapis.LabelZoneFailureDomain] {
return
}
az.updateNodeZones(prevNode, newNode)
},
DeleteFunc: func(obj interface{}) {
node, isNode := obj.(*v1.Node)
// We can get DeletedFinalStateUnknown instead of *v1.Node here
// and we need to handle that correctly.
if !isNode {
deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("Received unexpected object: %v", obj)
return
}
node, ok = deletedState.Obj.(*v1.Node)
if !ok {
glog.Errorf("DeletedFinalStateUnknown contained non-Node object: %v", deletedState.Obj)
return
}
}
az.updateNodeZones(node, nil)
},
})
az.nodeInformerSynced = nodeInformer.HasSynced
}
func (az *Cloud) updateNodeZones(prevNode, newNode *v1.Node) {
az.nodeZonesLock.Lock()
defer az.nodeZonesLock.Unlock()
if prevNode != nil {
prevZone, ok := prevNode.ObjectMeta.Labels[kubeletapis.LabelZoneFailureDomain]
if ok && az.isAvailabilityZone(prevZone) {
az.nodeZones[prevZone].Delete(prevNode.ObjectMeta.Name)
if az.nodeZones[prevZone].Len() == 0 {
az.nodeZones[prevZone] = nil
}
}
}
if newNode != nil {
newZone, ok := newNode.ObjectMeta.Labels[kubeletapis.LabelZoneFailureDomain]
if ok && az.isAvailabilityZone(newZone) {
if az.nodeZones[newZone] == nil {
az.nodeZones[newZone] = sets.NewString()
}
az.nodeZones[newZone].Insert(newNode.ObjectMeta.Name)
}
}
}
// GetActiveZones returns all the zones in which k8s nodes are currently running.
func (az *Cloud) GetActiveZones() (sets.String, error) {
if az.nodeInformerSynced == nil {
return nil, fmt.Errorf("Azure cloud provider doesn't have informers set")
}
az.nodeZonesLock.Lock()
defer az.nodeZonesLock.Unlock()
if !az.nodeInformerSynced() {
return nil, fmt.Errorf("node informer is not synced when trying to GetActiveZones")
}
zones := sets.NewString()
for zone, nodes := range az.nodeZones {
if len(nodes) > 0 {
zones.Insert(zone)
}
}
return zones, nil
}

View File

@ -32,6 +32,7 @@ import (
"github.com/Azure/go-autorest/autorest/to"
"github.com/golang/glog"
"github.com/rubiojr/go-vhd/vhd"
kwait "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/volume"
)

View File

@ -17,16 +17,22 @@ limitations under the License.
package azure
import (
"context"
"fmt"
"path"
"strconv"
"strings"
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-04-01/compute"
"github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2017-10-01/storage"
"github.com/golang/glog"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/sets"
kwait "k8s.io/apimachinery/pkg/util/wait"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
)
@ -35,22 +41,85 @@ type ManagedDiskController struct {
common *controllerCommon
}
// ManagedDiskOptions specifies the options of managed disks.
type ManagedDiskOptions struct {
// The name of the disk.
DiskName string
// The size in GB.
SizeGB int
// The name of PVC.
PVCName string
// The name of resource group.
ResourceGroup string
// Wether the disk is zoned.
Zoned bool
// Wether AvailabilityZone is set.
ZonePresent bool
// Wether AvailabilityZones is set.
ZonesPresent bool
// The AvailabilityZone to create the disk.
AvailabilityZone string
// List of AvailabilityZone to create the disk.
AvailabilityZones string
// The tags of the disk.
Tags map[string]string
// The SKU of storage account.
StorageAccountType storage.SkuName
}
func newManagedDiskController(common *controllerCommon) (*ManagedDiskController, error) {
return &ManagedDiskController{common: common}, nil
}
//CreateManagedDisk : create managed disk
func (c *ManagedDiskController) CreateManagedDisk(diskName string, storageAccountType storage.SkuName, resourceGroup string,
sizeGB int, tags map[string]string) (string, error) {
glog.V(4).Infof("azureDisk - creating new managed Name:%s StorageAccountType:%s Size:%v", diskName, storageAccountType, sizeGB)
func (c *ManagedDiskController) CreateManagedDisk(options *ManagedDiskOptions) (string, error) {
var zones sets.String
var activeZones sets.String
var err error
glog.V(4).Infof("azureDisk - creating new managed Name:%s StorageAccountType:%s Size:%v", options.DiskName, options.StorageAccountType, options.SizeGB)
// Get active zones which have nodes running on.
activeZones, err = c.common.cloud.GetActiveZones()
if err != nil {
return "", fmt.Errorf("error querying active zones: %v", err)
}
// Validate and choose availability zone for creating disk.
if options.Zoned && !options.ZonePresent && !options.ZonesPresent {
// Neither "zone" or "zones" specified. Pick a zone randomly selected
// from all active zones where Kubernetes cluster has a node.
zones = activeZones
} else if !options.ZonePresent && options.ZonesPresent {
// Choose zone from specified zones.
if zones, err = util.ZonesToSet(options.AvailabilityZones); err != nil {
return "", err
}
} else if options.ZonePresent && !options.ZonesPresent {
if err := util.ValidateZone(options.AvailabilityZone); err != nil {
return "", err
}
zones = make(sets.String)
zones.Insert(options.AvailabilityZone)
}
var createZones *[]string
if len(zones.List()) > 0 {
createAZ := util.ChooseZoneForVolume(zones, options.PVCName)
// Do not allow creation of disks in zones that are do not have nodes. Such disks
// are not currently usable.
if !activeZones.Has(createAZ) {
return "", fmt.Errorf("kubernetes does not have a node in zone %q", createAZ)
}
zoneList := []string{c.common.cloud.GetZoneID(createAZ)}
createZones = &zoneList
}
// insert original tags to newTags
newTags := make(map[string]*string)
azureDDTag := "kubernetes-azure-dd"
newTags["created-by"] = &azureDDTag
// insert original tags to newTags
if tags != nil {
for k, v := range tags {
if options.Tags != nil {
for k, v := range options.Tags {
// Azure won't allow / (forward slash) in tags
newKey := strings.Replace(k, "/", "-", -1)
newValue := strings.Replace(v, "/", "-", -1)
@ -58,25 +127,27 @@ func (c *ManagedDiskController) CreateManagedDisk(diskName string, storageAccoun
}
}
diskSizeGB := int32(sizeGB)
diskSizeGB := int32(options.SizeGB)
model := compute.Disk{
Location: &c.common.location,
Tags: newTags,
Zones: createZones,
Sku: &compute.DiskSku{
Name: compute.StorageAccountTypes(storageAccountType),
Name: compute.StorageAccountTypes(options.StorageAccountType),
},
DiskProperties: &compute.DiskProperties{
DiskSizeGB: &diskSizeGB,
CreationData: &compute.CreationData{CreateOption: compute.Empty},
}}
},
}
if resourceGroup == "" {
resourceGroup = c.common.resourceGroup
if options.ResourceGroup == "" {
options.ResourceGroup = c.common.resourceGroup
}
ctx, cancel := getContextWithCancel()
defer cancel()
_, err := c.common.cloud.DisksClient.CreateOrUpdate(ctx, resourceGroup, diskName, model)
_, err = c.common.cloud.DisksClient.CreateOrUpdate(ctx, options.ResourceGroup, options.DiskName, model)
if err != nil {
return "", err
}
@ -84,7 +155,7 @@ func (c *ManagedDiskController) CreateManagedDisk(diskName string, storageAccoun
diskID := ""
err = kwait.ExponentialBackoff(defaultBackOff, func() (bool, error) {
provisionState, id, err := c.getDisk(resourceGroup, diskName)
provisionState, id, err := c.getDisk(options.ResourceGroup, options.DiskName)
diskID = id
// We are waiting for provisioningState==Succeeded
// We don't want to hand-off managed disks to k8s while they are
@ -99,9 +170,9 @@ func (c *ManagedDiskController) CreateManagedDisk(diskName string, storageAccoun
})
if err != nil {
glog.V(2).Infof("azureDisk - created new MD Name:%s StorageAccountType:%s Size:%v but was unable to confirm provisioningState in poll process", diskName, storageAccountType, sizeGB)
glog.V(2).Infof("azureDisk - created new MD Name:%s StorageAccountType:%s Size:%v but was unable to confirm provisioningState in poll process", options.DiskName, options.StorageAccountType, options.SizeGB)
} else {
glog.V(2).Infof("azureDisk - created new MD Name:%s StorageAccountType:%s Size:%v", diskName, storageAccountType, sizeGB)
glog.V(2).Infof("azureDisk - created new MD Name:%s StorageAccountType:%s Size:%v", options.DiskName, options.StorageAccountType, options.SizeGB)
}
return diskID, nil
@ -201,3 +272,58 @@ func getResourceGroupFromDiskURI(diskURI string) (string, error) {
}
return fields[4], nil
}
// GetLabelsForVolume implements PVLabeler.GetLabelsForVolume
func (c *Cloud) GetLabelsForVolume(ctx context.Context, pv *v1.PersistentVolume) (map[string]string, error) {
// Ignore if not AzureDisk.
if pv.Spec.AzureDisk == nil {
return nil, nil
}
// Ignore any volumes that are being provisioned
if pv.Spec.AzureDisk.DiskName == volume.ProvisionedVolumeName {
return nil, nil
}
return c.GetAzureDiskLabels(pv.Spec.AzureDisk.DataDiskURI)
}
// GetAzureDiskLabels gets availability zone labels for Azuredisk.
func (c *Cloud) GetAzureDiskLabels(diskURI string) (map[string]string, error) {
// Get disk's resource group.
diskName := path.Base(diskURI)
resourceGroup, err := getResourceGroupFromDiskURI(diskURI)
if err != nil {
glog.Errorf("Failed to get resource group for AzureDisk %q: %v", diskName, err)
return nil, err
}
// Get information of the disk.
ctx, cancel := getContextWithCancel()
defer cancel()
disk, err := c.DisksClient.Get(ctx, resourceGroup, diskName)
if err != nil {
glog.Errorf("Failed to get information for AzureDisk %q: %v", diskName, err)
return nil, err
}
// Check whether availability zone is specified.
if disk.Zones == nil || len(*disk.Zones) == 0 {
glog.V(4).Infof("Azure disk %q is not zoned", diskName)
return nil, nil
}
zones := *disk.Zones
zoneID, err := strconv.Atoi(zones[0])
if err != nil {
return nil, fmt.Errorf("failed to parse zone %v for AzureDisk %v: %v", zones, diskName, err)
}
zone := c.makeZone(zoneID)
glog.V(4).Infof("Got zone %q for Azure disk %q", zone, diskName)
labels := map[string]string{
kubeletapis.LabelZoneRegion: c.Location,
kubeletapis.LabelZoneFailureDomain: zone,
}
return labels, nil
}

View File

@ -32,6 +32,7 @@ import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
serviceapi "k8s.io/kubernetes/pkg/api/v1/service"
"k8s.io/kubernetes/pkg/cloudprovider/providers/azure/auth"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
@ -953,6 +954,8 @@ func getTestCloud() (az *Cloud) {
MaximumLoadBalancerRuleCount: 250,
VMType: vmTypeStandard,
},
nodeZones: map[string]sets.String{},
nodeInformerSynced: func() bool { return true },
}
az.DisksClient = newFakeDisksClient()
az.InterfacesClient = newFakeAzureInterfacesClient()

View File

@ -41,6 +41,20 @@ func (az *Cloud) makeZone(zoneID int) string {
return fmt.Sprintf("%s-%d", strings.ToLower(az.Location), zoneID)
}
// isAvailabilityZone returns true if the zone is in format of <region>-<zone-id>.
func (az *Cloud) isAvailabilityZone(zone string) bool {
return strings.HasPrefix(zone, fmt.Sprintf("%s-", az.Location))
}
// GetZoneID returns the ID of zone from node's zone label.
func (az *Cloud) GetZoneID(zoneLabel string) string {
if !az.isAvailabilityZone(zoneLabel) {
return ""
}
return strings.TrimPrefix(zoneLabel, fmt.Sprintf("%s-", az.Location))
}
// GetZone returns the Zone containing the current availability zone and locality region that the program is running in.
// If the node is not running with availability zones, then it will fall back to fault domain.
func (az *Cloud) GetZone(ctx context.Context) (cloudprovider.Zone, error) {

View File

@ -0,0 +1,73 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package azure
import (
"testing"
)
func TestIsAvailabilityZone(t *testing.T) {
location := "eastus"
az := &Cloud{
Config: Config{
Location: location,
},
}
tests := []struct {
desc string
zone string
expected bool
}{
{"empty string should return false", "", false},
{"wrong farmat should return false", "123", false},
{"wrong location should return false", "chinanorth-1", false},
{"correct zone should return true", "eastus-1", true},
}
for _, test := range tests {
actual := az.isAvailabilityZone(test.zone)
if actual != test.expected {
t.Errorf("test [%q] get unexpected result: %v != %v", test.desc, actual, test.expected)
}
}
}
func TestGetZoneID(t *testing.T) {
location := "eastus"
az := &Cloud{
Config: Config{
Location: location,
},
}
tests := []struct {
desc string
zone string
expected string
}{
{"empty string should return empty string", "", ""},
{"wrong farmat should return empty string", "123", ""},
{"wrong location should return empty string", "chinanorth-1", ""},
{"correct zone should return zone ID", "eastus-1", "1"},
}
for _, test := range tests {
actual := az.GetZoneID(test.zone)
if actual != test.expected {
t.Errorf("test [%q] get unexpected result: %q != %q", test.desc, actual, test.expected)
}
}
}

View File

@ -63,6 +63,7 @@ go_test(
"azure_common_test.go",
"azure_dd_block_test.go",
"azure_dd_test.go",
"azure_provision_test.go",
],
embed = [":go_default_library"],
deps = [
@ -73,5 +74,6 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/client-go/util/testing:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
],
)

View File

@ -35,7 +35,7 @@ type DiskController interface {
CreateBlobDisk(dataDiskName string, storageAccountType storage.SkuName, sizeGB int) (string, error)
DeleteBlobDisk(diskUri string) error
CreateManagedDisk(diskName string, storageAccountType storage.SkuName, resourceGroup string, sizeGB int, tags map[string]string) (string, error)
CreateManagedDisk(options *azure.ManagedDiskOptions) (string, error)
DeleteManagedDisk(diskURI string) error
// Attaches the disk to the host machine.
@ -58,6 +58,9 @@ type DiskController interface {
// Expand the disk to new size
ResizeDisk(diskURI string, oldSize resource.Quantity, newSize resource.Quantity) (resource.Quantity, error)
// GetAzureDiskLabels gets availability zone labels for Azuredisk.
GetAzureDiskLabels(diskURI string) (map[string]string, error)
}
type azureDataDiskPlugin struct {

View File

@ -19,12 +19,14 @@ package azure_dd
import (
"errors"
"fmt"
"strconv"
"strings"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/cloudprovider/providers/azure"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
@ -68,6 +70,25 @@ func (d *azureDiskDeleter) Delete() error {
return diskController.DeleteBlobDisk(volumeSource.DataDiskURI)
}
// parseZoned parsed 'zoned' for storage class. If zoned is not specified (empty string),
// then it defaults to true for managed disks.
func parseZoned(zonedString string, kind v1.AzureDataDiskKind) (bool, error) {
if zonedString == "" {
return kind == v1.AzureManagedDisk, nil
}
zoned, err := strconv.ParseBool(zonedString)
if err != nil {
return false, fmt.Errorf("failed to parse 'zoned': %v", err)
}
if zoned && kind != v1.AzureManagedDisk {
return false, fmt.Errorf("zoned is only supported by managed disks")
}
return zoned, nil
}
func (p *azureDiskProvisioner) Provision(selectedNode *v1.Node, allowedTopologies []v1.TopologySelectorTerm) (*v1.PersistentVolume, error) {
if !util.AccessModesContainedInAll(p.plugin.GetAccessModes(), p.options.PVC.Spec.AccessModes) {
return nil, fmt.Errorf("invalid AccessModes %v: only AccessModes %v are supported", p.options.PVC.Spec.AccessModes, p.plugin.GetAccessModes())
@ -85,7 +106,7 @@ func (p *azureDiskProvisioner) Provision(selectedNode *v1.Node, allowedTopologie
if len(p.options.PVC.Spec.AccessModes) == 1 {
if p.options.PVC.Spec.AccessModes[0] != supportedModes[0] {
return nil, fmt.Errorf("AzureDisk - mode %s is not supporetd by AzureDisk plugin supported mode is %s", p.options.PVC.Spec.AccessModes[0], supportedModes)
return nil, fmt.Errorf("AzureDisk - mode %s is not supported by AzureDisk plugin (supported mode is %s)", p.options.PVC.Spec.AccessModes[0], supportedModes)
}
}
@ -96,6 +117,13 @@ func (p *azureDiskProvisioner) Provision(selectedNode *v1.Node, allowedTopologie
strKind string
err error
resourceGroup string
zoned bool
zonePresent bool
zonesPresent bool
strZoned string
availabilityZone string
availabilityZones string
)
// maxLength = 79 - (4 for ".vhd") = 75
name := util.GenerateVolumeName(p.options.ClusterName, p.options.PVName, 75)
@ -123,6 +151,14 @@ func (p *azureDiskProvisioner) Provision(selectedNode *v1.Node, allowedTopologie
fsType = strings.ToLower(v)
case "resourcegroup":
resourceGroup = v
case "zone":
zonePresent = true
availabilityZone = v
case "zones":
zonesPresent = true
availabilityZones = v
case "zoned":
strZoned = v
default:
return nil, fmt.Errorf("AzureDisk - invalid option %s in storage class", k)
}
@ -139,6 +175,19 @@ func (p *azureDiskProvisioner) Provision(selectedNode *v1.Node, allowedTopologie
return nil, err
}
zoned, err = parseZoned(strZoned, kind)
if err != nil {
return nil, err
}
if !zoned && (zonePresent || zonesPresent) {
return nil, fmt.Errorf("zone or zones StorageClass parameters must be used together with zoned parameter")
}
if zonePresent && zonesPresent {
return nil, fmt.Errorf("both zone and zones StorageClass parameters must not be used at the same time")
}
if cachingMode, err = normalizeCachingMode(cachingMode); err != nil {
return nil, err
}
@ -154,16 +203,39 @@ func (p *azureDiskProvisioner) Provision(selectedNode *v1.Node, allowedTopologie
// create disk
diskURI := ""
labels := map[string]string{}
if kind == v1.AzureManagedDisk {
tags := make(map[string]string)
if p.options.CloudTags != nil {
tags = *(p.options.CloudTags)
}
diskURI, err = diskController.CreateManagedDisk(name, skuName, resourceGroup, requestGiB, tags)
volumeOptions := &azure.ManagedDiskOptions{
DiskName: name,
StorageAccountType: skuName,
ResourceGroup: resourceGroup,
PVCName: p.options.PVC.Name,
SizeGB: requestGiB,
Tags: tags,
Zoned: zoned,
ZonePresent: zonePresent,
ZonesPresent: zonesPresent,
AvailabilityZone: availabilityZone,
AvailabilityZones: availabilityZones,
}
diskURI, err = diskController.CreateManagedDisk(volumeOptions)
if err != nil {
return nil, err
}
labels, err = diskController.GetAzureDiskLabels(diskURI)
if err != nil {
return nil, err
}
} else {
if zoned {
return nil, errors.New("zoned parameter is only supported for managed disks")
}
if kind == v1.AzureDedicatedBlobDisk {
_, diskURI, _, err = diskController.CreateVolume(name, account, storageAccountType, location, requestGiB)
if err != nil {
@ -189,7 +261,7 @@ func (p *azureDiskProvisioner) Provision(selectedNode *v1.Node, allowedTopologie
pv := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: p.options.PVName,
Labels: map[string]string{},
Labels: labels,
Annotations: map[string]string{
"volumehelper.VolumeDynamicallyCreatedByKey": "azure-disk-dynamic-provisioner",
},

View File

@ -0,0 +1,96 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package azure_dd
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/api/core/v1"
)
func TestParseZoned(t *testing.T) {
tests := []struct {
msg string
zoneString string
diskKind v1.AzureDataDiskKind
expected bool
expectError bool
}{
{
msg: "managed disk should default to zoned",
diskKind: v1.AzureManagedDisk,
expected: true,
},
{
msg: "shared blob disk should default to un-zoned",
diskKind: v1.AzureSharedBlobDisk,
expected: false,
},
{
msg: "shared dedicated disk should default to un-zoned",
diskKind: v1.AzureDedicatedBlobDisk,
expected: false,
},
{
msg: "managed disk should support zoned=true",
diskKind: v1.AzureManagedDisk,
zoneString: "true",
expected: true,
},
{
msg: "managed disk should support zoned=false",
diskKind: v1.AzureManagedDisk,
zoneString: "false",
expected: false,
},
{
msg: "shared blob disk should support zoned=false",
diskKind: v1.AzureSharedBlobDisk,
zoneString: "false",
expected: false,
},
{
msg: "shared blob disk shouldn't support zoned=true",
diskKind: v1.AzureSharedBlobDisk,
zoneString: "true",
expectError: true,
},
{
msg: "shared dedicated disk should support zoned=false",
diskKind: v1.AzureDedicatedBlobDisk,
zoneString: "false",
expected: false,
},
{
msg: "dedicated blob disk shouldn't support zoned=true",
diskKind: v1.AzureDedicatedBlobDisk,
zoneString: "true",
expectError: true,
},
}
for i, test := range tests {
real, err := parseZoned(test.zoneString, test.diskKind)
if test.expectError {
assert.Error(t, err, fmt.Sprintf("TestCase[%d]: %s", i, test.msg))
} else {
assert.Equal(t, test.expected, real, fmt.Sprintf("TestCase[%d]: %s", i, test.msg))
}
}
}

View File

@ -17,6 +17,7 @@ go_library(
"//pkg/apis/core:go_default_library",
"//pkg/cloudprovider:go_default_library",
"//pkg/cloudprovider/providers/aws:go_default_library",
"//pkg/cloudprovider/providers/azure:go_default_library",
"//pkg/cloudprovider/providers/gce:go_default_library",
"//pkg/features:go_default_library",
"//pkg/kubeapiserver/admission:go_default_library",

View File

@ -28,6 +28,7 @@ import (
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
"k8s.io/kubernetes/pkg/cloudprovider/providers/azure"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
"k8s.io/kubernetes/pkg/features"
kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
@ -58,6 +59,7 @@ type persistentVolumeLabel struct {
ebsVolumes aws.Volumes
cloudConfig []byte
gceCloudProvider *gce.GCECloud
azureProvider *azure.Cloud
}
var _ admission.MutationInterface = &persistentVolumeLabel{}
@ -69,7 +71,7 @@ var _ kubeapiserveradmission.WantsCloudConfig = &persistentVolumeLabel{}
// As a side effect, the cloud provider may block invalid or non-existent volumes.
func newPersistentVolumeLabel() *persistentVolumeLabel {
// DEPRECATED: cloud-controller-manager will now start NewPersistentVolumeLabelController
// which does exactly what this admission controller used to do. So once GCE and AWS can
// which does exactly what this admission controller used to do. So once GCE, AWS and AZURE can
// run externally, we can remove this admission controller.
glog.Warning("PersistentVolumeLabel admission controller is deprecated. " +
"Please remove this controller from your configuration files and scripts.")
@ -123,6 +125,13 @@ func (l *persistentVolumeLabel) Admit(a admission.Attributes) (err error) {
}
volumeLabels = labels
}
if volume.Spec.AzureDisk != nil {
labels, err := l.findAzureDiskLabels(volume)
if err != nil {
return admission.NewForbidden(a, fmt.Errorf("error querying AzureDisk volume %s: %v", volume.Spec.AzureDisk.DiskName, err))
}
volumeLabels = labels
}
requirements := make([]api.NodeSelectorRequirement, 0)
if len(volumeLabels) != 0 {
@ -272,3 +281,45 @@ func (l *persistentVolumeLabel) getGCECloudProvider() (*gce.GCECloud, error) {
}
return l.gceCloudProvider, nil
}
// getAzureCloudProvider returns the Azure cloud provider, for use for querying volume labels
func (l *persistentVolumeLabel) getAzureCloudProvider() (*azure.Cloud, error) {
l.mutex.Lock()
defer l.mutex.Unlock()
if l.azureProvider == nil {
var cloudConfigReader io.Reader
if len(l.cloudConfig) > 0 {
cloudConfigReader = bytes.NewReader(l.cloudConfig)
}
cloudProvider, err := cloudprovider.GetCloudProvider("azure", cloudConfigReader)
if err != nil || cloudProvider == nil {
return nil, err
}
azureProvider, ok := cloudProvider.(*azure.Cloud)
if !ok {
// GetCloudProvider has gone very wrong
return nil, fmt.Errorf("error retrieving Azure cloud provider")
}
l.azureProvider = azureProvider
}
return l.azureProvider, nil
}
func (l *persistentVolumeLabel) findAzureDiskLabels(volume *api.PersistentVolume) (map[string]string, error) {
// Ignore any volumes that are being provisioned
if volume.Spec.AzureDisk.DiskName == vol.ProvisionedVolumeName {
return nil, nil
}
provider, err := l.getAzureCloudProvider()
if err != nil {
return nil, err
}
if provider == nil {
return nil, fmt.Errorf("unable to build Azure cloud provider for AzureDisk")
}
return provider.GetAzureDiskLabels(volume.Spec.AzureDisk.DataDiskURI)
}