From ba9a9cf7c3fc9c4c0e7c39dffc01e3f00b9c9b6a Mon Sep 17 00:00:00 2001 From: Subramanian Neelakantan Date: Thu, 3 Jan 2019 15:25:43 +0530 Subject: [PATCH] Applies zone labels to newly created vsphere volumes --- pkg/cloudprovider/providers/vsphere/BUILD | 5 + .../providers/vsphere/vclib/constants.go | 21 +- .../providers/vsphere/vclib/datacenter.go | 15 + .../providers/vsphere/vclib/datastore.go | 16 + .../providers/vsphere/vsphere.go | 294 +++++++++++++++++- .../providers/vsphere/vsphere_test.go | 190 +++++++++++ pkg/volume/vsphere_volume/vsphere_volume.go | 29 ++ .../vsphere_volume/vsphere_volume_util.go | 6 + .../persistentvolume/label/admission.go | 54 +++- .../persistentvolume/label/admission_test.go | 67 ++++ 10 files changed, 677 insertions(+), 20 deletions(-) diff --git a/pkg/cloudprovider/providers/vsphere/BUILD b/pkg/cloudprovider/providers/vsphere/BUILD index ebfecdb86c..02aaeac1e0 100644 --- a/pkg/cloudprovider/providers/vsphere/BUILD +++ b/pkg/cloudprovider/providers/vsphere/BUILD @@ -21,12 +21,15 @@ go_library( "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/version:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/cloud-provider:go_default_library", "//staging/src/k8s.io/cloud-provider/node/helpers:go_default_library", + "//staging/src/k8s.io/cloud-provider/volume/helpers:go_default_library", + "//vendor/github.com/vmware/govmomi/object:go_default_library", "//vendor/github.com/vmware/govmomi/vapi/rest:go_default_library", "//vendor/github.com/vmware/govmomi/vapi/tags:go_default_library", "//vendor/github.com/vmware/govmomi/vim25:go_default_library", @@ -58,6 +61,7 @@ go_test( "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", "//staging/src/k8s.io/cloud-provider:go_default_library", "//vendor/github.com/vmware/govmomi:go_default_library", + "//vendor/github.com/vmware/govmomi/find:go_default_library", "//vendor/github.com/vmware/govmomi/lookup/simulator:go_default_library", "//vendor/github.com/vmware/govmomi/property:go_default_library", "//vendor/github.com/vmware/govmomi/simulator:go_default_library", @@ -67,6 +71,7 @@ go_test( "//vendor/github.com/vmware/govmomi/vapi/simulator:go_default_library", "//vendor/github.com/vmware/govmomi/vapi/tags:go_default_library", "//vendor/github.com/vmware/govmomi/vim25/mo:go_default_library", + "//vendor/github.com/vmware/govmomi/vim25/types:go_default_library", ], ) diff --git a/pkg/cloudprovider/providers/vsphere/vclib/constants.go b/pkg/cloudprovider/providers/vsphere/vclib/constants.go index 74386bae24..a8a4e5f420 100644 --- a/pkg/cloudprovider/providers/vsphere/vclib/constants.go +++ b/pkg/cloudprovider/providers/vsphere/vclib/constants.go @@ -40,15 +40,18 @@ const ( // Other Constants const ( - LogLevel = 4 - DatastoreProperty = "datastore" - ResourcePoolProperty = "resourcePool" - DatastoreInfoProperty = "info" - VirtualMachineType = "VirtualMachine" - RoundTripperDefaultCount = 3 - VSANDatastoreType = "vsan" - DummyVMPrefixName = "vsphere-k8s" - ActivePowerState = "poweredOn" + LogLevel = 4 + DatastoreProperty = "datastore" + ResourcePoolProperty = "resourcePool" + DatastoreInfoProperty = "info" + VirtualMachineType = "VirtualMachine" + RoundTripperDefaultCount = 3 + VSANDatastoreType = "vsan" + DummyVMPrefixName = "vsphere-k8s" + ActivePowerState = "poweredOn" + DatacenterType = "Datacenter" + ClusterComputeResourceType = "ClusterComputeResource" + HostSystemType = "HostSystem" ) // Test Constants diff --git a/pkg/cloudprovider/providers/vsphere/vclib/datacenter.go b/pkg/cloudprovider/providers/vsphere/vclib/datacenter.go index 31c9b98dd7..bbd60cc620 100644 --- a/pkg/cloudprovider/providers/vsphere/vclib/datacenter.go +++ b/pkg/cloudprovider/providers/vsphere/vclib/datacenter.go @@ -148,6 +148,21 @@ func (dc *Datacenter) GetAllDatastores(ctx context.Context) (map[string]*Datasto return dsURLInfoMap, nil } +// GetAllHosts returns all the host objects in this datacenter of VC +func (dc *Datacenter) GetAllHosts(ctx context.Context) ([]types.ManagedObjectReference, error) { + finder := getFinder(dc) + hostSystems, err := finder.HostSystemList(ctx, "*") + if err != nil { + klog.Errorf("Failed to get all hostSystems. err: %+v", err) + return nil, err + } + var hostMors []types.ManagedObjectReference + for _, hs := range hostSystems { + hostMors = append(hostMors, hs.Reference()) + } + return hostMors, nil +} + // GetDatastoreByPath gets the Datastore object from the given vmDiskPath func (dc *Datacenter) GetDatastoreByPath(ctx context.Context, vmDiskPath string) (*Datastore, error) { datastorePathObj := new(object.DatastorePath) diff --git a/pkg/cloudprovider/providers/vsphere/vclib/datastore.go b/pkg/cloudprovider/providers/vsphere/vclib/datastore.go index a57685bc76..859e56239f 100644 --- a/pkg/cloudprovider/providers/vsphere/vclib/datastore.go +++ b/pkg/cloudprovider/providers/vsphere/vclib/datastore.go @@ -85,3 +85,19 @@ func (ds *Datastore) IsCompatibleWithStoragePolicy(ctx context.Context, storageP } return pbmClient.IsDatastoreCompatible(ctx, storagePolicyID, ds) } + +// GetDatastoreHostMounts gets the host names mounted on given datastore +func (ds *Datastore) GetDatastoreHostMounts(ctx context.Context) ([]types.ManagedObjectReference, error) { + var dsMo mo.Datastore + pc := property.DefaultCollector(ds.Client()) + err := pc.RetrieveOne(ctx, ds.Datastore.Reference(), []string{"host"}, &dsMo) + if err != nil { + klog.Errorf("Failed to retrieve datastore host mount property. err: %v", err) + return nil, err + } + hosts := make([]types.ManagedObjectReference, len(dsMo.Host)) + for _, dsHostMount := range dsMo.Host { + hosts = append(hosts, dsHostMount.Key) + } + return hosts, nil +} diff --git a/pkg/cloudprovider/providers/vsphere/vsphere.go b/pkg/cloudprovider/providers/vsphere/vsphere.go index d0ae21247b..de4acac515 100644 --- a/pkg/cloudprovider/providers/vsphere/vsphere.go +++ b/pkg/cloudprovider/providers/vsphere/vsphere.go @@ -33,15 +33,19 @@ import ( "gopkg.in/gcfg.v1" + "github.com/vmware/govmomi/object" "github.com/vmware/govmomi/vapi/rest" "github.com/vmware/govmomi/vapi/tags" "github.com/vmware/govmomi/vim25/mo" + vmwaretypes "github.com/vmware/govmomi/vim25/types" "k8s.io/api/core/v1" k8stypes "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/informers" "k8s.io/client-go/tools/cache" cloudprovider "k8s.io/cloud-provider" nodehelpers "k8s.io/cloud-provider/node/helpers" + volumehelpers "k8s.io/cloud-provider/volume/helpers" "k8s.io/klog" "k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere/vclib" "k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere/vclib/diskmanagers" @@ -66,17 +70,20 @@ var cleanUpDummyVMLock sync.RWMutex const ( MissingUsernameErrMsg = "Username is missing" MissingPasswordErrMsg = "Password is missing" + NoZoneTagInVCErrMsg = "No zone tags found in vCenter" ) // Error constants var ( ErrUsernameMissing = errors.New(MissingUsernameErrMsg) ErrPasswordMissing = errors.New(MissingPasswordErrMsg) + ErrNoZoneTagInVC = errors.New(NoZoneTagInVCErrMsg) ) var _ cloudprovider.Interface = (*VSphere)(nil) var _ cloudprovider.Instances = (*VSphere)(nil) var _ cloudprovider.Zones = (*VSphere)(nil) +var _ cloudprovider.PVLabeler = (*VSphere)(nil) // VSphere is an implementation of cloud provider Interface for VSphere. type VSphere struct { @@ -501,6 +508,7 @@ func buildVSphereFromConfig(cfg VSphereConfig) (*VSphere, error) { if cfg.Global.VCenterPort == "" { cfg.Global.VCenterPort = "443" } + vsphereInstanceMap, err := populateVsphereInstanceMap(&cfg) if err != nil { return nil, err @@ -833,13 +841,17 @@ func (vs *VSphere) LoadBalancer() (cloudprovider.LoadBalancer, bool) { return nil, false } +func (vs *VSphere) isZoneEnabled() bool { + return vs.cfg != nil && vs.cfg.Labels.Zone != "" && vs.cfg.Labels.Region != "" +} + // Zones returns an implementation of Zones for vSphere. func (vs *VSphere) Zones() (cloudprovider.Zones, bool) { - if vs.cfg == nil { - klog.V(1).Info("The vSphere cloud provider does not support zones") - return nil, false + if vs.isZoneEnabled() { + return vs, true } - return vs, true + klog.V(1).Info("The vSphere cloud provider does not support zones") + return nil, false } // Routes returns a false since the interface is not supported for vSphere. @@ -1412,14 +1424,10 @@ func (vs *VSphere) GetZone(ctx context.Context) (cloudprovider.Zone, error) { } if zone.Region == "" { - if vs.cfg.Labels.Region != "" { - return fmt.Errorf("vSphere region category %q does not match any tags for node %s [%s]", vs.cfg.Labels.Region, nodeName, vs.vmUUID) - } + return fmt.Errorf("vSphere region category %q does not match any tags for node %s [%s]", vs.cfg.Labels.Region, nodeName, vs.vmUUID) } if zone.FailureDomain == "" { - if vs.cfg.Labels.Zone != "" { - return fmt.Errorf("vSphere zone category %q does not match any tags for node %s [%s]", vs.cfg.Labels.Zone, nodeName, vs.vmUUID) - } + return fmt.Errorf("vSphere zone category %q does not match any tags for node %s [%s]", vs.cfg.Labels.Zone, nodeName, vs.vmUUID) } return nil @@ -1438,3 +1446,269 @@ func (vs *VSphere) GetZoneByNodeName(ctx context.Context, nodeName k8stypes.Node func (vs *VSphere) GetZoneByProviderID(ctx context.Context, providerID string) (cloudprovider.Zone, error) { return cloudprovider.Zone{}, cloudprovider.NotImplemented } + +// GetLabelsForVolume implements the PVLabeler interface for VSphere +// since this interface is used by the PV label admission controller. +func (vs *VSphere) GetLabelsForVolume(ctx context.Context, pv *v1.PersistentVolume) (map[string]string, error) { + // ignore if zones not enabled + if !vs.isZoneEnabled() { + klog.V(4).Infof("Zone labels for volume is not enabled in vsphere.conf") + return nil, nil + } + // ignore if not vSphere volume + if pv.Spec.VsphereVolume == nil { + return nil, nil + } + return vs.GetVolumeLabels(pv.Spec.VsphereVolume.VolumePath) +} + +// GetVolumeLabels returns the well known zone and region labels for given volume +func (vs *VSphere) GetVolumeLabels(volumePath string) (map[string]string, error) { + // Create context + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // if zones is not enabled return no labels + if !vs.isZoneEnabled() { + klog.V(4).Infof("Volume zone labels is not enabled in vsphere.conf") + return nil, nil + } + + datastorePathObj, err := vclib.GetDatastorePathObjFromVMDiskPath(volumePath) + if err != nil { + klog.Errorf("Failed to get datastore for volume: %v: %+v", volumePath, err) + return nil, err + } + dsZones, err := vs.GetZonesForDatastore(ctx, datastorePathObj.Datastore) + if err != nil { + klog.Errorf("Failed to get zones for datastore %v: %+v", datastorePathObj.Datastore, err) + return nil, err + } + dsZones, err = vs.collapseZonesInRegion(ctx, dsZones) + // FIXME: For now, pick the first zone of datastore as the zone of volume + labels := make(map[string]string) + if len(dsZones) > 0 { + labels[v1.LabelZoneRegion] = dsZones[0].Region + labels[v1.LabelZoneFailureDomain] = dsZones[0].FailureDomain + } + return labels, nil +} + +// collapse all zones in same region. Join FailureDomain with well known separator +func (vs *VSphere) collapseZonesInRegion(ctx context.Context, zones []cloudprovider.Zone) ([]cloudprovider.Zone, error) { + // first create a map of region -> list of zones in that region + regionToZones := make(map[string][]string) + for _, zone := range zones { + fds, exists := regionToZones[zone.Region] + if !exists { + fds = make([]string, 0) + } + regionToZones[zone.Region] = append(fds, zone.FailureDomain) + } + + // Join all fds in same region and return Zone instances + collapsedZones := make([]cloudprovider.Zone, 0) + for region, fds := range regionToZones { + fdSet := sets.NewString(fds...) + appendedZone := volumehelpers.ZonesSetToLabelValue(fdSet) + collapsedZones = append(collapsedZones, cloudprovider.Zone{FailureDomain: appendedZone, Region: region}) + } + return collapsedZones, nil +} + +// GetZonesForDatastore returns all the zones from which this datastore is visible +func (vs *VSphere) GetZonesForDatastore(ctx context.Context, datastore string) ([]cloudprovider.Zone, error) { + vsi, err := vs.getVSphereInstanceForServer(vs.cfg.Workspace.VCenterIP, ctx) + if err != nil { + klog.Errorf("Failed to get vSphere instance: %+v", err) + return nil, err + } + dc, err := vclib.GetDatacenter(ctx, vsi.conn, vs.cfg.Workspace.Datacenter) + if err != nil { + klog.Errorf("Failed to get datacenter: %+v", err) + return nil, err + } + // get the hosts mounted on this datastore + // datastore -> ["host-1", "host-2", "host-3", ...] + ds, err := dc.GetDatastoreByName(ctx, datastore) + if err != nil { + klog.Errorf("Failed to get datastore by name: %v: %+v", datastore, err) + return nil, err + } + dsHosts, err := ds.GetDatastoreHostMounts(ctx) + if err != nil { + klog.Errorf("Failed to get datastore host mounts for %v: %+v", datastore, err) + return nil, err + } + klog.V(4).Infof("Got host mounts for datastore: %v: %v", datastore, dsHosts) + + // compute map of zone to list of hosts in that zone across all hosts in vsphere + // zone -> ["host-i", "host-j", "host-k", ...] + zoneToHosts, err := vs.GetZoneToHosts(ctx, vsi) + if err != nil { + klog.Errorf("Failed to get zones for hosts: %+v", err) + return nil, err + } + klog.V(4).Infof("Got zone to hosts: %v", zoneToHosts) + + // datastore belongs to a zone if all hosts in that zone mount that datastore + dsZones := make([]cloudprovider.Zone, 0) + for zone, zoneHosts := range zoneToHosts { + // if zone is valid and zoneHosts is a subset of dsHosts, then add zone + if zone.Region != "" && containsAll(dsHosts, zoneHosts) { + dsZones = append(dsZones, zone) + } + } + klog.V(4).Infof("Datastore %s belongs to zones: %v", datastore, dsZones) + return dsZones, nil +} + +// GetZoneToHosts returns a map of 'zone' -> 'list of hosts in that zone' in given VC +func (vs *VSphere) GetZoneToHosts(ctx context.Context, vsi *VSphereInstance) (map[cloudprovider.Zone][]vmwaretypes.ManagedObjectReference, error) { + // Approach is to find tags with the category of 'vs.cfg.Labels.Zone' + zoneToHosts := make(map[cloudprovider.Zone][]vmwaretypes.ManagedObjectReference) + + getHostsInTagCategory := func(ctx context.Context, tagCategoryName string) (map[vmwaretypes.ManagedObjectReference]string, error) { + + hostToTag := make(map[vmwaretypes.ManagedObjectReference]string) + err := withTagsClient(ctx, vsi.conn, func(c *rest.Client) error { + // Look whether the zone/region tag is defined in VC + tagManager := tags.NewManager(c) + tagsForCat, err := tagManager.GetTagsForCategory(ctx, tagCategoryName) + if err != nil { + klog.V(4).Infof("No tags with category %s exists in VC. So ignoring.", tagCategoryName) + // return a special error so that tag unavailability can be ignored + return ErrNoZoneTagInVC + } + klog.V(4).Infof("List of tags under category %s: %v", tagCategoryName, tagsForCat) + + // Each such tag is a different 'zone' marked in vCenter. + // Query for objects associated with each tag. Consider Host, Cluster and Datacenter kind of objects. + tagToObjects := make(map[string][]mo.Reference) + for _, tag := range tagsForCat { + klog.V(4).Infof("Getting objects associated with tag %s", tag.Name) + objects, err := tagManager.ListAttachedObjects(ctx, tag.Name) + if err != nil { + klog.Errorf("Error fetching objects associated with zone tag %s: %+v", tag.Name, err) + return err + } + tagToObjects[tag.Name] = objects + } + klog.V(4).Infof("Map of tag to objects: %v", tagToObjects) + + // Infer zone for hosts within Datacenter, hosts within clusters and hosts - in this order of increasing priority + // The below nested for-loops goes over all the objects in tagToObjects three times over. + for _, moType := range []string{vclib.DatacenterType, vclib.ClusterComputeResourceType, vclib.HostSystemType} { + for tagName, objects := range tagToObjects { + for _, obj := range objects { + if obj.Reference().Type == moType { + klog.V(4).Infof("Found zone tag %s associated with %s of type %T: %s", tagName, obj, obj, obj.Reference().Value) + switch moType { + case "Datacenter": + // mark that all hosts in this datacenter has tag applied + dcObjRef := object.NewReference(vsi.conn.Client, obj.Reference()) + klog.V(4).Infof("Converted mo obj %v to govmomi object ref %v", obj, dcObjRef) + dcObj, ok := dcObjRef.(*object.Datacenter) + if !ok { + errMsg := fmt.Sprintf("Not able to convert object to Datacenter %v", obj) + klog.Errorf(errMsg) + return errors.New(errMsg) + } + klog.V(4).Infof("Converted to object Datacenter %v", dcObj) + dc := vclib.Datacenter{Datacenter: dcObj} + hosts, err := dc.GetAllHosts(ctx) + if err != nil { + klog.Errorf("Could not get hosts from datacenter %v: %+v", dc, err) + return err + } + for _, host := range hosts { + hostToTag[host] = tagName + } + case "ClusterComputeResource": + // mark that all hosts in this cluster has tag applied + clusterObjRef := object.NewReference(vsi.conn.Client, obj.Reference()) + clusterObj, ok := clusterObjRef.(*object.ClusterComputeResource) + if !ok { + errMsg := fmt.Sprintf("Not able to convert object ClusterComputeResource %v", obj) + klog.Errorf(errMsg) + return errors.New(errMsg) + } + hostSystemList, err := clusterObj.Hosts(ctx) + if err != nil { + klog.Errorf("Not able to get hosts in cluster %v: %+v", clusterObj, err) + return err + } + for _, host := range hostSystemList { + hostToTag[host.Reference()] = tagName + } + case "HostSystem": + // mark that this host has tag applied + hostToTag[obj.Reference()] = tagName + } + } + } + } + } + return nil // no error + }) + if err != nil { + klog.Errorf("Error processing tag category %s: %+v", tagCategoryName, err) + return nil, err + } + klog.V(6).Infof("Computed hostToTag: %v", hostToTag) + return hostToTag, nil + } + + hostToZone, err := getHostsInTagCategory(ctx, vs.cfg.Labels.Zone) + if err != nil { + if err == ErrNoZoneTagInVC { + return zoneToHosts, nil + } + klog.Errorf("Get hosts in tag category %s failed: %+v", vs.cfg.Labels.Zone, err) + return nil, err + } + + hostToRegion, err := getHostsInTagCategory(ctx, vs.cfg.Labels.Region) + if err != nil { + if err == ErrNoZoneTagInVC { + return zoneToHosts, nil + } + klog.Errorf("Get hosts in tag category %s failed: %+v", vs.cfg.Labels.Region, err) + return nil, err + } + + // populate zoneToHosts based on hostToZone and hostToRegion + klog.V(6).Infof("hostToZone: %v", hostToZone) + klog.V(6).Infof("hostToRegion: %v", hostToRegion) + for host, zone := range hostToZone { + region, regionExists := hostToRegion[host] + if !regionExists { + klog.Errorf("Host %s has a zone, but no region. So ignoring.", host) + continue + } + cpZone := cloudprovider.Zone{FailureDomain: zone, Region: region} + hosts, exists := zoneToHosts[cpZone] + if !exists { + hosts = make([]vmwaretypes.ManagedObjectReference, 0) + } + zoneToHosts[cpZone] = append(hosts, host) + } + klog.V(4).Infof("Final zoneToHosts: %v", zoneToHosts) + return zoneToHosts, nil +} + +// returns true if s1 contains all elements from s2; false otherwise +func containsAll(s1 []vmwaretypes.ManagedObjectReference, s2 []vmwaretypes.ManagedObjectReference) bool { + // put all elements of s1 into a map + s1Map := make(map[vmwaretypes.ManagedObjectReference]bool) + for _, mor := range s1 { + s1Map[mor] = true + } + // verify if all elements of s2 are present in s1Map + for _, mor := range s2 { + if _, found := s1Map[mor]; !found { + return false + } + } + return true +} diff --git a/pkg/cloudprovider/providers/vsphere/vsphere_test.go b/pkg/cloudprovider/providers/vsphere/vsphere_test.go index 4332b3cb01..b0805fbc03 100644 --- a/pkg/cloudprovider/providers/vsphere/vsphere_test.go +++ b/pkg/cloudprovider/providers/vsphere/vsphere_test.go @@ -23,10 +23,13 @@ import ( "io/ioutil" "log" "os" + "reflect" + "sort" "strconv" "strings" "testing" + "github.com/vmware/govmomi/find" lookup "github.com/vmware/govmomi/lookup/simulator" "github.com/vmware/govmomi/property" "github.com/vmware/govmomi/simulator" @@ -36,6 +39,7 @@ import ( vapi "github.com/vmware/govmomi/vapi/simulator" "github.com/vmware/govmomi/vapi/tags" "github.com/vmware/govmomi/vim25/mo" + vmwaretypes "github.com/vmware/govmomi/vim25/types" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/rand" @@ -487,6 +491,192 @@ func TestZones(t *testing.T) { } } +func TestGetZoneToHosts(t *testing.T) { + // Common setup for all testcases in this test + ctx := context.TODO() + + // Create a vcsim instance + cfg, cleanup := configFromSim() + defer cleanup() + + // Create vSphere configuration object + vs, err := newControllerNode(cfg) + if err != nil { + t.Fatalf("Failed to construct/authenticate vSphere: %s", err) + } + + // Configure region and zone categories + vs.cfg.Labels.Region = "k8s-region" + vs.cfg.Labels.Zone = "k8s-zone" + + // Create vSphere client + vsi, ok := vs.vsphereInstanceMap[cfg.Global.VCenterIP] + if !ok { + t.Fatalf("Couldn't get vSphere instance: %s", cfg.Global.VCenterIP) + } + + err = vsi.conn.Connect(ctx) + if err != nil { + t.Errorf("Failed to connect to vSphere: %s", err) + } + + // Lookup Datacenter for this test's Workspace + dc, err := vclib.GetDatacenter(ctx, vsi.conn, vs.cfg.Workspace.Datacenter) + if err != nil { + t.Fatal(err) + } + + // Property Collector instance + pc := property.DefaultCollector(vsi.conn.Client) + + // find all hosts in VC + finder := find.NewFinder(vsi.conn.Client, true) + finder.SetDatacenter(dc.Datacenter) + allVcHostsList, err := finder.HostSystemList(ctx, "*") + if err != nil { + t.Fatal(err) + } + var allVcHosts []vmwaretypes.ManagedObjectReference + for _, h := range allVcHostsList { + allVcHosts = append(allVcHosts, h.Reference()) + } + + // choose a cluster to apply zone/region tags + cluster := simulator.Map.Any("ClusterComputeResource") + var c mo.ClusterComputeResource + if err := pc.RetrieveOne(ctx, cluster.Reference(), []string{"host"}, &c); err != nil { + t.Fatal(err) + } + + // choose one of the host inside this cluster to apply zone/region tags + if c.Host == nil || len(c.Host) == 0 { + t.Fatalf("This test needs a host inside a cluster.") + } + clusterHosts := c.Host + sortHosts(clusterHosts) + // pick the first host in the cluster to apply tags + host := clusterHosts[0] + remainingHostsInCluster := clusterHosts[1:] + + // Tag manager instance + m := tags.NewManager(rest.NewClient(vsi.conn.Client)) + + // Create a region category + regionCat, err := m.CreateCategory(ctx, &tags.Category{Name: vs.cfg.Labels.Region}) + if err != nil { + t.Fatal(err) + } + + // Create a region tag + regionName := "k8s-region-US" + regionTag, err := m.CreateTag(ctx, &tags.Tag{CategoryID: regionCat, Name: regionName}) + if err != nil { + t.Fatal(err) + } + + // Create a zone category + zoneCat, err := m.CreateCategory(ctx, &tags.Category{Name: vs.cfg.Labels.Zone}) + if err != nil { + t.Fatal(err) + } + + // Create a zone tag + zone1Name := "k8s-zone-US-CA1" + zone1Tag, err := m.CreateTag(ctx, &tags.Tag{CategoryID: zoneCat, Name: zone1Name}) + if err != nil { + t.Fatal(err) + } + zone1 := cloudprovider.Zone{FailureDomain: zone1Name, Region: regionName} + + // Create a second zone tag + zone2Name := "k8s-zone-US-CA2" + zone2Tag, err := m.CreateTag(ctx, &tags.Tag{CategoryID: zoneCat, Name: zone2Name}) + if err != nil { + t.Fatal(err) + } + zone2 := cloudprovider.Zone{FailureDomain: zone2Name, Region: regionName} + + testcases := []struct { + name string + tags map[string][]mo.Reference + zoneToHosts map[cloudprovider.Zone][]vmwaretypes.ManagedObjectReference + }{ + { + name: "Zone and Region tags on host", + tags: map[string][]mo.Reference{zone1Tag: {host}, regionTag: {host}}, + zoneToHosts: map[cloudprovider.Zone][]vmwaretypes.ManagedObjectReference{zone1: {host}}, + }, + { + name: "Zone on host Region on datacenter", + tags: map[string][]mo.Reference{zone1Tag: {host}, regionTag: {dc}}, + zoneToHosts: map[cloudprovider.Zone][]vmwaretypes.ManagedObjectReference{zone1: {host}}, + }, + { + name: "Zone on cluster Region on datacenter", + tags: map[string][]mo.Reference{zone1Tag: {cluster}, regionTag: {dc}}, + zoneToHosts: map[cloudprovider.Zone][]vmwaretypes.ManagedObjectReference{zone1: clusterHosts}, + }, + { + name: "Zone on cluster and override on host", + tags: map[string][]mo.Reference{zone2Tag: {cluster}, zone1Tag: {host}, regionTag: {dc}}, + zoneToHosts: map[cloudprovider.Zone][]vmwaretypes.ManagedObjectReference{zone1: {host}, zone2: remainingHostsInCluster}, + }, + { + name: "Zone and Region on datacenter", + tags: map[string][]mo.Reference{zone1Tag: {dc}, regionTag: {dc}}, + zoneToHosts: map[cloudprovider.Zone][]vmwaretypes.ManagedObjectReference{zone1: allVcHosts}, + }, + } + + for _, testcase := range testcases { + t.Run(testcase.name, func(t *testing.T) { + // apply tags to datacenter/cluster/host as per this testcase + for tagId, objects := range testcase.tags { + for _, object := range objects { + if err := m.AttachTag(ctx, tagId, object); err != nil { + t.Fatal(err) + } + } + } + + // run the test + zoneToHosts, err := vs.GetZoneToHosts(ctx, vsi) + if err != nil { + t.Errorf("unexpected error when calling GetZoneToHosts: %q", err) + } + + // do not depend on the sort order of hosts in result + sortHostsMap(zoneToHosts) + if !reflect.DeepEqual(zoneToHosts, testcase.zoneToHosts) { + t.Logf("expected result: %+v", testcase.zoneToHosts) + t.Logf("actual result: %+v", zoneToHosts) + t.Error("unexpected result from GetZoneToHosts") + } + + // clean up tags applied on datacenter/cluster/host for this testcase + for tagId, objects := range testcase.tags { + for _, object := range objects { + if err = m.DetachTag(ctx, tagId, object); err != nil { + t.Fatal(err) + } + } + } + }) + } +} + +func sortHostsMap(zoneToHosts map[cloudprovider.Zone][]vmwaretypes.ManagedObjectReference) { + for _, hosts := range zoneToHosts { + sortHosts(hosts) + } +} + +func sortHosts(hosts []vmwaretypes.ManagedObjectReference) { + sort.Slice(hosts, func(i, j int) bool { + return hosts[i].Value < hosts[j].Value + }) +} + func TestInstances(t *testing.T) { cfg, ok := configFromEnv() if !ok { diff --git a/pkg/volume/vsphere_volume/vsphere_volume.go b/pkg/volume/vsphere_volume/vsphere_volume.go index c9a96ae90f..cd33af051c 100644 --- a/pkg/volume/vsphere_volume/vsphere_volume.go +++ b/pkg/volume/vsphere_volume/vsphere_volume.go @@ -27,6 +27,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" utilfeature "k8s.io/apiserver/pkg/util/feature" + volumehelpers "k8s.io/cloud-provider/volume/helpers" "k8s.io/klog" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/util/mount" @@ -411,6 +412,34 @@ func (v *vsphereVolumeProvisioner) Provision(selectedNode *v1.Node, allowedTopol pv.Spec.AccessModes = v.plugin.GetAccessModes() } + labels := volSpec.Labels + requirements := make([]v1.NodeSelectorRequirement, 0) + if len(labels) != 0 { + if pv.Labels == nil { + pv.Labels = make(map[string]string) + } + for k, v := range labels { + pv.Labels[k] = v + var values []string + if k == v1.LabelZoneFailureDomain { + values, err = volumehelpers.LabelZonesToList(v) + if err != nil { + return nil, fmt.Errorf("failed to convert label string for Zone: %s to a List: %v", v, err) + } + } else { + values = []string{v} + } + requirements = append(requirements, v1.NodeSelectorRequirement{Key: k, Operator: v1.NodeSelectorOpIn, Values: values}) + } + } + + if len(requirements) > 0 { + pv.Spec.NodeAffinity = new(v1.VolumeNodeAffinity) + pv.Spec.NodeAffinity.Required = new(v1.NodeSelector) + pv.Spec.NodeAffinity.Required.NodeSelectorTerms = make([]v1.NodeSelectorTerm, 1) + pv.Spec.NodeAffinity.Required.NodeSelectorTerms[0].MatchExpressions = requirements + } + return pv, nil } diff --git a/pkg/volume/vsphere_volume/vsphere_volume_util.go b/pkg/volume/vsphere_volume/vsphere_volume_util.go index 921789b651..59d068fe9f 100644 --- a/pkg/volume/vsphere_volume/vsphere_volume_util.go +++ b/pkg/volume/vsphere_volume/vsphere_volume_util.go @@ -72,6 +72,7 @@ type VolumeSpec struct { Fstype string StoragePolicyID string StoragePolicyName string + Labels map[string]string } func verifyDevicePath(path string) (string, error) { @@ -148,12 +149,17 @@ func (util *VsphereDiskUtil) CreateVolume(v *vsphereVolumeProvisioner) (volSpec if err != nil { return nil, err } + labels, err := cloud.GetVolumeLabels(vmDiskPath) + if err != nil { + return nil, err + } volSpec = &VolumeSpec{ Path: vmDiskPath, Size: volSizeKiB, Fstype: fstype, StoragePolicyName: volumeOptions.StoragePolicyName, StoragePolicyID: volumeOptions.StoragePolicyID, + Labels: labels, } klog.V(2).Infof("Successfully created vsphere volume %s", name) return volSpec, nil diff --git a/plugin/pkg/admission/storage/persistentvolume/label/admission.go b/plugin/pkg/admission/storage/persistentvolume/label/admission.go index 7d993f525f..943eaea48c 100644 --- a/plugin/pkg/admission/storage/persistentvolume/label/admission.go +++ b/plugin/pkg/admission/storage/persistentvolume/label/admission.go @@ -59,6 +59,7 @@ type persistentVolumeLabel struct { gcePVLabeler cloudprovider.PVLabeler azurePVLabeler cloudprovider.PVLabeler openStackPVLabeler cloudprovider.PVLabeler + vspherePVLabeler cloudprovider.PVLabeler } var _ admission.MutationInterface = &persistentVolumeLabel{} @@ -138,7 +139,13 @@ func (l *persistentVolumeLabel) Admit(a admission.Attributes) (err error) { } volumeLabels = labels } - + if volume.Spec.VsphereVolume != nil { + labels, err := l.findVsphereVolumeLabels(volume) + if err != nil { + return admission.NewForbidden(a, fmt.Errorf("error querying vSphere Volume %s: %v", volume.Spec.VsphereVolume.VolumePath, err)) + } + volumeLabels = labels + } requirements := make([]api.NodeSelectorRequirement, 0) if len(volumeLabels) != 0 { if volume.Labels == nil { @@ -384,3 +391,48 @@ func (l *persistentVolumeLabel) findCinderDiskLabels(volume *api.PersistentVolum return pvlabler.GetLabelsForVolume(context.TODO(), pv) } + +func (l *persistentVolumeLabel) findVsphereVolumeLabels(volume *api.PersistentVolume) (map[string]string, error) { + pvlabler, err := l.getVspherePVLabeler() + if err != nil { + return nil, err + } + if pvlabler == nil { + return nil, fmt.Errorf("unable to build vSphere cloud provider") + } + + pv := &v1.PersistentVolume{} + err = k8s_api_v1.Convert_core_PersistentVolume_To_v1_PersistentVolume(volume, pv, nil) + if err != nil { + return nil, fmt.Errorf("failed to convert PersistentVolume to core/v1: %q", err) + } + labels, err := pvlabler.GetLabelsForVolume(context.TODO(), pv) + if err != nil { + return nil, err + } + + return labels, nil +} + +func (l *persistentVolumeLabel) getVspherePVLabeler() (cloudprovider.PVLabeler, error) { + l.mutex.Lock() + defer l.mutex.Unlock() + + if l.vspherePVLabeler == nil { + var cloudConfigReader io.Reader + if len(l.cloudConfig) > 0 { + cloudConfigReader = bytes.NewReader(l.cloudConfig) + } + cloudProvider, err := cloudprovider.GetCloudProvider("vsphere", cloudConfigReader) + if err != nil || cloudProvider == nil { + return nil, err + } + vspherePVLabeler, ok := cloudProvider.(cloudprovider.PVLabeler) + if !ok { + // GetCloudProvider failed + return nil, errors.New("vSphere Cloud Provider does not implement PV labeling") + } + l.vspherePVLabeler = vspherePVLabeler + } + return l.vspherePVLabeler, nil +} diff --git a/plugin/pkg/admission/storage/persistentvolume/label/admission_test.go b/plugin/pkg/admission/storage/persistentvolume/label/admission_test.go index 588e679ffb..d6e84e1dad 100644 --- a/plugin/pkg/admission/storage/persistentvolume/label/admission_test.go +++ b/plugin/pkg/admission/storage/persistentvolume/label/admission_test.go @@ -683,6 +683,72 @@ func Test_PVLAdmission(t *testing.T) { }, err: nil, }, + { + name: "vSphere PV labeled correctly", + handler: newPersistentVolumeLabel(), + pvlabeler: mockVolumeLabels(map[string]string{ + "a": "1", + "b": "2", + v1.LabelZoneFailureDomain: "1__2__3", + }), + preAdmissionPV: &api.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: "vSpherePV", + Namespace: "myns", + }, + Spec: api.PersistentVolumeSpec{ + PersistentVolumeSource: api.PersistentVolumeSource{ + VsphereVolume: &api.VsphereVirtualDiskVolumeSource{ + VolumePath: "123", + }, + }, + }, + }, + postAdmissionPV: &api.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: "vSpherePV", + Namespace: "myns", + Labels: map[string]string{ + "a": "1", + "b": "2", + v1.LabelZoneFailureDomain: "1__2__3", + }, + }, + Spec: api.PersistentVolumeSpec{ + PersistentVolumeSource: api.PersistentVolumeSource{ + VsphereVolume: &api.VsphereVirtualDiskVolumeSource{ + VolumePath: "123", + }, + }, + NodeAffinity: &api.VolumeNodeAffinity{ + Required: &api.NodeSelector{ + NodeSelectorTerms: []api.NodeSelectorTerm{ + { + MatchExpressions: []api.NodeSelectorRequirement{ + { + Key: "a", + Operator: api.NodeSelectorOpIn, + Values: []string{"1"}, + }, + { + Key: "b", + Operator: api.NodeSelectorOpIn, + Values: []string{"2"}, + }, + { + Key: v1.LabelZoneFailureDomain, + Operator: api.NodeSelectorOpIn, + Values: []string{"1", "2", "3"}, + }, + }, + }, + }, + }, + }, + }, + }, + err: nil, + }, } for _, testcase := range testcases { @@ -718,6 +784,7 @@ func setPVLabeler(handler *persistentVolumeLabel, pvlabeler cloudprovider.PVLabe handler.gcePVLabeler = pvlabeler handler.azurePVLabeler = pvlabeler handler.openStackPVLabeler = pvlabeler + handler.vspherePVLabeler = pvlabeler } // sortMatchExpressions sorts a PV's node selector match expressions by key name if it is not nil