Changed GetAllZones to only get zones with nodes that are currently running (renamed to GetAllCurrentZones). Added E2E test to confirm this behavior.

Added node informer to cloud-provider controller to keep track of zones
with k8s nodes in them.
David Zhu 2017-09-11 20:20:29 -07:00
parent 849d7f8595
commit e5aec8645d
13 changed files with 421 additions and 112 deletions
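In outline, the controller manager now hands its shared informer factory to any cloud provider that implements the new cloudprovider.InformerUser interface, and the GCE provider uses a node informer to track which zones currently contain nodes. The following is a minimal sketch of that wiring; only the InformerUser type assertion mirrors the actual change, while the clientset, factory, and stop-channel handling are illustrative assumptions.

// Hedged sketch: only the InformerUser type assertion mirrors this commit;
// the clientset, factory, and stop-channel handling are illustrative.
package example

import (
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/kubernetes/pkg/cloudprovider"
)

func wireCloudInformers(cfg *rest.Config, cloud cloudprovider.Interface, stop <-chan struct{}) {
	client := kubernetes.NewForConfigOrDie(cfg)
	factory := informers.NewSharedInformerFactory(client, 10*time.Minute)

	// Clouds that opt in via the new InformerUser interface receive the shared
	// factory before the informers are started.
	if informerUserCloud, ok := cloud.(cloudprovider.InformerUser); ok {
		informerUserCloud.SetInformers(factory)
	}

	factory.Start(stop)
	factory.WaitForCacheSync(stop)
}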


@ -442,6 +442,10 @@ func CreateControllerContext(s *options.CMServer, rootClientBuilder, clientBuild
}
}
if informerUserCloud, ok := cloud.(cloudprovider.InformerUser); ok {
informerUserCloud.SetInformers(sharedInformers)
}
ctx := ControllerContext{
ClientBuilder: clientBuilder,
InformerFactory: sharedInformers,


@ -18,6 +18,7 @@ go_library(
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
],
)


@ -23,6 +23,7 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers"
"k8s.io/kubernetes/pkg/controller"
)
@ -49,6 +50,11 @@ type Interface interface {
HasClusterID() bool
}
type InformerUser interface {
// SetInformers sets the informer on the cloud object.
SetInformers(informerFactory informers.SharedInformerFactory)
}
// Clusters is an abstract, pluggable interface for clusters of containers.
type Clusters interface {
// ListClusters lists the names of the available clusters.
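As a hedged illustration of the new interface (the FakeZoneCloud type below is hypothetical and not part of this change), a provider opts into node events by implementing SetInformers:

package example

import (
	"k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/tools/cache"
)

// FakeZoneCloud is a hypothetical provider that only needs node events.
type FakeZoneCloud struct {
	nodeInformerSynced cache.InformerSynced
}

// SetInformers satisfies the InformerUser interface above; the controller
// manager calls it once, before the shared informers are started.
func (c *FakeZoneCloud) SetInformers(informerFactory informers.SharedInformerFactory) {
	nodeInformer := informerFactory.Core().V1().Nodes().Informer()
	nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			node := obj.(*v1.Node)
			_ = node // a real provider would record the node's zone label here
		},
	})
	c.nodeInformerSynced = nodeInformer.HasSynced
}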


@ -75,6 +75,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",


@ -34,13 +34,16 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
"k8s.io/kubernetes/pkg/version"
"github.com/golang/glog"
@ -99,18 +102,21 @@ type GCECloud struct {
// for the cloudprovider to start watching the configmap.
ClusterID ClusterID
service *compute.Service
serviceBeta *computebeta.Service
serviceAlpha *computealpha.Service
containerService *container.Service
client clientset.Interface
clientBuilder controller.ControllerClientBuilder
eventBroadcaster record.EventBroadcaster
eventRecorder record.EventRecorder
projectID string
region string
localZone string // The zone in which we are running
managedZones []string // List of zones we are spanning (for multi-AZ clusters, primarily when running on master)
service *compute.Service
serviceBeta *computebeta.Service
serviceAlpha *computealpha.Service
containerService *container.Service
client clientset.Interface
clientBuilder controller.ControllerClientBuilder
eventBroadcaster record.EventBroadcaster
eventRecorder record.EventRecorder
projectID string
region string
localZone string // The zone in which we are running
// managedZones is set to the single local zone when running a single-zone cluster
// and to ALL zones in the region for any multi-zone cluster.
// Use GetAllCurrentZones to get only the zones that currently contain nodes.
managedZones []string
networkURL string
isLegacyNetwork bool
subnetworkURL string
@ -125,6 +131,12 @@ type GCECloud struct {
useMetadataServer bool
operationPollRateLimiter flowcontrol.RateLimiter
manager diskServiceManager
// Lock for access to nodeZones
nodeZonesLock sync.Mutex
// nodeZones is a mapping from Zone to a sets.String of the names of the Nodes in that Zone;
// it is kept up to date by the nodeInformer
nodeZones map[string]sets.String
nodeInformerSynced cache.InformerSynced
// sharedResourceLock is used to serialize GCE operations that may mutate shared state to
// prevent inconsistencies. For example, load balancers manipulation methods will take the
// lock to prevent shared resources from being prematurely deleted while the operation is
@ -470,6 +482,7 @@ func CreateGCECloud(config *CloudConfig) (*GCECloud, error) {
useMetadataServer: config.UseMetadataServer,
operationPollRateLimiter: operationPollRateLimiter,
AlphaFeatureGate: config.AlphaFeatureGate,
nodeZones: map[string]sets.String{},
}
gce.manager = &gceServiceManager{gce}
@ -582,6 +595,68 @@ func (gce *GCECloud) IsLegacyNetwork() bool {
return gce.isLegacyNetwork
}
func (gce *GCECloud) SetInformers(informerFactory informers.SharedInformerFactory) {
glog.Infof("Setting up informers for GCECloud")
nodeInformer := informerFactory.Core().V1().Nodes().Informer()
nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
node := obj.(*v1.Node)
gce.updateNodeZones(nil, node)
},
UpdateFunc: func(prev, obj interface{}) {
prevNode := prev.(*v1.Node)
newNode := obj.(*v1.Node)
if newNode.Labels[kubeletapis.LabelZoneFailureDomain] ==
prevNode.Labels[kubeletapis.LabelZoneFailureDomain] {
return
}
gce.updateNodeZones(prevNode, newNode)
},
DeleteFunc: func(obj interface{}) {
node, isNode := obj.(*v1.Node)
// We can get DeletedFinalStateUnknown instead of *v1.Node here
// and we need to handle that correctly.
if !isNode {
deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("Received unexpected object: %v", obj)
return
}
node, ok = deletedState.Obj.(*v1.Node)
if !ok {
glog.Errorf("DeletedFinalStateUnknown contained non-Node object: %v", deletedState.Obj)
return
}
}
gce.updateNodeZones(node, nil)
},
})
gce.nodeInformerSynced = nodeInformer.HasSynced
}
func (gce *GCECloud) updateNodeZones(prevNode, newNode *v1.Node) {
gce.nodeZonesLock.Lock()
defer gce.nodeZonesLock.Unlock()
if prevNode != nil {
prevZone, ok := prevNode.ObjectMeta.Labels[kubeletapis.LabelZoneFailureDomain]
if ok {
gce.nodeZones[prevZone].Delete(prevNode.ObjectMeta.Name)
if gce.nodeZones[prevZone].Len() == 0 {
gce.nodeZones[prevZone] = nil
}
}
}
if newNode != nil {
newZone, ok := newNode.ObjectMeta.Labels[kubeletapis.LabelZoneFailureDomain]
if ok {
if gce.nodeZones[newZone] == nil {
gce.nodeZones[newZone] = sets.NewString()
}
gce.nodeZones[newZone].Insert(newNode.ObjectMeta.Name)
}
}
}
// Known-useless DNS search path.
var uselessDNSSearchRE = regexp.MustCompile(`^[0-9]+.google.internal.$`)
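The bookkeeping in SetInformers/updateNodeZones reduces to a zone-to-node-name-set map. Below is a minimal, self-contained sketch of the same idea, simplified by dropping the mutex and the informer-synced check that the real implementation keeps:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	// zone -> names of the nodes currently in that zone
	nodeZones := map[string]sets.String{}

	add := func(zone, node string) {
		if nodeZones[zone] == nil {
			nodeZones[zone] = sets.NewString()
		}
		nodeZones[zone].Insert(node)
	}
	remove := func(zone, node string) {
		if nodeZones[zone] != nil {
			nodeZones[zone].Delete(node)
			if nodeZones[zone].Len() == 0 {
				nodeZones[zone] = nil
			}
		}
	}

	add("us-central1-b", "node-1")
	add("us-central1-c", "node-2")
	remove("us-central1-c", "node-2") // the last node in the zone goes away

	// "Current zones" are exactly the keys whose set is non-empty.
	current := sets.NewString()
	for zone, nodes := range nodeZones {
		if len(nodes) > 0 {
			current.Insert(zone)
		}
	}
	fmt.Println(current.List()) // [us-central1-b]
}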


@ -690,11 +690,14 @@ func (gce *GCECloud) DisksAreAttached(diskNames []string, nodeName types.NodeNam
// JSON in Description field.
func (gce *GCECloud) CreateDisk(
name string, diskType string, zone string, sizeGb int64, tags map[string]string) error {
// Do not allow creation of PDs in zones that are not managed. Such PDs
// then cannot be deleted by DeleteDisk.
if isManaged := gce.verifyZoneIsManaged(zone); !isManaged {
return fmt.Errorf("kubernetes does not manage zone %q", zone)
// Do not allow creation of PDs in zones that do not have nodes. Such PDs
// are not currently usable.
curZones, err := gce.GetAllCurrentZones()
if err != nil {
return err
}
if !curZones.Has(zone) {
return fmt.Errorf("kubernetes does not have a node in zone %q", zone)
}
tagsStr, err := gce.encodeDiskTags(tags)
@ -733,17 +736,16 @@ func (gce *GCECloud) CreateDisk(
func (gce *GCECloud) CreateRegionalDisk(
name string, diskType string, replicaZones sets.String, sizeGb int64, tags map[string]string) error {
// Do not allow creation of PDs in zones that are not managed. Such PDs
// then cannot be deleted by DeleteDisk.
unmanagedZones := []string{}
for _, zone := range replicaZones.UnsortedList() {
if isManaged := gce.verifyZoneIsManaged(zone); !isManaged {
unmanagedZones = append(unmanagedZones, zone)
}
// Do not allow creation of PDs in zones that do not have nodes. Such PDs
// are not currently usable. This functionality should be reverted to checking
// against managed zones if we want users to be able to create RegionalDisks
// in zones where there are no nodes.
curZones, err := gce.GetAllCurrentZones()
if err != nil {
return err
}
if len(unmanagedZones) > 0 {
return fmt.Errorf("kubernetes does not manage specified zones: %q. Managed Zones: %q", unmanagedZones, gce.managedZones)
if !curZones.IsSuperset(replicaZones) {
return fmt.Errorf("kubernetes does not have nodes in specified zones: %q. Zones that contain nodes: %q", replicaZones.Difference(curZones), curZones)
}
tagsStr, err := gce.encodeDiskTags(tags)
@ -776,16 +778,6 @@ func (gce *GCECloud) CreateRegionalDisk(
return err
}
func (gce *GCECloud) verifyZoneIsManaged(zone string) bool {
for _, managedZone := range gce.managedZones {
if zone == managedZone {
return true
}
}
return false
}
func getDiskType(diskType string) (string, error) {
switch diskType {
case DiskTypeSSD, DiskTypeStandard:
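The zone check that CreateDisk and CreateRegionalDisk now share boils down to a superset test against the set of zones that currently contain nodes. A hedged, standalone sketch of that pattern (the helper name and error wording are illustrative, not part of this change):

package example

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

// validateZonesHaveNodes mirrors the new checks: every requested zone (one for
// zonal PDs, several for regional PDs) must currently contain at least one node.
func validateZonesHaveNodes(requested, current sets.String) error {
	if current.IsSuperset(requested) {
		return nil
	}
	return fmt.Errorf("kubernetes does not have nodes in specified zones: %q. Zones that contain nodes: %q",
		requested.Difference(current).List(), current.List())
}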


@ -37,16 +37,19 @@ func TestCreateDisk_Basic(t *testing.T) {
/* Arrange */
gceProjectId := "test-project"
gceRegion := "fake-region"
zonesWithNodes := []string{"zone1"}
fakeManager := newFakeManager(gceProjectId, gceRegion)
alphaFeatureGate, featureGateErr := NewAlphaFeatureGate([]string{})
if featureGateErr != nil {
t.Error(featureGateErr)
}
gce := GCECloud{
manager: fakeManager,
managedZones: []string{"zone1"},
projectID: gceProjectId,
AlphaFeatureGate: alphaFeatureGate,
manager: fakeManager,
managedZones: []string{"zone1"},
projectID: gceProjectId,
AlphaFeatureGate: alphaFeatureGate,
nodeZones: createNodeZones(zonesWithNodes),
nodeInformerSynced: func() bool { return true },
}
diskName := "disk"
@ -95,16 +98,20 @@ func TestCreateRegionalDisk_Basic(t *testing.T) {
/* Arrange */
gceProjectId := "test-project"
gceRegion := "fake-region"
zonesWithNodes := []string{"zone1", "zone3", "zone2"}
fakeManager := newFakeManager(gceProjectId, gceRegion)
alphaFeatureGate, featureGateErr := NewAlphaFeatureGate([]string{AlphaFeatureGCEDisk})
if featureGateErr != nil {
t.Error(featureGateErr)
}
gce := GCECloud{
manager: fakeManager,
managedZones: []string{"zone1", "zone3", "zone2"},
projectID: gceProjectId,
AlphaFeatureGate: alphaFeatureGate,
manager: fakeManager,
managedZones: zonesWithNodes,
projectID: gceProjectId,
AlphaFeatureGate: alphaFeatureGate,
nodeZones: createNodeZones(zonesWithNodes),
nodeInformerSynced: func() bool { return true },
}
diskName := "disk"
@ -153,15 +160,18 @@ func TestCreateDisk_DiskAlreadyExists(t *testing.T) {
/* Arrange */
gceProjectId := "test-project"
gceRegion := "fake-region"
zonesWithNodes := []string{"zone1"}
fakeManager := newFakeManager(gceProjectId, gceRegion)
alphaFeatureGate, featureGateErr := NewAlphaFeatureGate([]string{})
if featureGateErr != nil {
t.Error(featureGateErr)
}
gce := GCECloud{
manager: fakeManager,
managedZones: []string{"zone1"},
AlphaFeatureGate: alphaFeatureGate,
manager: fakeManager,
managedZones: zonesWithNodes,
AlphaFeatureGate: alphaFeatureGate,
nodeZones: createNodeZones(zonesWithNodes),
nodeInformerSynced: func() bool { return true },
}
// Inject disk AlreadyExists error.
@ -184,8 +194,13 @@ func TestCreateDisk_WrongZone(t *testing.T) {
/* Arrange */
gceProjectId := "test-project"
gceRegion := "fake-region"
zonesWithNodes := []string{"zone1"}
fakeManager := newFakeManager(gceProjectId, gceRegion)
gce := GCECloud{manager: fakeManager, managedZones: []string{"zone1"}}
gce := GCECloud{
manager: fakeManager,
managedZones: zonesWithNodes,
nodeZones: createNodeZones(zonesWithNodes),
nodeInformerSynced: func() bool { return true }}
diskName := "disk"
diskType := DiskTypeSSD
@ -204,8 +219,13 @@ func TestCreateDisk_NoManagedZone(t *testing.T) {
/* Arrange */
gceProjectId := "test-project"
gceRegion := "fake-region"
zonesWithNodes := []string{}
fakeManager := newFakeManager(gceProjectId, gceRegion)
gce := GCECloud{manager: fakeManager, managedZones: []string{}}
gce := GCECloud{
manager: fakeManager,
managedZones: zonesWithNodes,
nodeZones: createNodeZones(zonesWithNodes),
nodeInformerSynced: func() bool { return true }}
diskName := "disk"
diskType := DiskTypeSSD
@ -224,8 +244,12 @@ func TestCreateDisk_BadDiskType(t *testing.T) {
/* Arrange */
gceProjectId := "test-project"
gceRegion := "fake-region"
zonesWithNodes := []string{"zone1"}
fakeManager := newFakeManager(gceProjectId, gceRegion)
gce := GCECloud{manager: fakeManager, managedZones: []string{"zone1"}}
gce := GCECloud{manager: fakeManager,
managedZones: zonesWithNodes,
nodeZones: createNodeZones(zonesWithNodes),
nodeInformerSynced: func() bool { return true }}
diskName := "disk"
diskType := "arbitrary-disk"
@ -245,15 +269,18 @@ func TestCreateDisk_MultiZone(t *testing.T) {
/* Arrange */
gceProjectId := "test-project"
gceRegion := "fake-region"
zonesWithNodes := []string{"zone1", "zone2", "zone3"}
fakeManager := newFakeManager(gceProjectId, gceRegion)
alphaFeatureGate, featureGateErr := NewAlphaFeatureGate([]string{})
if featureGateErr != nil {
t.Error(featureGateErr)
}
gce := GCECloud{
manager: fakeManager,
managedZones: []string{"zone1", "zone2", "zone3"},
AlphaFeatureGate: alphaFeatureGate,
manager: fakeManager,
managedZones: zonesWithNodes,
AlphaFeatureGate: alphaFeatureGate,
nodeZones: createNodeZones(zonesWithNodes),
nodeInformerSynced: func() bool { return true },
}
diskName := "disk"
@ -274,15 +301,18 @@ func TestDeleteDisk_Basic(t *testing.T) {
/* Arrange */
gceProjectId := "test-project"
gceRegion := "fake-region"
zonesWithNodes := []string{"zone1"}
fakeManager := newFakeManager(gceProjectId, gceRegion)
alphaFeatureGate, featureGateErr := NewAlphaFeatureGate([]string{})
if featureGateErr != nil {
t.Error(featureGateErr)
}
gce := GCECloud{
manager: fakeManager,
managedZones: []string{"zone1"},
AlphaFeatureGate: alphaFeatureGate,
manager: fakeManager,
managedZones: zonesWithNodes,
AlphaFeatureGate: alphaFeatureGate,
nodeZones: createNodeZones(zonesWithNodes),
nodeInformerSynced: func() bool { return true },
}
diskName := "disk"
diskType := DiskTypeSSD
@ -311,15 +341,18 @@ func TestDeleteDisk_NotFound(t *testing.T) {
/* Arrange */
gceProjectId := "test-project"
gceRegion := "fake-region"
zonesWithNodes := []string{"zone1"}
fakeManager := newFakeManager(gceProjectId, gceRegion)
alphaFeatureGate, featureGateErr := NewAlphaFeatureGate([]string{})
if featureGateErr != nil {
t.Error(featureGateErr)
}
gce := GCECloud{
manager: fakeManager,
managedZones: []string{"zone1"},
AlphaFeatureGate: alphaFeatureGate,
manager: fakeManager,
managedZones: zonesWithNodes,
AlphaFeatureGate: alphaFeatureGate,
nodeZones: createNodeZones(zonesWithNodes),
nodeInformerSynced: func() bool { return true },
}
diskName := "disk"
@ -336,15 +369,18 @@ func TestDeleteDisk_ResourceBeingUsed(t *testing.T) {
/* Arrange */
gceProjectId := "test-project"
gceRegion := "fake-region"
zonesWithNodes := []string{"zone1"}
fakeManager := newFakeManager(gceProjectId, gceRegion)
alphaFeatureGate, featureGateErr := NewAlphaFeatureGate([]string{})
if featureGateErr != nil {
t.Error(featureGateErr)
}
gce := GCECloud{
manager: fakeManager,
managedZones: []string{"zone1"},
AlphaFeatureGate: alphaFeatureGate,
manager: fakeManager,
managedZones: zonesWithNodes,
AlphaFeatureGate: alphaFeatureGate,
nodeZones: createNodeZones(zonesWithNodes),
nodeInformerSynced: func() bool { return true },
}
diskName := "disk"
diskType := DiskTypeSSD
@ -367,15 +403,18 @@ func TestDeleteDisk_SameDiskMultiZone(t *testing.T) {
/* Assert */
gceProjectId := "test-project"
gceRegion := "fake-region"
zonesWithNodes := []string{"zone1", "zone2", "zone3"}
fakeManager := newFakeManager(gceProjectId, gceRegion)
alphaFeatureGate, featureGateErr := NewAlphaFeatureGate([]string{})
if featureGateErr != nil {
t.Error(featureGateErr)
}
gce := GCECloud{
manager: fakeManager,
managedZones: []string{"zone1", "zone2", "zone3"},
AlphaFeatureGate: alphaFeatureGate,
manager: fakeManager,
managedZones: zonesWithNodes,
AlphaFeatureGate: alphaFeatureGate,
nodeZones: createNodeZones(zonesWithNodes),
nodeInformerSynced: func() bool { return true },
}
diskName := "disk"
diskType := DiskTypeSSD
@ -401,15 +440,18 @@ func TestDeleteDisk_DiffDiskMultiZone(t *testing.T) {
/* Arrange */
gceProjectId := "test-project"
gceRegion := "fake-region"
zonesWithNodes := []string{"zone1"}
fakeManager := newFakeManager(gceProjectId, gceRegion)
alphaFeatureGate, featureGateErr := NewAlphaFeatureGate([]string{})
if featureGateErr != nil {
t.Error(featureGateErr)
}
gce := GCECloud{
manager: fakeManager,
managedZones: []string{"zone1"},
AlphaFeatureGate: alphaFeatureGate,
manager: fakeManager,
managedZones: zonesWithNodes,
AlphaFeatureGate: alphaFeatureGate,
nodeZones: createNodeZones(zonesWithNodes),
nodeInformerSynced: func() bool { return true },
}
diskName := "disk"
diskType := DiskTypeSSD
@ -435,19 +477,22 @@ func TestGetAutoLabelsForPD_Basic(t *testing.T) {
/* Arrange */
gceProjectId := "test-project"
gceRegion := "us-central1"
zone := "us-central1-c"
zonesWithNodes := []string{zone}
fakeManager := newFakeManager(gceProjectId, gceRegion)
diskName := "disk"
diskType := DiskTypeSSD
zone := "us-central1-c"
const sizeGb int64 = 128
alphaFeatureGate, featureGateErr := NewAlphaFeatureGate([]string{})
if featureGateErr != nil {
t.Error(featureGateErr)
}
gce := GCECloud{
manager: fakeManager,
managedZones: []string{zone},
AlphaFeatureGate: alphaFeatureGate,
manager: fakeManager,
managedZones: zonesWithNodes,
AlphaFeatureGate: alphaFeatureGate,
nodeZones: createNodeZones(zonesWithNodes),
nodeInformerSynced: func() bool { return true },
}
gce.CreateDisk(diskName, diskType, zone, sizeGb, nil)
@ -472,19 +517,22 @@ func TestGetAutoLabelsForPD_NoZone(t *testing.T) {
/* Arrange */
gceProjectId := "test-project"
gceRegion := "europe-west1"
zone := "europe-west1-d"
zonesWithNodes := []string{zone}
fakeManager := newFakeManager(gceProjectId, gceRegion)
diskName := "disk"
diskType := DiskTypeStandard
zone := "europe-west1-d"
const sizeGb int64 = 128
alphaFeatureGate, featureGateErr := NewAlphaFeatureGate([]string{})
if featureGateErr != nil {
t.Error(featureGateErr)
}
gce := GCECloud{
manager: fakeManager,
managedZones: []string{zone},
AlphaFeatureGate: alphaFeatureGate,
manager: fakeManager,
managedZones: zonesWithNodes,
AlphaFeatureGate: alphaFeatureGate,
nodeZones: createNodeZones(zonesWithNodes),
nodeInformerSynced: func() bool { return true },
}
gce.CreateDisk(diskName, diskType, zone, sizeGb, nil)
@ -508,10 +556,14 @@ func TestGetAutoLabelsForPD_DiskNotFound(t *testing.T) {
/* Arrange */
gceProjectId := "test-project"
gceRegion := "fake-region"
zone := "asia-northeast1-a"
zonesWithNodes := []string{zone}
fakeManager := newFakeManager(gceProjectId, gceRegion)
diskName := "disk"
zone := "asia-northeast1-a"
gce := GCECloud{manager: fakeManager, managedZones: []string{zone}}
gce := GCECloud{manager: fakeManager,
managedZones: zonesWithNodes,
nodeZones: createNodeZones(zonesWithNodes),
nodeInformerSynced: func() bool { return true }}
/* Act */
_, err := gce.GetAutoLabelsForPD(diskName, zone)
@ -526,6 +578,7 @@ func TestGetAutoLabelsForPD_DiskNotFoundAndNoZone(t *testing.T) {
/* Arrange */
gceProjectId := "test-project"
gceRegion := "fake-region"
zonesWithNodes := []string{}
fakeManager := newFakeManager(gceProjectId, gceRegion)
diskName := "disk"
alphaFeatureGate, featureGateErr := NewAlphaFeatureGate([]string{})
@ -533,9 +586,11 @@ func TestGetAutoLabelsForPD_DiskNotFoundAndNoZone(t *testing.T) {
t.Error(featureGateErr)
}
gce := GCECloud{
manager: fakeManager,
managedZones: []string{},
AlphaFeatureGate: alphaFeatureGate,
manager: fakeManager,
managedZones: zonesWithNodes,
AlphaFeatureGate: alphaFeatureGate,
nodeZones: createNodeZones(zonesWithNodes),
nodeInformerSynced: func() bool { return true },
}
/* Act */
@ -551,6 +606,7 @@ func TestGetAutoLabelsForPD_DupDisk(t *testing.T) {
/* Arrange */
gceProjectId := "test-project"
gceRegion := "us-west1"
zonesWithNodes := []string{"us-west1-b", "asia-southeast1-a"}
fakeManager := newFakeManager(gceProjectId, gceRegion)
diskName := "disk"
diskType := DiskTypeStandard
@ -562,9 +618,11 @@ func TestGetAutoLabelsForPD_DupDisk(t *testing.T) {
t.Error(featureGateErr)
}
gce := GCECloud{
manager: fakeManager,
managedZones: []string{"us-west1-b", "asia-southeast1-a"},
AlphaFeatureGate: alphaFeatureGate,
manager: fakeManager,
managedZones: zonesWithNodes,
AlphaFeatureGate: alphaFeatureGate,
nodeZones: createNodeZones(zonesWithNodes),
nodeInformerSynced: func() bool { return true },
}
for _, zone := range gce.managedZones {
gce.CreateDisk(diskName, diskType, zone, sizeGb, nil)
@ -590,6 +648,7 @@ func TestGetAutoLabelsForPD_DupDiskNoZone(t *testing.T) {
/* Arrange */
gceProjectId := "test-project"
gceRegion := "fake-region"
zonesWithNodes := []string{"us-west1-b", "asia-southeast1-a"}
fakeManager := newFakeManager(gceProjectId, gceRegion)
diskName := "disk"
diskType := DiskTypeStandard
@ -600,9 +659,11 @@ func TestGetAutoLabelsForPD_DupDiskNoZone(t *testing.T) {
t.Error(featureGateErr)
}
gce := GCECloud{
manager: fakeManager,
managedZones: []string{"us-west1-b", "asia-southeast1-a"},
AlphaFeatureGate: alphaFeatureGate,
manager: fakeManager,
managedZones: zonesWithNodes,
AlphaFeatureGate: alphaFeatureGate,
nodeZones: createNodeZones(zonesWithNodes),
nodeInformerSynced: func() bool { return true },
}
for _, zone := range gce.managedZones {
gce.CreateDisk(diskName, diskType, zone, sizeGb, nil)
@ -925,3 +986,11 @@ func (manager *FakeServiceManager) WaitForRegionalOp(
}
return manager.waitForOpError
}
func createNodeZones(zones []string) map[string]sets.String {
nodeZones := map[string]sets.String{}
for _, zone := range zones {
nodeZones[zone] = sets.NewString("dummynode")
}
return nodeZones
}


@ -273,35 +273,39 @@ func (gce *GCECloud) AddSSHKeyToAllInstances(user string, keyData []byte) error
})
}
// GetAllZones returns all the zones in which nodes are running
func (gce *GCECloud) GetAllZones() (sets.String, error) {
// Fast-path for non-multizone
if len(gce.managedZones) == 1 {
return sets.NewString(gce.managedZones...), nil
// GetAllCurrentZones returns all the zones in which k8s nodes are currently running
func (gce *GCECloud) GetAllCurrentZones() (sets.String, error) {
if gce.nodeInformerSynced == nil {
glog.Warningf("GCECloud object does not have informers set, should only happen in E2E binary.")
return gce.GetAllZonesFromCloudProvider()
}
gce.nodeZonesLock.Lock()
defer gce.nodeZonesLock.Unlock()
if !gce.nodeInformerSynced() {
return nil, fmt.Errorf("Node informer is not synced when trying to GetAllCurrentZones")
}
zones := sets.NewString()
for zone, nodes := range gce.nodeZones {
if len(nodes) > 0 {
zones.Insert(zone)
}
}
return zones, nil
}
// TODO: Caching, but this is currently only called when we are creating a volume,
// which is a relatively infrequent operation, and this is only # zones API calls
// GetAllZonesFromCloudProvider returns all the zones in which nodes are running.
// Only use this in E2E tests to get zones; on real clusters this will
// return every zone that contains compute instances, even if they are not k8s nodes.
// For example, with k8s nodes in us-central1-b and us-central1-c and a non-k8s
// compute instance in us-central1-a, this func returns all three zones.
func (gce *GCECloud) GetAllZonesFromCloudProvider() (sets.String, error) {
zones := sets.NewString()
// TODO: Parallelize, although O(zones) so not too bad (N <= 3 typically)
for _, zone := range gce.managedZones {
mc := newInstancesMetricContext("list", zone)
// We only retrieve one page in each zone - we only care about existence
listCall := gce.service.Instances.List(gce.projectID, zone)
// No filter: We assume that a zone is either used or unused
// We could only consider running nodes (like we do in List above),
// but probably if instances are starting we still want to consider them.
// I think we should wait until we have a reason to make the
// call one way or the other; we generally can't guarantee correct
// volume spreading if the set of zones is changing
// (and volume spreading is currently only a heuristic).
// Long term we want to replace GetAllZones (which primarily supports volume
// spreading) with a scheduler policy that is able to see the global state of
// volumes and the health of zones.
// Just a minimal set of fields - we only care about existence
listCall = listCall.Fields("items(name)")
res, err := listCall.Do()
if err != nil {
@ -317,6 +321,16 @@ func (gce *GCECloud) GetAllZones() (sets.String, error) {
return zones, nil
}
// InsertInstance creates a new instance on GCP
func (gce *GCECloud) InsertInstance(project string, zone string, rb *compute.Instance) error {
mc := newInstancesMetricContext("create", zone)
op, err := gce.service.Instances.Insert(project, zone, rb).Do()
if err != nil {
return mc.Observe(err)
}
return gce.waitForZoneOp(op, zone, mc)
}
// ListInstanceNames returns a string of instance names separated by spaces.
func (gce *GCECloud) ListInstanceNames(project, zone string) (string, error) {
res, err := gce.service.Instances.List(project, zone).Fields("items(name)").Do()
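A hedged sketch of exercising the new GetAllCurrentZones accessor with stubbed informer state, in the same style as the unit tests earlier in this change (the test name, zone names, and node names are arbitrary and not part of this commit):

package gce

import (
	"testing"

	"k8s.io/apimachinery/pkg/util/sets"
)

func TestGetAllCurrentZones_Sketch(t *testing.T) {
	gce := GCECloud{
		nodeZones: map[string]sets.String{
			"us-central1-b": sets.NewString("node-1"),
			"us-central1-c": sets.NewString(), // zone whose last node was removed
		},
		nodeInformerSynced: func() bool { return true },
	}
	zones, err := gce.GetAllCurrentZones()
	if err != nil {
		t.Fatal(err)
	}
	if !zones.Equal(sets.NewString("us-central1-b")) {
		t.Errorf("expected only zones that contain nodes, got %v", zones.List())
	}
}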


@ -153,7 +153,7 @@ func (gceutil *GCEDiskUtil) CreateVolume(c *gcePersistentDiskProvisioner) (strin
// 000 - neither "zone", "zones", nor "replica-zones" specified
// Pick a zone randomly selected from all active zones where
// Kubernetes cluster has a node.
zones, err = cloud.GetAllZones()
zones, err = cloud.GetAllCurrentZones()
if err != nil {
glog.V(2).Infof("error getting zone information from GCE: %v", err)
return "", 0, nil, "", err


@ -97,8 +97,9 @@ func setupProviderConfig() error {
cloudConfig.Provider = gceCloud
// Arbitrarily pick one of the zones we have nodes in
if cloudConfig.Zone == "" && framework.TestContext.CloudConfig.MultiZone {
zones, err := gceCloud.GetAllZones()
zones, err := gceCloud.GetAllZonesFromCloudProvider()
if err != nil {
return err
}


@ -14,7 +14,9 @@ go_library(
"//test/utils:go_default_library",
"//vendor/github.com/onsi/ginkgo:go_default_library",
"//vendor/github.com/onsi/gomega:go_default_library",
"//vendor/google.golang.org/api/compute/v1:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",


@ -19,10 +19,13 @@ package multicluster
import (
"fmt"
"math"
"strconv"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
compute "google.golang.org/api/compute/v1"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/intstr"
@ -61,8 +64,128 @@ var _ = SIGDescribe("Multi-AZ Clusters", func() {
It("should schedule pods in the same zones as statically provisioned PVs [sig-storage]", func() {
PodsUseStaticPVsOrFail(f, (2*zoneCount)+1, image)
})
It("should only be allowed to provision PDs in zones where nodes exist", func() {
OnlyAllowNodeZones(f, zoneCount, image)
})
})
// OnlyAllowNodeZones tests that GetAllCurrentZones returns only zones with Nodes
func OnlyAllowNodeZones(f *framework.Framework, zoneCount int, image string) {
gceCloud, err := framework.GetGCECloud()
Expect(err).NotTo(HaveOccurred())
// Get all the zones that the nodes are in
expectedZones, err := gceCloud.GetAllZonesFromCloudProvider()
Expect(err).NotTo(HaveOccurred())
framework.Logf("Expected zones: %v\n", expectedZones)
// Get all the zones in this current region
region := gceCloud.Region()
allZonesInRegion, err := gceCloud.ListZonesInRegion(region)
Expect(err).NotTo(HaveOccurred())
var extraZone string
for _, zone := range allZonesInRegion {
if !expectedZones.Has(zone.Name) {
extraZone = zone.Name
break
}
}
Expect(extraZone).NotTo(Equal(""), fmt.Sprintf("No extra zones available in region %s", region))
By(fmt.Sprintf("starting a compute instance in unused zone: %v\n", extraZone))
project := framework.TestContext.CloudConfig.ProjectID
zone := extraZone
myuuid := string(uuid.NewUUID())
name := "compute-" + myuuid
imageURL := "https://www.googleapis.com/compute/v1/projects/debian-cloud/global/images/debian-7-wheezy-v20140606"
rb := &compute.Instance{
MachineType: "zones/" + zone + "/machineTypes/f1-micro",
Disks: []*compute.AttachedDisk{
{
AutoDelete: true,
Boot: true,
Type: "PERSISTENT",
InitializeParams: &compute.AttachedDiskInitializeParams{
DiskName: "my-root-pd-" + myuuid,
SourceImage: imageURL,
},
},
},
NetworkInterfaces: []*compute.NetworkInterface{
{
AccessConfigs: []*compute.AccessConfig{
{
Type: "ONE_TO_ONE_NAT",
Name: "External NAT",
},
},
Network: "/global/networks/default",
},
},
Name: name,
}
err = gceCloud.InsertInstance(project, zone, rb)
Expect(err).NotTo(HaveOccurred())
defer func() {
// Teardown of the compute instance
framework.Logf("Deleting compute resource: %v", name)
resp, err := gceCloud.DeleteInstance(project, zone, name)
Expect(err).NotTo(HaveOccurred())
framework.Logf("Compute deletion response: %v\n", resp)
}()
By("Creating zoneCount+1 PVCs and making sure PDs are only provisioned in zones with nodes")
// Create some (zoneCount+1) PVCs with names of form "pvc-x" where x is 1...zoneCount+1
// This will exploit ChooseZoneForVolume in pkg/volume/util.go to provision them in all the zones it "sees"
var pvcList []*v1.PersistentVolumeClaim
c := f.ClientSet
ns := f.Namespace.Name
for index := 1; index <= zoneCount+1; index++ {
pvc := newNamedDefaultClaim(ns, index)
pvc, err = framework.CreatePVC(c, ns, pvc)
Expect(err).NotTo(HaveOccurred())
pvcList = append(pvcList, pvc)
// Defer the cleanup
defer func() {
framework.Logf("deleting claim %q/%q", pvc.Namespace, pvc.Name)
err = c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Delete(pvc.Name, nil)
if err != nil {
framework.Failf("Error deleting claim %q. Error: %v", pvc.Name, err)
}
}()
}
// Wait for all claims bound
for _, claim := range pvcList {
err = framework.WaitForPersistentVolumeClaimPhase(v1.ClaimBound, c, claim.Namespace, claim.Name, framework.Poll, framework.ClaimProvisionTimeout)
Expect(err).NotTo(HaveOccurred())
}
pvZones := sets.NewString()
By("Checking that PDs have been provisioned in only the expected zones")
for _, claim := range pvcList {
// Get a new copy of the claim to have all fields populated
claim, err = c.CoreV1().PersistentVolumeClaims(claim.Namespace).Get(claim.Name, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
// Get the related PV
pv, err := c.CoreV1().PersistentVolumes().Get(claim.Spec.VolumeName, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
pvZone, ok := pv.ObjectMeta.Labels[kubeletapis.LabelZoneFailureDomain]
Expect(ok).To(BeTrue(), "PV is missing the zone failure-domain label")
pvZones.Insert(pvZone)
}
Expect(pvZones.Equal(expectedZones)).To(BeTrue(), fmt.Sprintf("PDs provisioned in unwanted zones. We want zones: %v, got: %v", expectedZones, pvZones))
}
// Check that the pods comprising a service get spread evenly across available zones
func SpreadServiceOrFail(f *framework.Framework, replicaCount int, image string) {
// First create the service
@ -320,3 +443,24 @@ func PodsUseStaticPVsOrFail(f *framework.Framework, podCount int, image string)
Expect(err).NotTo(HaveOccurred())
}
}
func newNamedDefaultClaim(ns string, index int) *v1.PersistentVolumeClaim {
claim := v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: "pvc-" + strconv.Itoa(index),
Namespace: ns,
},
Spec: v1.PersistentVolumeClaimSpec{
AccessModes: []v1.PersistentVolumeAccessMode{
v1.ReadWriteOnce,
},
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceName(v1.ResourceStorage): resource.MustParse("1Gi"),
},
},
},
}
return &claim
}


@ -495,8 +495,8 @@ var _ = SIGDescribe("Dynamic Provisioning", func() {
gceCloud, err := framework.GetGCECloud()
Expect(err).NotTo(HaveOccurred())
// Get all k8s managed zones
managedZones, err = gceCloud.GetAllZones()
// Get all k8s managed zones (same as zones with nodes in them for test)
managedZones, err = gceCloud.GetAllZonesFromCloudProvider()
Expect(err).NotTo(HaveOccurred())
// Get a list of all zones in the project