mirror of https://github.com/k3s-io/k3s
Merge pull request #72687 from subramanian-neelakantan/vsphere_volume_zone
Applies zone labels to newly created vsphere volumes

commit 72b0971f3b

@@ -21,12 +21,15 @@ go_library(
        "//staging/src/k8s.io/api/core/v1:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/util/version:go_default_library",
        "//staging/src/k8s.io/client-go/informers:go_default_library",
        "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
        "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
        "//staging/src/k8s.io/cloud-provider:go_default_library",
        "//staging/src/k8s.io/cloud-provider/node/helpers:go_default_library",
        "//staging/src/k8s.io/cloud-provider/volume/helpers:go_default_library",
        "//vendor/github.com/vmware/govmomi/object:go_default_library",
        "//vendor/github.com/vmware/govmomi/vapi/rest:go_default_library",
        "//vendor/github.com/vmware/govmomi/vapi/tags:go_default_library",
        "//vendor/github.com/vmware/govmomi/vim25:go_default_library",
@@ -58,6 +61,7 @@ go_test(
        "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
        "//staging/src/k8s.io/cloud-provider:go_default_library",
        "//vendor/github.com/vmware/govmomi:go_default_library",
        "//vendor/github.com/vmware/govmomi/find:go_default_library",
        "//vendor/github.com/vmware/govmomi/lookup/simulator:go_default_library",
        "//vendor/github.com/vmware/govmomi/property:go_default_library",
        "//vendor/github.com/vmware/govmomi/simulator:go_default_library",
@@ -67,6 +71,7 @@ go_test(
        "//vendor/github.com/vmware/govmomi/vapi/simulator:go_default_library",
        "//vendor/github.com/vmware/govmomi/vapi/tags:go_default_library",
        "//vendor/github.com/vmware/govmomi/vim25/mo:go_default_library",
        "//vendor/github.com/vmware/govmomi/vim25/types:go_default_library",
    ],
)

@@ -40,15 +40,18 @@ const (

// Other Constants
const (
	LogLevel                 = 4
	DatastoreProperty        = "datastore"
	ResourcePoolProperty     = "resourcePool"
	DatastoreInfoProperty    = "info"
	VirtualMachineType       = "VirtualMachine"
	RoundTripperDefaultCount = 3
	VSANDatastoreType        = "vsan"
	DummyVMPrefixName        = "vsphere-k8s"
	ActivePowerState         = "poweredOn"
	LogLevel                   = 4
	DatastoreProperty          = "datastore"
	ResourcePoolProperty       = "resourcePool"
	DatastoreInfoProperty      = "info"
	VirtualMachineType         = "VirtualMachine"
	RoundTripperDefaultCount   = 3
	VSANDatastoreType          = "vsan"
	DummyVMPrefixName          = "vsphere-k8s"
	ActivePowerState           = "poweredOn"
	DatacenterType             = "Datacenter"
	ClusterComputeResourceType = "ClusterComputeResource"
	HostSystemType             = "HostSystem"
)

// Test Constants
@@ -148,6 +148,21 @@ func (dc *Datacenter) GetAllDatastores(ctx context.Context) (map[string]*Datasto
	return dsURLInfoMap, nil
}

// GetAllHosts returns all the host objects in this datacenter of VC
func (dc *Datacenter) GetAllHosts(ctx context.Context) ([]types.ManagedObjectReference, error) {
	finder := getFinder(dc)
	hostSystems, err := finder.HostSystemList(ctx, "*")
	if err != nil {
		klog.Errorf("Failed to get all hostSystems. err: %+v", err)
		return nil, err
	}
	var hostMors []types.ManagedObjectReference
	for _, hs := range hostSystems {
		hostMors = append(hostMors, hs.Reference())
	}
	return hostMors, nil
}

// GetDatastoreByPath gets the Datastore object from the given vmDiskPath
func (dc *Datacenter) GetDatastoreByPath(ctx context.Context, vmDiskPath string) (*Datastore, error) {
	datastorePathObj := new(object.DatastorePath)
@@ -85,3 +85,19 @@ func (ds *Datastore) IsCompatibleWithStoragePolicy(ctx context.Context, storageP
	}
	return pbmClient.IsDatastoreCompatible(ctx, storagePolicyID, ds)
}

// GetDatastoreHostMounts gets the host names mounted on given datastore
func (ds *Datastore) GetDatastoreHostMounts(ctx context.Context) ([]types.ManagedObjectReference, error) {
	var dsMo mo.Datastore
	pc := property.DefaultCollector(ds.Client())
	err := pc.RetrieveOne(ctx, ds.Datastore.Reference(), []string{"host"}, &dsMo)
	if err != nil {
		klog.Errorf("Failed to retrieve datastore host mount property. err: %v", err)
		return nil, err
	}
	hosts := make([]types.ManagedObjectReference, len(dsMo.Host))
	for _, dsHostMount := range dsMo.Host {
		hosts = append(hosts, dsHostMount.Key)
	}
	return hosts, nil
}
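One detail worth flagging in GetDatastoreHostMounts above: the result slice is created with make([]types.ManagedObjectReference, len(dsMo.Host)) and then grown with append, so the returned slice carries len(dsMo.Host) zero-value references ahead of the real host keys. A small self-contained Go sketch of that pitfall and the usual fix, illustrative only and not part of this diff:

package main

import "fmt"

func main() {
	src := []string{"host-14", "host-33"}

	// Pre-sizing the length and then appending doubles the slice:
	// the first len(src) entries stay at their zero value.
	withLen := make([]string, len(src))
	for _, s := range src {
		withLen = append(withLen, s)
	}
	fmt.Printf("%q\n", withLen) // ["" "" "host-14" "host-33"]

	// Pre-sizing only the capacity keeps just the appended elements.
	withCap := make([]string, 0, len(src))
	for _, s := range src {
		withCap = append(withCap, s)
	}
	fmt.Printf("%q\n", withCap) // ["host-14" "host-33"]
}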
@@ -33,15 +33,19 @@ import (

	"gopkg.in/gcfg.v1"

	"github.com/vmware/govmomi/object"
	"github.com/vmware/govmomi/vapi/rest"
	"github.com/vmware/govmomi/vapi/tags"
	"github.com/vmware/govmomi/vim25/mo"
	vmwaretypes "github.com/vmware/govmomi/vim25/types"
	"k8s.io/api/core/v1"
	k8stypes "k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/tools/cache"
	cloudprovider "k8s.io/cloud-provider"
	nodehelpers "k8s.io/cloud-provider/node/helpers"
	volumehelpers "k8s.io/cloud-provider/volume/helpers"
	"k8s.io/klog"
	"k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere/vclib"
	"k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere/vclib/diskmanagers"
@@ -66,17 +70,20 @@ var cleanUpDummyVMLock sync.RWMutex
const (
	MissingUsernameErrMsg = "Username is missing"
	MissingPasswordErrMsg = "Password is missing"
	NoZoneTagInVCErrMsg   = "No zone tags found in vCenter"
)

// Error constants
var (
	ErrUsernameMissing = errors.New(MissingUsernameErrMsg)
	ErrPasswordMissing = errors.New(MissingPasswordErrMsg)
	ErrNoZoneTagInVC   = errors.New(NoZoneTagInVCErrMsg)
)

var _ cloudprovider.Interface = (*VSphere)(nil)
var _ cloudprovider.Instances = (*VSphere)(nil)
var _ cloudprovider.Zones = (*VSphere)(nil)
var _ cloudprovider.PVLabeler = (*VSphere)(nil)
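The var _ cloudprovider.PVLabeler = (*VSphere)(nil) line added above is the usual compile-time assertion that *VSphere satisfies the new interface; it allocates nothing at runtime and simply fails the build if a required method goes missing. A tiny illustration of the idiom with made-up names:

package main

// Labeler is a stand-in for cloudprovider.PVLabeler (hypothetical, for illustration only).
type Labeler interface {
	Labels() map[string]string
}

type vsphereLike struct{}

func (vsphereLike) Labels() map[string]string { return nil }

// Compile-time check: breaks the build as soon as vsphereLike stops implementing Labeler.
var _ Labeler = (*vsphereLike)(nil)

func main() {}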

// VSphere is an implementation of cloud provider Interface for VSphere.
type VSphere struct {
@@ -501,6 +508,7 @@ func buildVSphereFromConfig(cfg VSphereConfig) (*VSphere, error) {
	if cfg.Global.VCenterPort == "" {
		cfg.Global.VCenterPort = "443"
	}

	vsphereInstanceMap, err := populateVsphereInstanceMap(&cfg)
	if err != nil {
		return nil, err
@@ -833,13 +841,17 @@ func (vs *VSphere) LoadBalancer() (cloudprovider.LoadBalancer, bool) {
	return nil, false
}

func (vs *VSphere) isZoneEnabled() bool {
	return vs.cfg != nil && vs.cfg.Labels.Zone != "" && vs.cfg.Labels.Region != ""
}

// Zones returns an implementation of Zones for vSphere.
func (vs *VSphere) Zones() (cloudprovider.Zones, bool) {
	if vs.cfg == nil {
		klog.V(1).Info("The vSphere cloud provider does not support zones")
		return nil, false
	if vs.isZoneEnabled() {
		return vs, true
	}
	return vs, true
	klog.V(1).Info("The vSphere cloud provider does not support zones")
	return nil, false
}
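isZoneEnabled above reports true only when both a zone and a region tag category are configured, which is also what the new GetLabelsForVolume code further down checks before doing any work. For reference, a vsphere.conf fragment enabling zone support would look roughly like this; the category names are examples, and the [Labels] section layout is an assumption based on the gcfg-parsed config behind vs.cfg.Labels rather than something shown in this diff:

[Labels]
region = k8s-region
zone = k8s-zone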

// Routes returns a false since the interface is not supported for vSphere.
@@ -1412,14 +1424,10 @@ func (vs *VSphere) GetZone(ctx context.Context) (cloudprovider.Zone, error) {
	}

	if zone.Region == "" {
		if vs.cfg.Labels.Region != "" {
			return fmt.Errorf("vSphere region category %q does not match any tags for node %s [%s]", vs.cfg.Labels.Region, nodeName, vs.vmUUID)
		}
		return fmt.Errorf("vSphere region category %q does not match any tags for node %s [%s]", vs.cfg.Labels.Region, nodeName, vs.vmUUID)
	}
	if zone.FailureDomain == "" {
		if vs.cfg.Labels.Zone != "" {
			return fmt.Errorf("vSphere zone category %q does not match any tags for node %s [%s]", vs.cfg.Labels.Zone, nodeName, vs.vmUUID)
		}
		return fmt.Errorf("vSphere zone category %q does not match any tags for node %s [%s]", vs.cfg.Labels.Zone, nodeName, vs.vmUUID)
	}

	return nil
@@ -1438,3 +1446,269 @@ func (vs *VSphere) GetZoneByNodeName(ctx context.Context, nodeName k8stypes.Node
func (vs *VSphere) GetZoneByProviderID(ctx context.Context, providerID string) (cloudprovider.Zone, error) {
	return cloudprovider.Zone{}, cloudprovider.NotImplemented
}

// GetLabelsForVolume implements the PVLabeler interface for VSphere
// since this interface is used by the PV label admission controller.
func (vs *VSphere) GetLabelsForVolume(ctx context.Context, pv *v1.PersistentVolume) (map[string]string, error) {
	// ignore if zones not enabled
	if !vs.isZoneEnabled() {
		klog.V(4).Infof("Zone labels for volume is not enabled in vsphere.conf")
		return nil, nil
	}
	// ignore if not vSphere volume
	if pv.Spec.VsphereVolume == nil {
		return nil, nil
	}
	return vs.GetVolumeLabels(pv.Spec.VsphereVolume.VolumePath)
}

// GetVolumeLabels returns the well known zone and region labels for given volume
func (vs *VSphere) GetVolumeLabels(volumePath string) (map[string]string, error) {
	// Create context
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// if zones is not enabled return no labels
	if !vs.isZoneEnabled() {
		klog.V(4).Infof("Volume zone labels is not enabled in vsphere.conf")
		return nil, nil
	}

	datastorePathObj, err := vclib.GetDatastorePathObjFromVMDiskPath(volumePath)
	if err != nil {
		klog.Errorf("Failed to get datastore for volume: %v: %+v", volumePath, err)
		return nil, err
	}
	dsZones, err := vs.GetZonesForDatastore(ctx, datastorePathObj.Datastore)
	if err != nil {
		klog.Errorf("Failed to get zones for datastore %v: %+v", datastorePathObj.Datastore, err)
		return nil, err
	}
	dsZones, err = vs.collapseZonesInRegion(ctx, dsZones)
	// FIXME: For now, pick the first zone of datastore as the zone of volume
	labels := make(map[string]string)
	if len(dsZones) > 0 {
		labels[v1.LabelZoneRegion] = dsZones[0].Region
		labels[v1.LabelZoneFailureDomain] = dsZones[0].FailureDomain
	}
	return labels, nil
}

// collapse all zones in same region. Join FailureDomain with well known separator
func (vs *VSphere) collapseZonesInRegion(ctx context.Context, zones []cloudprovider.Zone) ([]cloudprovider.Zone, error) {
	// first create a map of region -> list of zones in that region
	regionToZones := make(map[string][]string)
	for _, zone := range zones {
		fds, exists := regionToZones[zone.Region]
		if !exists {
			fds = make([]string, 0)
		}
		regionToZones[zone.Region] = append(fds, zone.FailureDomain)
	}

	// Join all fds in same region and return Zone instances
	collapsedZones := make([]cloudprovider.Zone, 0)
	for region, fds := range regionToZones {
		fdSet := sets.NewString(fds...)
		appendedZone := volumehelpers.ZonesSetToLabelValue(fdSet)
		collapsedZones = append(collapsedZones, cloudprovider.Zone{FailureDomain: appendedZone, Region: region})
	}
	return collapsedZones, nil
}

// GetZonesForDatastore returns all the zones from which this datastore is visible
func (vs *VSphere) GetZonesForDatastore(ctx context.Context, datastore string) ([]cloudprovider.Zone, error) {
	vsi, err := vs.getVSphereInstanceForServer(vs.cfg.Workspace.VCenterIP, ctx)
	if err != nil {
		klog.Errorf("Failed to get vSphere instance: %+v", err)
		return nil, err
	}
	dc, err := vclib.GetDatacenter(ctx, vsi.conn, vs.cfg.Workspace.Datacenter)
	if err != nil {
		klog.Errorf("Failed to get datacenter: %+v", err)
		return nil, err
	}
	// get the hosts mounted on this datastore
	// datastore -> ["host-1", "host-2", "host-3", ...]
	ds, err := dc.GetDatastoreByName(ctx, datastore)
	if err != nil {
		klog.Errorf("Failed to get datastore by name: %v: %+v", datastore, err)
		return nil, err
	}
	dsHosts, err := ds.GetDatastoreHostMounts(ctx)
	if err != nil {
		klog.Errorf("Failed to get datastore host mounts for %v: %+v", datastore, err)
		return nil, err
	}
	klog.V(4).Infof("Got host mounts for datastore: %v: %v", datastore, dsHosts)

	// compute map of zone to list of hosts in that zone across all hosts in vsphere
	// zone -> ["host-i", "host-j", "host-k", ...]
	zoneToHosts, err := vs.GetZoneToHosts(ctx, vsi)
	if err != nil {
		klog.Errorf("Failed to get zones for hosts: %+v", err)
		return nil, err
	}
	klog.V(4).Infof("Got zone to hosts: %v", zoneToHosts)

	// datastore belongs to a zone if all hosts in that zone mount that datastore
	dsZones := make([]cloudprovider.Zone, 0)
	for zone, zoneHosts := range zoneToHosts {
		// if zone is valid and zoneHosts is a subset of dsHosts, then add zone
		if zone.Region != "" && containsAll(dsHosts, zoneHosts) {
			dsZones = append(dsZones, zone)
		}
	}
	klog.V(4).Infof("Datastore %s belongs to zones: %v", datastore, dsZones)
	return dsZones, nil
}

// GetZoneToHosts returns a map of 'zone' -> 'list of hosts in that zone' in given VC
func (vs *VSphere) GetZoneToHosts(ctx context.Context, vsi *VSphereInstance) (map[cloudprovider.Zone][]vmwaretypes.ManagedObjectReference, error) {
	// Approach is to find tags with the category of 'vs.cfg.Labels.Zone'
	zoneToHosts := make(map[cloudprovider.Zone][]vmwaretypes.ManagedObjectReference)

	getHostsInTagCategory := func(ctx context.Context, tagCategoryName string) (map[vmwaretypes.ManagedObjectReference]string, error) {

		hostToTag := make(map[vmwaretypes.ManagedObjectReference]string)
		err := withTagsClient(ctx, vsi.conn, func(c *rest.Client) error {
			// Look whether the zone/region tag is defined in VC
			tagManager := tags.NewManager(c)
			tagsForCat, err := tagManager.GetTagsForCategory(ctx, tagCategoryName)
			if err != nil {
				klog.V(4).Infof("No tags with category %s exists in VC. So ignoring.", tagCategoryName)
				// return a special error so that tag unavailability can be ignored
				return ErrNoZoneTagInVC
			}
			klog.V(4).Infof("List of tags under category %s: %v", tagCategoryName, tagsForCat)

			// Each such tag is a different 'zone' marked in vCenter.
			// Query for objects associated with each tag. Consider Host, Cluster and Datacenter kind of objects.
			tagToObjects := make(map[string][]mo.Reference)
			for _, tag := range tagsForCat {
				klog.V(4).Infof("Getting objects associated with tag %s", tag.Name)
				objects, err := tagManager.ListAttachedObjects(ctx, tag.Name)
				if err != nil {
					klog.Errorf("Error fetching objects associated with zone tag %s: %+v", tag.Name, err)
					return err
				}
				tagToObjects[tag.Name] = objects
			}
			klog.V(4).Infof("Map of tag to objects: %v", tagToObjects)

			// Infer zone for hosts within Datacenter, hosts within clusters and hosts - in this order of increasing priority
			// The below nested for-loops goes over all the objects in tagToObjects three times over.
			for _, moType := range []string{vclib.DatacenterType, vclib.ClusterComputeResourceType, vclib.HostSystemType} {
				for tagName, objects := range tagToObjects {
					for _, obj := range objects {
						if obj.Reference().Type == moType {
							klog.V(4).Infof("Found zone tag %s associated with %s of type %T: %s", tagName, obj, obj, obj.Reference().Value)
							switch moType {
							case "Datacenter":
								// mark that all hosts in this datacenter has tag applied
								dcObjRef := object.NewReference(vsi.conn.Client, obj.Reference())
								klog.V(4).Infof("Converted mo obj %v to govmomi object ref %v", obj, dcObjRef)
								dcObj, ok := dcObjRef.(*object.Datacenter)
								if !ok {
									errMsg := fmt.Sprintf("Not able to convert object to Datacenter %v", obj)
									klog.Errorf(errMsg)
									return errors.New(errMsg)
								}
								klog.V(4).Infof("Converted to object Datacenter %v", dcObj)
								dc := vclib.Datacenter{Datacenter: dcObj}
								hosts, err := dc.GetAllHosts(ctx)
								if err != nil {
									klog.Errorf("Could not get hosts from datacenter %v: %+v", dc, err)
									return err
								}
								for _, host := range hosts {
									hostToTag[host] = tagName
								}
							case "ClusterComputeResource":
								// mark that all hosts in this cluster has tag applied
								clusterObjRef := object.NewReference(vsi.conn.Client, obj.Reference())
								clusterObj, ok := clusterObjRef.(*object.ClusterComputeResource)
								if !ok {
									errMsg := fmt.Sprintf("Not able to convert object ClusterComputeResource %v", obj)
									klog.Errorf(errMsg)
									return errors.New(errMsg)
								}
								hostSystemList, err := clusterObj.Hosts(ctx)
								if err != nil {
									klog.Errorf("Not able to get hosts in cluster %v: %+v", clusterObj, err)
									return err
								}
								for _, host := range hostSystemList {
									hostToTag[host.Reference()] = tagName
								}
							case "HostSystem":
								// mark that this host has tag applied
								hostToTag[obj.Reference()] = tagName
							}
						}
					}
				}
			}
			return nil // no error
		})
		if err != nil {
			klog.Errorf("Error processing tag category %s: %+v", tagCategoryName, err)
			return nil, err
		}
		klog.V(6).Infof("Computed hostToTag: %v", hostToTag)
		return hostToTag, nil
	}

	hostToZone, err := getHostsInTagCategory(ctx, vs.cfg.Labels.Zone)
	if err != nil {
		if err == ErrNoZoneTagInVC {
			return zoneToHosts, nil
		}
		klog.Errorf("Get hosts in tag category %s failed: %+v", vs.cfg.Labels.Zone, err)
		return nil, err
	}

	hostToRegion, err := getHostsInTagCategory(ctx, vs.cfg.Labels.Region)
	if err != nil {
		if err == ErrNoZoneTagInVC {
			return zoneToHosts, nil
		}
		klog.Errorf("Get hosts in tag category %s failed: %+v", vs.cfg.Labels.Region, err)
		return nil, err
	}

	// populate zoneToHosts based on hostToZone and hostToRegion
	klog.V(6).Infof("hostToZone: %v", hostToZone)
	klog.V(6).Infof("hostToRegion: %v", hostToRegion)
	for host, zone := range hostToZone {
		region, regionExists := hostToRegion[host]
		if !regionExists {
			klog.Errorf("Host %s has a zone, but no region. So ignoring.", host)
			continue
		}
		cpZone := cloudprovider.Zone{FailureDomain: zone, Region: region}
		hosts, exists := zoneToHosts[cpZone]
		if !exists {
			hosts = make([]vmwaretypes.ManagedObjectReference, 0)
		}
		zoneToHosts[cpZone] = append(hosts, host)
	}
	klog.V(4).Infof("Final zoneToHosts: %v", zoneToHosts)
	return zoneToHosts, nil
}

// returns true if s1 contains all elements from s2; false otherwise
func containsAll(s1 []vmwaretypes.ManagedObjectReference, s2 []vmwaretypes.ManagedObjectReference) bool {
	// put all elements of s1 into a map
	s1Map := make(map[vmwaretypes.ManagedObjectReference]bool)
	for _, mor := range s1 {
		s1Map[mor] = true
	}
	// verify if all elements of s2 are present in s1Map
	for _, mor := range s2 {
		if _, found := s1Map[mor]; !found {
			return false
		}
	}
	return true
}
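GetVolumeLabels above ends up with at most one cloudprovider.Zone per region: collapseZonesInRegion folds every failure domain of a region into a single label value via volumehelpers.ZonesSetToLabelValue, and the admission test later in this diff shows the resulting shape ("1__2__3", i.e. the values joined with "__"). A self-contained sketch of that collapsing step, without the Kubernetes imports; the separator and the sorting are assumptions based on the test data, so treat this as illustrative only:

package main

import (
	"fmt"
	"sort"
	"strings"
)

// zone mirrors the Region/FailureDomain pair carried by cloudprovider.Zone.
type zone struct {
	region        string
	failureDomain string
}

// collapse keeps one entry per region, joining its failure domains with "__"
// (sorted here so the output is stable), the same shape the PV zone label uses.
func collapse(zones []zone) []zone {
	regionToFDs := make(map[string][]string)
	for _, z := range zones {
		regionToFDs[z.region] = append(regionToFDs[z.region], z.failureDomain)
	}
	collapsed := make([]zone, 0, len(regionToFDs))
	for region, fds := range regionToFDs {
		sort.Strings(fds)
		collapsed = append(collapsed, zone{region: region, failureDomain: strings.Join(fds, "__")})
	}
	return collapsed
}

func main() {
	fmt.Println(collapse([]zone{
		{region: "k8s-region-US", failureDomain: "k8s-zone-US-CA1"},
		{region: "k8s-region-US", failureDomain: "k8s-zone-US-CA2"},
	}))
	// [{k8s-region-US k8s-zone-US-CA1__k8s-zone-US-CA2}]
}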
@@ -23,10 +23,13 @@ import (
	"io/ioutil"
	"log"
	"os"
	"reflect"
	"sort"
	"strconv"
	"strings"
	"testing"

	"github.com/vmware/govmomi/find"
	lookup "github.com/vmware/govmomi/lookup/simulator"
	"github.com/vmware/govmomi/property"
	"github.com/vmware/govmomi/simulator"
@@ -36,6 +39,7 @@ import (
	vapi "github.com/vmware/govmomi/vapi/simulator"
	"github.com/vmware/govmomi/vapi/tags"
	"github.com/vmware/govmomi/vim25/mo"
	vmwaretypes "github.com/vmware/govmomi/vim25/types"
	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/rand"
@@ -487,6 +491,192 @@ func TestZones(t *testing.T) {
	}
}

func TestGetZoneToHosts(t *testing.T) {
	// Common setup for all testcases in this test
	ctx := context.TODO()

	// Create a vcsim instance
	cfg, cleanup := configFromSim()
	defer cleanup()

	// Create vSphere configuration object
	vs, err := newControllerNode(cfg)
	if err != nil {
		t.Fatalf("Failed to construct/authenticate vSphere: %s", err)
	}

	// Configure region and zone categories
	vs.cfg.Labels.Region = "k8s-region"
	vs.cfg.Labels.Zone = "k8s-zone"

	// Create vSphere client
	vsi, ok := vs.vsphereInstanceMap[cfg.Global.VCenterIP]
	if !ok {
		t.Fatalf("Couldn't get vSphere instance: %s", cfg.Global.VCenterIP)
	}

	err = vsi.conn.Connect(ctx)
	if err != nil {
		t.Errorf("Failed to connect to vSphere: %s", err)
	}

	// Lookup Datacenter for this test's Workspace
	dc, err := vclib.GetDatacenter(ctx, vsi.conn, vs.cfg.Workspace.Datacenter)
	if err != nil {
		t.Fatal(err)
	}

	// Property Collector instance
	pc := property.DefaultCollector(vsi.conn.Client)

	// find all hosts in VC
	finder := find.NewFinder(vsi.conn.Client, true)
	finder.SetDatacenter(dc.Datacenter)
	allVcHostsList, err := finder.HostSystemList(ctx, "*")
	if err != nil {
		t.Fatal(err)
	}
	var allVcHosts []vmwaretypes.ManagedObjectReference
	for _, h := range allVcHostsList {
		allVcHosts = append(allVcHosts, h.Reference())
	}

	// choose a cluster to apply zone/region tags
	cluster := simulator.Map.Any("ClusterComputeResource")
	var c mo.ClusterComputeResource
	if err := pc.RetrieveOne(ctx, cluster.Reference(), []string{"host"}, &c); err != nil {
		t.Fatal(err)
	}

	// choose one of the host inside this cluster to apply zone/region tags
	if c.Host == nil || len(c.Host) == 0 {
		t.Fatalf("This test needs a host inside a cluster.")
	}
	clusterHosts := c.Host
	sortHosts(clusterHosts)
	// pick the first host in the cluster to apply tags
	host := clusterHosts[0]
	remainingHostsInCluster := clusterHosts[1:]

	// Tag manager instance
	m := tags.NewManager(rest.NewClient(vsi.conn.Client))

	// Create a region category
	regionCat, err := m.CreateCategory(ctx, &tags.Category{Name: vs.cfg.Labels.Region})
	if err != nil {
		t.Fatal(err)
	}

	// Create a region tag
	regionName := "k8s-region-US"
	regionTag, err := m.CreateTag(ctx, &tags.Tag{CategoryID: regionCat, Name: regionName})
	if err != nil {
		t.Fatal(err)
	}

	// Create a zone category
	zoneCat, err := m.CreateCategory(ctx, &tags.Category{Name: vs.cfg.Labels.Zone})
	if err != nil {
		t.Fatal(err)
	}

	// Create a zone tag
	zone1Name := "k8s-zone-US-CA1"
	zone1Tag, err := m.CreateTag(ctx, &tags.Tag{CategoryID: zoneCat, Name: zone1Name})
	if err != nil {
		t.Fatal(err)
	}
	zone1 := cloudprovider.Zone{FailureDomain: zone1Name, Region: regionName}

	// Create a second zone tag
	zone2Name := "k8s-zone-US-CA2"
	zone2Tag, err := m.CreateTag(ctx, &tags.Tag{CategoryID: zoneCat, Name: zone2Name})
	if err != nil {
		t.Fatal(err)
	}
	zone2 := cloudprovider.Zone{FailureDomain: zone2Name, Region: regionName}

	testcases := []struct {
		name        string
		tags        map[string][]mo.Reference
		zoneToHosts map[cloudprovider.Zone][]vmwaretypes.ManagedObjectReference
	}{
		{
			name:        "Zone and Region tags on host",
			tags:        map[string][]mo.Reference{zone1Tag: {host}, regionTag: {host}},
			zoneToHosts: map[cloudprovider.Zone][]vmwaretypes.ManagedObjectReference{zone1: {host}},
		},
		{
			name:        "Zone on host Region on datacenter",
			tags:        map[string][]mo.Reference{zone1Tag: {host}, regionTag: {dc}},
			zoneToHosts: map[cloudprovider.Zone][]vmwaretypes.ManagedObjectReference{zone1: {host}},
		},
		{
			name:        "Zone on cluster Region on datacenter",
			tags:        map[string][]mo.Reference{zone1Tag: {cluster}, regionTag: {dc}},
			zoneToHosts: map[cloudprovider.Zone][]vmwaretypes.ManagedObjectReference{zone1: clusterHosts},
		},
		{
			name:        "Zone on cluster and override on host",
			tags:        map[string][]mo.Reference{zone2Tag: {cluster}, zone1Tag: {host}, regionTag: {dc}},
			zoneToHosts: map[cloudprovider.Zone][]vmwaretypes.ManagedObjectReference{zone1: {host}, zone2: remainingHostsInCluster},
		},
		{
			name:        "Zone and Region on datacenter",
			tags:        map[string][]mo.Reference{zone1Tag: {dc}, regionTag: {dc}},
			zoneToHosts: map[cloudprovider.Zone][]vmwaretypes.ManagedObjectReference{zone1: allVcHosts},
		},
	}

	for _, testcase := range testcases {
		t.Run(testcase.name, func(t *testing.T) {
			// apply tags to datacenter/cluster/host as per this testcase
			for tagId, objects := range testcase.tags {
				for _, object := range objects {
					if err := m.AttachTag(ctx, tagId, object); err != nil {
						t.Fatal(err)
					}
				}
			}

			// run the test
			zoneToHosts, err := vs.GetZoneToHosts(ctx, vsi)
			if err != nil {
				t.Errorf("unexpected error when calling GetZoneToHosts: %q", err)
			}

			// do not depend on the sort order of hosts in result
			sortHostsMap(zoneToHosts)
			if !reflect.DeepEqual(zoneToHosts, testcase.zoneToHosts) {
				t.Logf("expected result: %+v", testcase.zoneToHosts)
				t.Logf("actual result: %+v", zoneToHosts)
				t.Error("unexpected result from GetZoneToHosts")
			}

			// clean up tags applied on datacenter/cluster/host for this testcase
			for tagId, objects := range testcase.tags {
				for _, object := range objects {
					if err = m.DetachTag(ctx, tagId, object); err != nil {
						t.Fatal(err)
					}
				}
			}
		})
	}
}

func sortHostsMap(zoneToHosts map[cloudprovider.Zone][]vmwaretypes.ManagedObjectReference) {
	for _, hosts := range zoneToHosts {
		sortHosts(hosts)
	}
}

func sortHosts(hosts []vmwaretypes.ManagedObjectReference) {
	sort.Slice(hosts, func(i, j int) bool {
		return hosts[i].Value < hosts[j].Value
	})
}

func TestInstances(t *testing.T) {
	cfg, ok := configFromEnv()
	if !ok {
@@ -27,6 +27,7 @@ import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	volumehelpers "k8s.io/cloud-provider/volume/helpers"
	"k8s.io/klog"
	"k8s.io/kubernetes/pkg/features"
	"k8s.io/kubernetes/pkg/util/mount"
@@ -411,6 +412,34 @@ func (v *vsphereVolumeProvisioner) Provision(selectedNode *v1.Node, allowedTopol
		pv.Spec.AccessModes = v.plugin.GetAccessModes()
	}

	labels := volSpec.Labels
	requirements := make([]v1.NodeSelectorRequirement, 0)
	if len(labels) != 0 {
		if pv.Labels == nil {
			pv.Labels = make(map[string]string)
		}
		for k, v := range labels {
			pv.Labels[k] = v
			var values []string
			if k == v1.LabelZoneFailureDomain {
				values, err = volumehelpers.LabelZonesToList(v)
				if err != nil {
					return nil, fmt.Errorf("failed to convert label string for Zone: %s to a List: %v", v, err)
				}
			} else {
				values = []string{v}
			}
			requirements = append(requirements, v1.NodeSelectorRequirement{Key: k, Operator: v1.NodeSelectorOpIn, Values: values})
		}
	}

	if len(requirements) > 0 {
		pv.Spec.NodeAffinity = new(v1.VolumeNodeAffinity)
		pv.Spec.NodeAffinity.Required = new(v1.NodeSelector)
		pv.Spec.NodeAffinity.Required.NodeSelectorTerms = make([]v1.NodeSelectorTerm, 1)
		pv.Spec.NodeAffinity.Required.NodeSelectorTerms[0].MatchExpressions = requirements
	}

	return pv, nil
}
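The Provision change above copies the volume labels onto the PV and derives one NodeSelectorRequirement per label, splitting a multi-zone failure-domain value back into its parts (volumehelpers.LabelZonesToList is effectively the inverse of the "__" join shown earlier). A rough sketch of that transformation with plain strings instead of the Kubernetes API types; the label keys and values here are illustrative, not taken from this diff:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Labels as they might come back from GetVolumeLabels for a volume
	// whose datastore spans two zones (hypothetical values).
	labels := map[string]string{
		"failure-domain.beta.kubernetes.io/region": "k8s-region-US",
		"failure-domain.beta.kubernetes.io/zone": "k8s-zone-US-CA1__k8s-zone-US-CA2",
	}

	for key, value := range labels {
		values := []string{value}
		// The zone label may carry several failure domains; split it back apart,
		// mirroring what the provisioner does before building the requirement.
		if strings.HasSuffix(key, "/zone") {
			values = strings.Split(value, "__")
		}
		fmt.Printf("requirement: key=%s operator=In values=%v\n", key, values)
	}
}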
@@ -72,6 +72,7 @@ type VolumeSpec struct {
	Fstype            string
	StoragePolicyID   string
	StoragePolicyName string
	Labels            map[string]string
}

func verifyDevicePath(path string) (string, error) {
@@ -148,12 +149,17 @@ func (util *VsphereDiskUtil) CreateVolume(v *vsphereVolumeProvisioner) (volSpec
	if err != nil {
		return nil, err
	}
	labels, err := cloud.GetVolumeLabels(vmDiskPath)
	if err != nil {
		return nil, err
	}
	volSpec = &VolumeSpec{
		Path:              vmDiskPath,
		Size:              volSizeKiB,
		Fstype:            fstype,
		StoragePolicyName: volumeOptions.StoragePolicyName,
		StoragePolicyID:   volumeOptions.StoragePolicyID,
		Labels:            labels,
	}
	klog.V(2).Infof("Successfully created vsphere volume %s", name)
	return volSpec, nil
@@ -59,6 +59,7 @@ type persistentVolumeLabel struct {
	gcePVLabeler       cloudprovider.PVLabeler
	azurePVLabeler     cloudprovider.PVLabeler
	openStackPVLabeler cloudprovider.PVLabeler
	vspherePVLabeler   cloudprovider.PVLabeler
}

var _ admission.MutationInterface = &persistentVolumeLabel{}
@@ -138,7 +139,13 @@ func (l *persistentVolumeLabel) Admit(a admission.Attributes) (err error) {
		}
		volumeLabels = labels
	}

	if volume.Spec.VsphereVolume != nil {
		labels, err := l.findVsphereVolumeLabels(volume)
		if err != nil {
			return admission.NewForbidden(a, fmt.Errorf("error querying vSphere Volume %s: %v", volume.Spec.VsphereVolume.VolumePath, err))
		}
		volumeLabels = labels
	}
	requirements := make([]api.NodeSelectorRequirement, 0)
	if len(volumeLabels) != 0 {
		if volume.Labels == nil {
@@ -384,3 +391,48 @@ func (l *persistentVolumeLabel) findCinderDiskLabels(volume *api.PersistentVolum
	return pvlabler.GetLabelsForVolume(context.TODO(), pv)

}

func (l *persistentVolumeLabel) findVsphereVolumeLabels(volume *api.PersistentVolume) (map[string]string, error) {
	pvlabler, err := l.getVspherePVLabeler()
	if err != nil {
		return nil, err
	}
	if pvlabler == nil {
		return nil, fmt.Errorf("unable to build vSphere cloud provider")
	}

	pv := &v1.PersistentVolume{}
	err = k8s_api_v1.Convert_core_PersistentVolume_To_v1_PersistentVolume(volume, pv, nil)
	if err != nil {
		return nil, fmt.Errorf("failed to convert PersistentVolume to core/v1: %q", err)
	}
	labels, err := pvlabler.GetLabelsForVolume(context.TODO(), pv)
	if err != nil {
		return nil, err
	}

	return labels, nil
}

func (l *persistentVolumeLabel) getVspherePVLabeler() (cloudprovider.PVLabeler, error) {
	l.mutex.Lock()
	defer l.mutex.Unlock()

	if l.vspherePVLabeler == nil {
		var cloudConfigReader io.Reader
		if len(l.cloudConfig) > 0 {
			cloudConfigReader = bytes.NewReader(l.cloudConfig)
		}
		cloudProvider, err := cloudprovider.GetCloudProvider("vsphere", cloudConfigReader)
		if err != nil || cloudProvider == nil {
			return nil, err
		}
		vspherePVLabeler, ok := cloudProvider.(cloudprovider.PVLabeler)
		if !ok {
			// GetCloudProvider failed
			return nil, errors.New("vSphere Cloud Provider does not implement PV labeling")
		}
		l.vspherePVLabeler = vspherePVLabeler
	}
	return l.vspherePVLabeler, nil
}
@@ -683,6 +683,72 @@ func Test_PVLAdmission(t *testing.T) {
			},
			err: nil,
		},
		{
			name:    "vSphere PV labeled correctly",
			handler: newPersistentVolumeLabel(),
			pvlabeler: mockVolumeLabels(map[string]string{
				"a": "1",
				"b": "2",
				v1.LabelZoneFailureDomain: "1__2__3",
			}),
			preAdmissionPV: &api.PersistentVolume{
				ObjectMeta: metav1.ObjectMeta{
					Name:      "vSpherePV",
					Namespace: "myns",
				},
				Spec: api.PersistentVolumeSpec{
					PersistentVolumeSource: api.PersistentVolumeSource{
						VsphereVolume: &api.VsphereVirtualDiskVolumeSource{
							VolumePath: "123",
						},
					},
				},
			},
			postAdmissionPV: &api.PersistentVolume{
				ObjectMeta: metav1.ObjectMeta{
					Name:      "vSpherePV",
					Namespace: "myns",
					Labels: map[string]string{
						"a": "1",
						"b": "2",
						v1.LabelZoneFailureDomain: "1__2__3",
					},
				},
				Spec: api.PersistentVolumeSpec{
					PersistentVolumeSource: api.PersistentVolumeSource{
						VsphereVolume: &api.VsphereVirtualDiskVolumeSource{
							VolumePath: "123",
						},
					},
					NodeAffinity: &api.VolumeNodeAffinity{
						Required: &api.NodeSelector{
							NodeSelectorTerms: []api.NodeSelectorTerm{
								{
									MatchExpressions: []api.NodeSelectorRequirement{
										{
											Key:      "a",
											Operator: api.NodeSelectorOpIn,
											Values:   []string{"1"},
										},
										{
											Key:      "b",
											Operator: api.NodeSelectorOpIn,
											Values:   []string{"2"},
										},
										{
											Key:      v1.LabelZoneFailureDomain,
											Operator: api.NodeSelectorOpIn,
											Values:   []string{"1", "2", "3"},
										},
									},
								},
							},
						},
					},
				},
			},
			err: nil,
		},
	}

	for _, testcase := range testcases {
@@ -718,6 +784,7 @@ func setPVLabeler(handler *persistentVolumeLabel, pvlabeler cloudprovider.PVLabe
	handler.gcePVLabeler = pvlabeler
	handler.azurePVLabeler = pvlabeler
	handler.openStackPVLabeler = pvlabeler
	handler.vspherePVLabeler = pvlabeler
}

// sortMatchExpressions sorts a PV's node selector match expressions by key name if it is not nil