[e2e ingress-gce] Fix race condition for appending services and ingresses

pull/8/head
Zihong Zheng 2018-04-18 20:26:16 -07:00
parent 44ede98e94
commit 9a5ce379fe
1 changed files with 25 additions and 15 deletions

View File

@ -179,25 +179,27 @@ func (f *IngressScaleFramework) RunScaleTest() []error {
}
}
// currentNum keeps track of how many ingresses have been created.
currentNum := new(int)
// numIngsCreated keeps track of how many ingresses have been created.
numIngsCreated := 0
prepareIngsFunc := func(goalNum int) {
prepareIngsFunc := func(numIngsNeeded int) {
var ingWg sync.WaitGroup
numToCreate := goalNum - *currentNum
ingWg.Add(numToCreate)
errQueue := make(chan error, numToCreate)
latencyQueue := make(chan time.Duration, numToCreate)
numIngsToCreate := numIngsNeeded - numIngsCreated
ingWg.Add(numIngsToCreate)
svcQueue := make(chan *v1.Service, numIngsToCreate)
ingQueue := make(chan *extensions.Ingress, numIngsToCreate)
errQueue := make(chan error, numIngsToCreate)
latencyQueue := make(chan time.Duration, numIngsToCreate)
start := time.Now()
for ; *currentNum < goalNum; *currentNum++ {
suffix := fmt.Sprintf("%d", *currentNum)
for ; numIngsCreated < numIngsNeeded; numIngsCreated++ {
suffix := fmt.Sprintf("%d", numIngsCreated)
go func() {
defer ingWg.Done()
start := time.Now()
svcCreated, ingCreated, err := f.createScaleTestServiceIngress(suffix, f.EnableTLS)
f.ScaleTestSvcs = append(f.ScaleTestSvcs, svcCreated)
f.ScaleTestIngs = append(f.ScaleTestIngs, ingCreated)
svcQueue <- svcCreated
ingQueue <- ingCreated
if err != nil {
errQueue <- err
return
@ -214,11 +216,19 @@ func (f *IngressScaleFramework) RunScaleTest() []error {
}
// Wait until all ingress creations are complete.
f.Logger.Infof("Waiting for %d ingresses to come up...", numToCreate)
f.Logger.Infof("Waiting for %d ingresses to come up...", numIngsToCreate)
ingWg.Wait()
close(svcQueue)
close(ingQueue)
close(errQueue)
close(latencyQueue)
elapsed := time.Since(start)
for svc := range svcQueue {
f.ScaleTestSvcs = append(f.ScaleTestSvcs, svc)
}
for ing := range ingQueue {
f.ScaleTestIngs = append(f.ScaleTestIngs, ing)
}
var createLatencies []time.Duration
for latency := range latencyQueue {
createLatencies = append(createLatencies, latency)
@ -231,15 +241,15 @@ func (f *IngressScaleFramework) RunScaleTest() []error {
}
return
}
f.Logger.Infof("Spent %s for %d ingresses to come up", elapsed, numToCreate)
f.Logger.Infof("Spent %s for %d ingresses to come up", elapsed, numIngsToCreate)
f.BatchDurations = append(f.BatchDurations, elapsed)
}
measureCreateUpdateFunc := func() {
f.Logger.Infof("Create one more ingress and wait for it to come up")
start := time.Now()
svcCreated, ingCreated, err := f.createScaleTestServiceIngress(fmt.Sprintf("%d", *currentNum), f.EnableTLS)
*currentNum = *currentNum + 1
svcCreated, ingCreated, err := f.createScaleTestServiceIngress(fmt.Sprintf("%d", numIngsCreated), f.EnableTLS)
numIngsCreated = numIngsCreated + 1
f.ScaleTestSvcs = append(f.ScaleTestSvcs, svcCreated)
f.ScaleTestIngs = append(f.ScaleTestIngs, ingCreated)
if err != nil {