From e5aec8645d740123dfc8f1400b66bca05673b833 Mon Sep 17 00:00:00 2001 From: David Zhu Date: Mon, 11 Sep 2017 20:20:29 -0700 Subject: [PATCH] Changed GetAllZones to only get zones with nodes that are currently running (renamed to GetAllCurrentZones). Added E2E test to confirm this behavior. Added node informer to cloud-provider controller to keep track of zones with k8s nodes in them. --- .../app/controllermanager.go | 4 + pkg/cloudprovider/BUILD | 1 + pkg/cloudprovider/cloud.go | 6 + pkg/cloudprovider/providers/gce/BUILD | 1 + pkg/cloudprovider/providers/gce/gce.go | 99 ++++++++-- pkg/cloudprovider/providers/gce/gce_disks.go | 42 ++--- .../providers/gce/gce_disks_test.go | 171 ++++++++++++------ .../providers/gce/gce_instances.go | 54 ++++-- pkg/volume/gce_pd/gce_util.go | 2 +- test/e2e/e2e.go | 3 +- test/e2e/multicluster/BUILD | 2 + test/e2e/multicluster/ubernetes_lite.go | 144 +++++++++++++++ test/e2e/storage/volume_provisioning.go | 4 +- 13 files changed, 421 insertions(+), 112 deletions(-) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index c53be756e4..9add80e01a 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -442,6 +442,10 @@ func CreateControllerContext(s *options.CMServer, rootClientBuilder, clientBuild } } + if informerUserCloud, ok := cloud.(cloudprovider.InformerUser); ok { + informerUserCloud.SetInformers(sharedInformers) + } + ctx := ControllerContext{ ClientBuilder: clientBuilder, InformerFactory: sharedInformers, diff --git a/pkg/cloudprovider/BUILD b/pkg/cloudprovider/BUILD index 9f78083358..7c38feae3f 100644 --- a/pkg/cloudprovider/BUILD +++ b/pkg/cloudprovider/BUILD @@ -18,6 +18,7 @@ go_library( "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", + "//vendor/k8s.io/client-go/informers:go_default_library", ], ) diff --git a/pkg/cloudprovider/cloud.go b/pkg/cloudprovider/cloud.go index 00479e9c5a..2e46d47d33 100644 --- a/pkg/cloudprovider/cloud.go +++ b/pkg/cloudprovider/cloud.go @@ -23,6 +23,7 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/informers" "k8s.io/kubernetes/pkg/controller" ) @@ -49,6 +50,11 @@ type Interface interface { HasClusterID() bool } +type InformerUser interface { + // SetInformers sets the informer on the cloud object. + SetInformers(informerFactory informers.SharedInformerFactory) +} + // Clusters is an abstract, pluggable interface for clusters of containers. type Clusters interface { // ListClusters lists the names of the available clusters. 
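For reference, the sketch below shows how any cloud provider could satisfy the new cloudprovider.InformerUser interface and track node zones from the shared node informer, mirroring the GCECloud changes further down in this patch. It is a minimal sketch assuming a hypothetical exampleCloud type; only the informers, cache, sets, and kubeletapis packages it imports are real, and update/delete handling is omitted for brevity.

package example

import (
	"sync"

	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/tools/cache"
	kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
)

// exampleCloud is a hypothetical provider that records which zones currently
// contain nodes, the same bookkeeping GCECloud adopts in this patch.
type exampleCloud struct {
	nodeZonesLock sync.Mutex
	nodeZones     map[string]sets.String // zone -> names of nodes in that zone
}

// SetInformers satisfies cloudprovider.InformerUser; the controller manager
// calls it once, before the shared informers are started.
func (c *exampleCloud) SetInformers(informerFactory informers.SharedInformerFactory) {
	nodeInformer := informerFactory.Core().V1().Nodes().Informer()
	nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			node := obj.(*v1.Node)
			zone, ok := node.Labels[kubeletapis.LabelZoneFailureDomain]
			if !ok {
				return // node has no zone label yet
			}
			c.nodeZonesLock.Lock()
			defer c.nodeZonesLock.Unlock()
			if c.nodeZones == nil {
				c.nodeZones = map[string]sets.String{}
			}
			if c.nodeZones[zone] == nil {
				c.nodeZones[zone] = sets.NewString()
			}
			c.nodeZones[zone].Insert(node.Name)
		},
		// UpdateFunc and DeleteFunc are omitted here; the GCE implementation
		// below also handles zone-label changes and DeletedFinalStateUnknown.
	})
}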
diff --git a/pkg/cloudprovider/providers/gce/BUILD b/pkg/cloudprovider/providers/gce/BUILD index d68e1a8f8e..bcd8c5de9d 100644 --- a/pkg/cloudprovider/providers/gce/BUILD +++ b/pkg/cloudprovider/providers/gce/BUILD @@ -75,6 +75,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", + "//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library", "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", diff --git a/pkg/cloudprovider/providers/gce/gce.go b/pkg/cloudprovider/providers/gce/gce.go index f86865c4db..daab7f6719 100644 --- a/pkg/cloudprovider/providers/gce/gce.go +++ b/pkg/cloudprovider/providers/gce/gce.go @@ -34,13 +34,16 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" v1core "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/flowcontrol" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/controller" + kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" "k8s.io/kubernetes/pkg/version" "github.com/golang/glog" @@ -99,18 +102,21 @@ type GCECloud struct { // for the cloudprovider to start watching the configmap. ClusterID ClusterID - service *compute.Service - serviceBeta *computebeta.Service - serviceAlpha *computealpha.Service - containerService *container.Service - client clientset.Interface - clientBuilder controller.ControllerClientBuilder - eventBroadcaster record.EventBroadcaster - eventRecorder record.EventRecorder - projectID string - region string - localZone string // The zone in which we are running - managedZones []string // List of zones we are spanning (for multi-AZ clusters, primarily when running on master) + service *compute.Service + serviceBeta *computebeta.Service + serviceAlpha *computealpha.Service + containerService *container.Service + client clientset.Interface + clientBuilder controller.ControllerClientBuilder + eventBroadcaster record.EventBroadcaster + eventRecorder record.EventRecorder + projectID string + region string + localZone string // The zone in which we are running + // managedZones will be set to the 1 zone if running a single zone cluster + // it will be set to ALL zones in region for any multi-zone cluster + // Use GetAllCurrentZones to get only zones that contain nodes + managedZones []string networkURL string isLegacyNetwork bool subnetworkURL string @@ -125,6 +131,12 @@ type GCECloud struct { useMetadataServer bool operationPollRateLimiter flowcontrol.RateLimiter manager diskServiceManager + // Lock for access to nodeZones + nodeZonesLock sync.Mutex + // nodeZones is a mapping from Zone to a sets.String of Node's names in the Zone + // it is updated by the nodeInformer + nodeZones map[string]sets.String + nodeInformerSynced cache.InformerSynced // sharedResourceLock is used to serialize GCE operations that may mutate shared state to // prevent inconsistencies. 
For example, load balancers manipulation methods will take the // lock to prevent shared resources from being prematurely deleted while the operation is @@ -470,6 +482,7 @@ func CreateGCECloud(config *CloudConfig) (*GCECloud, error) { useMetadataServer: config.UseMetadataServer, operationPollRateLimiter: operationPollRateLimiter, AlphaFeatureGate: config.AlphaFeatureGate, + nodeZones: map[string]sets.String{}, } gce.manager = &gceServiceManager{gce} @@ -582,6 +595,68 @@ func (gce *GCECloud) IsLegacyNetwork() bool { return gce.isLegacyNetwork } +func (gce *GCECloud) SetInformers(informerFactory informers.SharedInformerFactory) { + glog.Infof("Setting up informers for GCECloud") + nodeInformer := informerFactory.Core().V1().Nodes().Informer() + nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + node := obj.(*v1.Node) + gce.updateNodeZones(nil, node) + }, + UpdateFunc: func(prev, obj interface{}) { + prevNode := prev.(*v1.Node) + newNode := obj.(*v1.Node) + if newNode.Labels[kubeletapis.LabelZoneFailureDomain] == + prevNode.Labels[kubeletapis.LabelZoneFailureDomain] { + return + } + gce.updateNodeZones(prevNode, newNode) + }, + DeleteFunc: func(obj interface{}) { + node, isNode := obj.(*v1.Node) + // We can get DeletedFinalStateUnknown instead of *v1.Node here + // and we need to handle that correctly. + if !isNode { + deletedState, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + glog.Errorf("Received unexpected object: %v", obj) + return + } + node, ok = deletedState.Obj.(*v1.Node) + if !ok { + glog.Errorf("DeletedFinalStateUnknown contained non-Node object: %v", deletedState.Obj) + return + } + } + gce.updateNodeZones(node, nil) + }, + }) + gce.nodeInformerSynced = nodeInformer.HasSynced +} + +func (gce *GCECloud) updateNodeZones(prevNode, newNode *v1.Node) { + gce.nodeZonesLock.Lock() + defer gce.nodeZonesLock.Unlock() + if prevNode != nil { + prevZone, ok := prevNode.ObjectMeta.Labels[kubeletapis.LabelZoneFailureDomain] + if ok { + gce.nodeZones[prevZone].Delete(prevNode.ObjectMeta.Name) + if gce.nodeZones[prevZone].Len() == 0 { + gce.nodeZones[prevZone] = nil + } + } + } + if newNode != nil { + newZone, ok := newNode.ObjectMeta.Labels[kubeletapis.LabelZoneFailureDomain] + if ok { + if gce.nodeZones[newZone] == nil { + gce.nodeZones[newZone] = sets.NewString() + } + gce.nodeZones[newZone].Insert(newNode.ObjectMeta.Name) + } + } +} + // Known-useless DNS search path. var uselessDNSSearchRE = regexp.MustCompile(`^[0-9]+.google.internal.$`) diff --git a/pkg/cloudprovider/providers/gce/gce_disks.go b/pkg/cloudprovider/providers/gce/gce_disks.go index 48dfd18bad..1e6615062b 100644 --- a/pkg/cloudprovider/providers/gce/gce_disks.go +++ b/pkg/cloudprovider/providers/gce/gce_disks.go @@ -690,11 +690,14 @@ func (gce *GCECloud) DisksAreAttached(diskNames []string, nodeName types.NodeNam // JSON in Description field. func (gce *GCECloud) CreateDisk( name string, diskType string, zone string, sizeGb int64, tags map[string]string) error { - - // Do not allow creation of PDs in zones that are not managed. Such PDs - // then cannot be deleted by DeleteDisk. - if isManaged := gce.verifyZoneIsManaged(zone); !isManaged { - return fmt.Errorf("kubernetes does not manage zone %q", zone) + // Do not allow creation of PDs in zones that do not have nodes. Such PDs + // are not currently usable.
+ curZones, err := gce.GetAllCurrentZones() + if err != nil { + return err + } + if !curZones.Has(zone) { + return fmt.Errorf("kubernetes does not have a node in zone %q", zone) } tagsStr, err := gce.encodeDiskTags(tags) @@ -733,17 +736,16 @@ func (gce *GCECloud) CreateDisk( func (gce *GCECloud) CreateRegionalDisk( name string, diskType string, replicaZones sets.String, sizeGb int64, tags map[string]string) error { - // Do not allow creation of PDs in zones that are not managed. Such PDs - // then cannot be deleted by DeleteDisk. - unmanagedZones := []string{} - for _, zone := range replicaZones.UnsortedList() { - if isManaged := gce.verifyZoneIsManaged(zone); !isManaged { - unmanagedZones = append(unmanagedZones, zone) - } + // Do not allow creation of PDs in zones that do not have nodes. Such PDs + // are not currently usable. This functionality should be reverted to checking + // against managed zones if we want users to be able to create RegionalDisks + // in zones where there are no nodes. + curZones, err := gce.GetAllCurrentZones() + if err != nil { + return err } - - if len(unmanagedZones) > 0 { - return fmt.Errorf("kubernetes does not manage specified zones: %q. Managed Zones: %q", unmanagedZones, gce.managedZones) + if !curZones.IsSuperset(replicaZones) { + return fmt.Errorf("kubernetes does not have nodes in specified zones: %q. Zones that contain nodes: %q", replicaZones.Difference(curZones), curZones) } tagsStr, err := gce.encodeDiskTags(tags) @@ -776,16 +778,6 @@ func (gce *GCECloud) CreateRegionalDisk( return err } -func (gce *GCECloud) verifyZoneIsManaged(zone string) bool { - for _, managedZone := range gce.managedZones { - if zone == managedZone { - return true - } - } - - return false -} - func getDiskType(diskType string) (string, error) { switch diskType { case DiskTypeSSD, DiskTypeStandard: diff --git a/pkg/cloudprovider/providers/gce/gce_disks_test.go b/pkg/cloudprovider/providers/gce/gce_disks_test.go index 66a13e2ee8..3c6b5ee4f7 100644 --- a/pkg/cloudprovider/providers/gce/gce_disks_test.go +++ b/pkg/cloudprovider/providers/gce/gce_disks_test.go @@ -37,16 +37,19 @@ func TestCreateDisk_Basic(t *testing.T) { /* Arrange */ gceProjectId := "test-project" gceRegion := "fake-region" + zonesWithNodes := []string{"zone1"} fakeManager := newFakeManager(gceProjectId, gceRegion) alphaFeatureGate, featureGateErr := NewAlphaFeatureGate([]string{}) if featureGateErr != nil { t.Error(featureGateErr) } gce := GCECloud{ - manager: fakeManager, - managedZones: []string{"zone1"}, - projectID: gceProjectId, - AlphaFeatureGate: alphaFeatureGate, + manager: fakeManager, + managedZones: []string{"zone1"}, + projectID: gceProjectId, + AlphaFeatureGate: alphaFeatureGate, + nodeZones: createNodeZones(zonesWithNodes), + nodeInformerSynced: func() bool { return true }, } diskName := "disk" @@ -95,16 +98,20 @@ func TestCreateRegionalDisk_Basic(t *testing.T) { /* Arrange */ gceProjectId := "test-project" gceRegion := "fake-region" + zonesWithNodes := []string{"zone1", "zone3", "zone2"} fakeManager := newFakeManager(gceProjectId, gceRegion) alphaFeatureGate, featureGateErr := NewAlphaFeatureGate([]string{AlphaFeatureGCEDisk}) + if featureGateErr != nil { t.Error(featureGateErr) } gce := GCECloud{ - manager: fakeManager, - managedZones: []string{"zone1", "zone3", "zone2"}, - projectID: gceProjectId, - AlphaFeatureGate: alphaFeatureGate, + manager: fakeManager, + managedZones: zonesWithNodes, + projectID: gceProjectId, + AlphaFeatureGate: alphaFeatureGate, + nodeZones:
createNodeZones(zonesWithNodes), + nodeInformerSynced: func() bool { return true }, } diskName := "disk" @@ -153,15 +160,18 @@ func TestCreateDisk_DiskAlreadyExists(t *testing.T) { /* Arrange */ gceProjectId := "test-project" gceRegion := "fake-region" + zonesWithNodes := []string{"zone1"} fakeManager := newFakeManager(gceProjectId, gceRegion) alphaFeatureGate, featureGateErr := NewAlphaFeatureGate([]string{}) if featureGateErr != nil { t.Error(featureGateErr) } gce := GCECloud{ - manager: fakeManager, - managedZones: []string{"zone1"}, - AlphaFeatureGate: alphaFeatureGate, + manager: fakeManager, + managedZones: zonesWithNodes, + AlphaFeatureGate: alphaFeatureGate, + nodeZones: createNodeZones(zonesWithNodes), + nodeInformerSynced: func() bool { return true }, } // Inject disk AlreadyExists error. @@ -184,8 +194,13 @@ func TestCreateDisk_WrongZone(t *testing.T) { /* Arrange */ gceProjectId := "test-project" gceRegion := "fake-region" + zonesWithNodes := []string{"zone1"} fakeManager := newFakeManager(gceProjectId, gceRegion) - gce := GCECloud{manager: fakeManager, managedZones: []string{"zone1"}} + gce := GCECloud{ + manager: fakeManager, + managedZones: zonesWithNodes, + nodeZones: createNodeZones(zonesWithNodes), + nodeInformerSynced: func() bool { return true }} diskName := "disk" diskType := DiskTypeSSD @@ -204,8 +219,13 @@ func TestCreateDisk_NoManagedZone(t *testing.T) { /* Arrange */ gceProjectId := "test-project" gceRegion := "fake-region" + zonesWithNodes := []string{} fakeManager := newFakeManager(gceProjectId, gceRegion) - gce := GCECloud{manager: fakeManager, managedZones: []string{}} + gce := GCECloud{ + manager: fakeManager, + managedZones: zonesWithNodes, + nodeZones: createNodeZones(zonesWithNodes), + nodeInformerSynced: func() bool { return true }} diskName := "disk" diskType := DiskTypeSSD @@ -224,8 +244,12 @@ func TestCreateDisk_BadDiskType(t *testing.T) { /* Arrange */ gceProjectId := "test-project" gceRegion := "fake-region" + zonesWithNodes := []string{"zone1"} fakeManager := newFakeManager(gceProjectId, gceRegion) - gce := GCECloud{manager: fakeManager, managedZones: []string{"zone1"}} + gce := GCECloud{manager: fakeManager, + managedZones: zonesWithNodes, + nodeZones: createNodeZones(zonesWithNodes), + nodeInformerSynced: func() bool { return true }} diskName := "disk" diskType := "arbitrary-disk" @@ -245,15 +269,18 @@ func TestCreateDisk_MultiZone(t *testing.T) { /* Arrange */ gceProjectId := "test-project" gceRegion := "fake-region" + zonesWithNodes := []string{"zone1", "zone2", "zone3"} fakeManager := newFakeManager(gceProjectId, gceRegion) alphaFeatureGate, featureGateErr := NewAlphaFeatureGate([]string{}) if featureGateErr != nil { t.Error(featureGateErr) } gce := GCECloud{ - manager: fakeManager, - managedZones: []string{"zone1", "zone2", "zone3"}, - AlphaFeatureGate: alphaFeatureGate, + manager: fakeManager, + managedZones: zonesWithNodes, + AlphaFeatureGate: alphaFeatureGate, + nodeZones: createNodeZones(zonesWithNodes), + nodeInformerSynced: func() bool { return true }, } diskName := "disk" @@ -274,15 +301,18 @@ func TestDeleteDisk_Basic(t *testing.T) { /* Arrange */ gceProjectId := "test-project" gceRegion := "fake-region" + zonesWithNodes := []string{"zone1"} fakeManager := newFakeManager(gceProjectId, gceRegion) alphaFeatureGate, featureGateErr := NewAlphaFeatureGate([]string{}) if featureGateErr != nil { t.Error(featureGateErr) } gce := GCECloud{ - manager: fakeManager, - managedZones: []string{"zone1"}, - AlphaFeatureGate: alphaFeatureGate, + manager: 
fakeManager, + managedZones: zonesWithNodes, + AlphaFeatureGate: alphaFeatureGate, + nodeZones: createNodeZones(zonesWithNodes), + nodeInformerSynced: func() bool { return true }, } diskName := "disk" diskType := DiskTypeSSD @@ -311,15 +341,18 @@ func TestDeleteDisk_NotFound(t *testing.T) { /* Arrange */ gceProjectId := "test-project" gceRegion := "fake-region" + zonesWithNodes := []string{"zone1"} fakeManager := newFakeManager(gceProjectId, gceRegion) alphaFeatureGate, featureGateErr := NewAlphaFeatureGate([]string{}) if featureGateErr != nil { t.Error(featureGateErr) } gce := GCECloud{ - manager: fakeManager, - managedZones: []string{"zone1"}, - AlphaFeatureGate: alphaFeatureGate, + manager: fakeManager, + managedZones: zonesWithNodes, + AlphaFeatureGate: alphaFeatureGate, + nodeZones: createNodeZones(zonesWithNodes), + nodeInformerSynced: func() bool { return true }, } diskName := "disk" @@ -336,15 +369,18 @@ func TestDeleteDisk_ResourceBeingUsed(t *testing.T) { /* Arrange */ gceProjectId := "test-project" gceRegion := "fake-region" + zonesWithNodes := []string{"zone1"} fakeManager := newFakeManager(gceProjectId, gceRegion) alphaFeatureGate, featureGateErr := NewAlphaFeatureGate([]string{}) if featureGateErr != nil { t.Error(featureGateErr) } gce := GCECloud{ - manager: fakeManager, - managedZones: []string{"zone1"}, - AlphaFeatureGate: alphaFeatureGate, + manager: fakeManager, + managedZones: zonesWithNodes, + AlphaFeatureGate: alphaFeatureGate, + nodeZones: createNodeZones(zonesWithNodes), + nodeInformerSynced: func() bool { return true }, } diskName := "disk" diskType := DiskTypeSSD @@ -367,15 +403,18 @@ func TestDeleteDisk_SameDiskMultiZone(t *testing.T) { /* Assert */ gceProjectId := "test-project" gceRegion := "fake-region" + zonesWithNodes := []string{"zone1", "zone2", "zone3"} fakeManager := newFakeManager(gceProjectId, gceRegion) alphaFeatureGate, featureGateErr := NewAlphaFeatureGate([]string{}) if featureGateErr != nil { t.Error(featureGateErr) } gce := GCECloud{ - manager: fakeManager, - managedZones: []string{"zone1", "zone2", "zone3"}, - AlphaFeatureGate: alphaFeatureGate, + manager: fakeManager, + managedZones: zonesWithNodes, + AlphaFeatureGate: alphaFeatureGate, + nodeZones: createNodeZones(zonesWithNodes), + nodeInformerSynced: func() bool { return true }, } diskName := "disk" diskType := DiskTypeSSD @@ -401,15 +440,18 @@ func TestDeleteDisk_DiffDiskMultiZone(t *testing.T) { /* Arrange */ gceProjectId := "test-project" gceRegion := "fake-region" + zonesWithNodes := []string{"zone1"} fakeManager := newFakeManager(gceProjectId, gceRegion) alphaFeatureGate, featureGateErr := NewAlphaFeatureGate([]string{}) if featureGateErr != nil { t.Error(featureGateErr) } gce := GCECloud{ - manager: fakeManager, - managedZones: []string{"zone1"}, - AlphaFeatureGate: alphaFeatureGate, + manager: fakeManager, + managedZones: zonesWithNodes, + AlphaFeatureGate: alphaFeatureGate, + nodeZones: createNodeZones(zonesWithNodes), + nodeInformerSynced: func() bool { return true }, } diskName := "disk" diskType := DiskTypeSSD @@ -435,19 +477,22 @@ func TestGetAutoLabelsForPD_Basic(t *testing.T) { /* Arrange */ gceProjectId := "test-project" gceRegion := "us-central1" + zone := "us-central1-c" + zonesWithNodes := []string{zone} fakeManager := newFakeManager(gceProjectId, gceRegion) diskName := "disk" diskType := DiskTypeSSD - zone := "us-central1-c" const sizeGb int64 = 128 alphaFeatureGate, featureGateErr := NewAlphaFeatureGate([]string{}) if featureGateErr != nil { t.Error(featureGateErr) } gce 
:= GCECloud{ - manager: fakeManager, - managedZones: []string{zone}, - AlphaFeatureGate: alphaFeatureGate, + manager: fakeManager, + managedZones: zonesWithNodes, + AlphaFeatureGate: alphaFeatureGate, + nodeZones: createNodeZones(zonesWithNodes), + nodeInformerSynced: func() bool { return true }, } gce.CreateDisk(diskName, diskType, zone, sizeGb, nil) @@ -472,19 +517,22 @@ func TestGetAutoLabelsForPD_NoZone(t *testing.T) { /* Arrange */ gceProjectId := "test-project" gceRegion := "europe-west1" + zone := "europe-west1-d" + zonesWithNodes := []string{zone} fakeManager := newFakeManager(gceProjectId, gceRegion) diskName := "disk" diskType := DiskTypeStandard - zone := "europe-west1-d" const sizeGb int64 = 128 alphaFeatureGate, featureGateErr := NewAlphaFeatureGate([]string{}) if featureGateErr != nil { t.Error(featureGateErr) } gce := GCECloud{ - manager: fakeManager, - managedZones: []string{zone}, - AlphaFeatureGate: alphaFeatureGate, + manager: fakeManager, + managedZones: zonesWithNodes, + AlphaFeatureGate: alphaFeatureGate, + nodeZones: createNodeZones(zonesWithNodes), + nodeInformerSynced: func() bool { return true }, } gce.CreateDisk(diskName, diskType, zone, sizeGb, nil) @@ -508,10 +556,14 @@ func TestGetAutoLabelsForPD_DiskNotFound(t *testing.T) { /* Arrange */ gceProjectId := "test-project" gceRegion := "fake-region" + zone := "asia-northeast1-a" + zonesWithNodes := []string{zone} fakeManager := newFakeManager(gceProjectId, gceRegion) diskName := "disk" - zone := "asia-northeast1-a" - gce := GCECloud{manager: fakeManager, managedZones: []string{zone}} + gce := GCECloud{manager: fakeManager, + managedZones: zonesWithNodes, + nodeZones: createNodeZones(zonesWithNodes), + nodeInformerSynced: func() bool { return true }} /* Act */ _, err := gce.GetAutoLabelsForPD(diskName, zone) @@ -526,6 +578,7 @@ func TestGetAutoLabelsForPD_DiskNotFoundAndNoZone(t *testing.T) { /* Arrange */ gceProjectId := "test-project" gceRegion := "fake-region" + zonesWithNodes := []string{} fakeManager := newFakeManager(gceProjectId, gceRegion) diskName := "disk" alphaFeatureGate, featureGateErr := NewAlphaFeatureGate([]string{}) @@ -533,9 +586,11 @@ func TestGetAutoLabelsForPD_DiskNotFoundAndNoZone(t *testing.T) { t.Error(featureGateErr) } gce := GCECloud{ - manager: fakeManager, - managedZones: []string{}, - AlphaFeatureGate: alphaFeatureGate, + manager: fakeManager, + managedZones: zonesWithNodes, + AlphaFeatureGate: alphaFeatureGate, + nodeZones: createNodeZones(zonesWithNodes), + nodeInformerSynced: func() bool { return true }, } /* Act */ @@ -551,6 +606,7 @@ func TestGetAutoLabelsForPD_DupDisk(t *testing.T) { /* Arrange */ gceProjectId := "test-project" gceRegion := "us-west1" + zonesWithNodes := []string{"us-west1-b", "asia-southeast1-a"} fakeManager := newFakeManager(gceProjectId, gceRegion) diskName := "disk" diskType := DiskTypeStandard @@ -562,9 +618,11 @@ func TestGetAutoLabelsForPD_DupDisk(t *testing.T) { t.Error(featureGateErr) } gce := GCECloud{ - manager: fakeManager, - managedZones: []string{"us-west1-b", "asia-southeast1-a"}, - AlphaFeatureGate: alphaFeatureGate, + manager: fakeManager, + managedZones: zonesWithNodes, + AlphaFeatureGate: alphaFeatureGate, + nodeZones: createNodeZones(zonesWithNodes), + nodeInformerSynced: func() bool { return true }, } for _, zone := range gce.managedZones { gce.CreateDisk(diskName, diskType, zone, sizeGb, nil) @@ -590,6 +648,7 @@ func TestGetAutoLabelsForPD_DupDiskNoZone(t *testing.T) { /* Arrange */ gceProjectId := "test-project" gceRegion := "fake-region" + 
zonesWithNodes := []string{"us-west1-b", "asia-southeast1-a"} fakeManager := newFakeManager(gceProjectId, gceRegion) diskName := "disk" diskType := DiskTypeStandard @@ -600,9 +659,11 @@ func TestGetAutoLabelsForPD_DupDiskNoZone(t *testing.T) { t.Error(featureGateErr) } gce := GCECloud{ - manager: fakeManager, - managedZones: []string{"us-west1-b", "asia-southeast1-a"}, - AlphaFeatureGate: alphaFeatureGate, + manager: fakeManager, + managedZones: zonesWithNodes, + AlphaFeatureGate: alphaFeatureGate, + nodeZones: createNodeZones(zonesWithNodes), + nodeInformerSynced: func() bool { return true }, } for _, zone := range gce.managedZones { gce.CreateDisk(diskName, diskType, zone, sizeGb, nil) @@ -925,3 +986,11 @@ func (manager *FakeServiceManager) WaitForRegionalOp( } return manager.waitForOpError } + +func createNodeZones(zones []string) map[string]sets.String { + nodeZones := map[string]sets.String{} + for _, zone := range zones { + nodeZones[zone] = sets.NewString("dummynode") + } + return nodeZones +} diff --git a/pkg/cloudprovider/providers/gce/gce_instances.go b/pkg/cloudprovider/providers/gce/gce_instances.go index 1d252d8d24..2a17d44299 100644 --- a/pkg/cloudprovider/providers/gce/gce_instances.go +++ b/pkg/cloudprovider/providers/gce/gce_instances.go @@ -273,35 +273,39 @@ func (gce *GCECloud) AddSSHKeyToAllInstances(user string, keyData []byte) error }) } -// GetAllZones returns all the zones in which nodes are running -func (gce *GCECloud) GetAllZones() (sets.String, error) { - // Fast-path for non-multizone - if len(gce.managedZones) == 1 { - return sets.NewString(gce.managedZones...), nil +// GetAllCurrentZones returns all the zones in which k8s nodes are currently running +func (gce *GCECloud) GetAllCurrentZones() (sets.String, error) { + if gce.nodeInformerSynced == nil { + glog.Warningf("GCECloud object does not have informers set, should only happen in E2E binary.") + return gce.GetAllZonesFromCloudProvider() } + gce.nodeZonesLock.Lock() + defer gce.nodeZonesLock.Unlock() + if !gce.nodeInformerSynced() { + return nil, fmt.Errorf("Node informer is not synced when trying to GetAllCurrentZones") + } + zones := sets.NewString() + for zone, nodes := range gce.nodeZones { + if len(nodes) > 0 { + zones.Insert(zone) + } + } + return zones, nil +} - // TODO: Caching, but this is currently only called when we are creating a volume, - // which is a relatively infrequent operation, and this is only # zones API calls +// GetAllZonesFromCloudProvider returns all the zones in which nodes are running +// Only use this in E2E tests to get zones, on real clusters this will +// get all zones with compute instances in them even if not k8s instances!!! +// ex. I have k8s nodes in us-central1-c and us-central1-b. I also have +// a non-k8s compute in us-central1-a. This func will return a,b, and c. +func (gce *GCECloud) GetAllZonesFromCloudProvider() (sets.String, error) { zones := sets.NewString() - // TODO: Parallelize, although O(zones) so not too bad (N <= 3 typically) for _, zone := range gce.managedZones { mc := newInstancesMetricContext("list", zone) // We only retrieve one page in each zone - we only care about existence listCall := gce.service.Instances.List(gce.projectID, zone) - // No filter: We assume that a zone is either used or unused - // We could only consider running nodes (like we do in List above), - // but probably if instances are starting we still want to consider them. 
- // I think we should wait until we have a reason to make the - // call one way or the other; we generally can't guarantee correct - // volume spreading if the set of zones is changing - // (and volume spreading is currently only a heuristic). - // Long term we want to replace GetAllZones (which primarily supports volume - // spreading) with a scheduler policy that is able to see the global state of - // volumes and the health of zones. - - // Just a minimal set of fields - we only care about existence listCall = listCall.Fields("items(name)") res, err := listCall.Do() if err != nil { @@ -317,6 +321,16 @@ func (gce *GCECloud) GetAllZones() (sets.String, error) { return zones, nil } +// InsertInstance creates a new instance on GCP +func (gce *GCECloud) InsertInstance(project string, zone string, rb *compute.Instance) error { + mc := newInstancesMetricContext("create", zone) + op, err := gce.service.Instances.Insert(project, zone, rb).Do() + if err != nil { + return mc.Observe(err) + } + return gce.waitForZoneOp(op, zone, mc) +} + // ListInstanceNames returns a string of instance names seperated by spaces. func (gce *GCECloud) ListInstanceNames(project, zone string) (string, error) { res, err := gce.service.Instances.List(project, zone).Fields("items(name)").Do() diff --git a/pkg/volume/gce_pd/gce_util.go b/pkg/volume/gce_pd/gce_util.go index 026324e630..3d17a45501 100644 --- a/pkg/volume/gce_pd/gce_util.go +++ b/pkg/volume/gce_pd/gce_util.go @@ -153,7 +153,7 @@ func (gceutil *GCEDiskUtil) CreateVolume(c *gcePersistentDiskProvisioner) (strin // 000 - neither "zone", "zones", or "replica-zones" specified // Pick a zone randomly selected from all active zones where // Kubernetes cluster has a node. - zones, err = cloud.GetAllZones() + zones, err = cloud.GetAllCurrentZones() if err != nil { glog.V(2).Infof("error getting zone information from GCE: %v", err) return "", 0, nil, "", err diff --git a/test/e2e/e2e.go b/test/e2e/e2e.go index cb24e143e6..d456bee415 100644 --- a/test/e2e/e2e.go +++ b/test/e2e/e2e.go @@ -97,8 +97,9 @@ func setupProviderConfig() error { cloudConfig.Provider = gceCloud + // Arbitrarily pick one of the zones we have nodes in if cloudConfig.Zone == "" && framework.TestContext.CloudConfig.MultiZone { - zones, err := gceCloud.GetAllZones() + zones, err := gceCloud.GetAllZonesFromCloudProvider() if err != nil { return err } diff --git a/test/e2e/multicluster/BUILD b/test/e2e/multicluster/BUILD index a5ce4fcb86..51cebcabd9 100644 --- a/test/e2e/multicluster/BUILD +++ b/test/e2e/multicluster/BUILD @@ -14,7 +14,9 @@ go_library( "//test/utils:go_default_library", "//vendor/github.com/onsi/ginkgo:go_default_library", "//vendor/github.com/onsi/gomega:go_default_library", + "//vendor/google.golang.org/api/compute/v1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library", diff --git a/test/e2e/multicluster/ubernetes_lite.go b/test/e2e/multicluster/ubernetes_lite.go index a2899c4cbd..7ceace73d8 100644 --- a/test/e2e/multicluster/ubernetes_lite.go +++ b/test/e2e/multicluster/ubernetes_lite.go @@ -19,10 +19,13 @@ package multicluster import ( "fmt" "math" + "strconv" . "github.com/onsi/ginkgo" . 
"github.com/onsi/gomega" + compute "google.golang.org/api/compute/v1" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/intstr" @@ -61,8 +64,128 @@ var _ = SIGDescribe("Multi-AZ Clusters", func() { It("should schedule pods in the same zones as statically provisioned PVs [sig-storage]", func() { PodsUseStaticPVsOrFail(f, (2*zoneCount)+1, image) }) + + It("should only be allowed to provision PDs in zones where nodes exist", func() { + OnlyAllowNodeZones(f, zoneCount, image) + }) }) +// OnlyAllowNodeZones tests that GetAllCurrentZones returns only zones with Nodes +func OnlyAllowNodeZones(f *framework.Framework, zoneCount int, image string) { + gceCloud, err := framework.GetGCECloud() + Expect(err).NotTo(HaveOccurred()) + + // Get all the zones that the nodes are in + expectedZones, err := gceCloud.GetAllZonesFromCloudProvider() + Expect(err).NotTo(HaveOccurred()) + framework.Logf("Expected zones: %v\n", expectedZones) + + // Get all the zones in this current region + region := gceCloud.Region() + allZonesInRegion, err := gceCloud.ListZonesInRegion(region) + Expect(err).NotTo(HaveOccurred()) + + var extraZone string + for _, zone := range allZonesInRegion { + if !expectedZones.Has(zone.Name) { + extraZone = zone.Name + break + } + } + Expect(extraZone).NotTo(Equal(""), fmt.Sprintf("No extra zones available in region %s", region)) + + By(fmt.Sprintf("starting a compute instance in unused zone: %v\n", extraZone)) + project := framework.TestContext.CloudConfig.ProjectID + zone := extraZone + myuuid := string(uuid.NewUUID()) + name := "compute-" + myuuid + imageURL := "https://www.googleapis.com/compute/v1/projects/debian-cloud/global/images/debian-7-wheezy-v20140606" + + rb := &compute.Instance{ + MachineType: "zones/" + zone + "/machineTypes/f1-micro", + Disks: []*compute.AttachedDisk{ + { + AutoDelete: true, + Boot: true, + Type: "PERSISTENT", + InitializeParams: &compute.AttachedDiskInitializeParams{ + DiskName: "my-root-pd-" + myuuid, + SourceImage: imageURL, + }, + }, + }, + NetworkInterfaces: []*compute.NetworkInterface{ + { + AccessConfigs: []*compute.AccessConfig{ + { + Type: "ONE_TO_ONE_NAT", + Name: "External NAT", + }, + }, + Network: "/global/networks/default", + }, + }, + Name: name, + } + + err = gceCloud.InsertInstance(project, zone, rb) + Expect(err).NotTo(HaveOccurred()) + + defer func() { + // Teardown of the compute instance + framework.Logf("Deleting compute resource: %v", name) + resp, err := gceCloud.DeleteInstance(project, zone, name) + Expect(err).NotTo(HaveOccurred()) + framework.Logf("Compute deletion response: %v\n", resp) + }() + + By("Creating zoneCount+1 PVCs and making sure PDs are only provisioned in zones with nodes") + // Create some (zoneCount+1) PVCs with names of form "pvc-x" where x is 1...zoneCount+1 + // This will exploit ChooseZoneForVolume in pkg/volume/util.go to provision them in all the zones it "sees" + var pvcList []*v1.PersistentVolumeClaim + c := f.ClientSet + ns := f.Namespace.Name + + for index := 1; index <= zoneCount+1; index++ { + pvc := newNamedDefaultClaim(ns, index) + pvc, err = framework.CreatePVC(c, ns, pvc) + Expect(err).NotTo(HaveOccurred()) + pvcList = append(pvcList, pvc) + + // Defer the cleanup + defer func() { + framework.Logf("deleting claim %q/%q", pvc.Namespace, pvc.Name) + err = c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Delete(pvc.Name, nil) + if err != nil { + framework.Failf("Error deleting claim 
%q. Error: %v", pvc.Name, err) + } + }() + } + + // Wait for all claims bound + for _, claim := range pvcList { + err = framework.WaitForPersistentVolumeClaimPhase(v1.ClaimBound, c, claim.Namespace, claim.Name, framework.Poll, framework.ClaimProvisionTimeout) + Expect(err).NotTo(HaveOccurred()) + } + + pvZones := sets.NewString() + By("Checking that PDs have been provisioned in only the expected zones") + for _, claim := range pvcList { + // Get a new copy of the claim to have all fields populated + claim, err = c.CoreV1().PersistentVolumeClaims(claim.Namespace).Get(claim.Name, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + + // Get the related PV + pv, err := c.CoreV1().PersistentVolumes().Get(claim.Spec.VolumeName, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + + pvZone, ok := pv.ObjectMeta.Labels[kubeletapis.LabelZoneFailureDomain] + Expect(ok).To(BeTrue(), "PV has no LabelZone to be found") + pvZones.Insert(pvZone) + } + Expect(pvZones.Equal(expectedZones)).To(BeTrue(), fmt.Sprintf("PDs provisioned in unwanted zones. We want zones: %v, got: %v", expectedZones, pvZones)) +} + // Check that the pods comprising a service get spread evenly across available zones func SpreadServiceOrFail(f *framework.Framework, replicaCount int, image string) { // First create the service @@ -320,3 +443,24 @@ func PodsUseStaticPVsOrFail(f *framework.Framework, podCount int, image string) Expect(err).NotTo(HaveOccurred()) } } + +func newNamedDefaultClaim(ns string, index int) *v1.PersistentVolumeClaim { + claim := v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pvc-" + strconv.Itoa(index), + Namespace: ns, + }, + Spec: v1.PersistentVolumeClaimSpec{ + AccessModes: []v1.PersistentVolumeAccessMode{ + v1.ReadWriteOnce, + }, + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceStorage): resource.MustParse("1Gi"), + }, + }, + }, + } + + return &claim +} diff --git a/test/e2e/storage/volume_provisioning.go b/test/e2e/storage/volume_provisioning.go index 306df4933d..38d1b25793 100644 --- a/test/e2e/storage/volume_provisioning.go +++ b/test/e2e/storage/volume_provisioning.go @@ -495,8 +495,8 @@ var _ = SIGDescribe("Dynamic Provisioning", func() { gceCloud, err := framework.GetGCECloud() Expect(err).NotTo(HaveOccurred()) - // Get all k8s managed zones - managedZones, err = gceCloud.GetAllZones() + // Get all k8s managed zones (same as zones with nodes in them for test) + managedZones, err = gceCloud.GetAllZonesFromCloudProvider() Expect(err).NotTo(HaveOccurred()) // Get a list of all zones in the project