From 358e90969a1d00cd52148fadd85d38c0902c3708 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Wed, 17 Jun 2015 11:08:27 -0700 Subject: [PATCH] Fix service latency test. * More accurate latency measurement. * More lenient/sensible pass requirements. --- test/e2e/service_latency.go | 292 +++++++++++++++++++++++++++--------- 1 file changed, 217 insertions(+), 75 deletions(-) diff --git a/test/e2e/service_latency.go b/test/e2e/service_latency.go index 06d4ded595..601f1d00f8 100644 --- a/test/e2e/service_latency.go +++ b/test/e2e/service_latency.go @@ -23,9 +23,13 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" . "github.com/onsi/ginkgo" ) @@ -40,22 +44,32 @@ var _ = Describe("Service endpoints latency", func() { f := NewFramework("svc-latency") It("should not be very high", func() { - nodes, err := f.Client.Nodes().List(labels.Everything(), fields.Everything()) - if err != nil { - Failf("Failed to list nodes: %v", err) - } - count := len(nodes.Items) + const ( + // These are very generous criteria. Ideally we will + // get this much lower in the future. See issue + // #10436. + limitMedian = time.Second * 20 + limitTail = time.Second * 40 - // Numbers chosen to make the test complete in a short amount - // of time. This sample size is not actually large enough to - // reliably measure tails on a reasonably sized test cluster, - // but it should catch low hanging fruit. - var ( - totalTrials = 20 * count - parallelTrials = 8 * count - minSampleSize = 10 * count + // Numbers chosen to make the test complete in a short amount + // of time. This sample size is not actually large enough to + // reliably measure tails (it may give false positives, but not + // false negatives), but it should catch low hanging fruit. + // + // Note that these are fixed and do not depend on the + // size of the cluster. Setting parallelTrials larger + // distorts the measurements. Perhaps this wouldn't be + // true on HA clusters. + totalTrials = 200 + parallelTrials = 15 + minSampleSize = 100 ) + // Turn off rate limiting--it interferes with our measurements. + oldThrottle := f.Client.RESTClient.Throttle + f.Client.RESTClient.Throttle = util.NewFakeRateLimiter() + defer func() { f.Client.RESTClient.Throttle = oldThrottle }() + failing := util.NewStringSet() d, err := runServiceLatencies(f, parallelTrials, totalTrials) if err != nil { @@ -85,45 +99,64 @@ var _ = Describe("Service endpoints latency", func() { Logf("50 %%ile: %v", p50) Logf("90 %%ile: %v", p90) Logf("99 %%ile: %v", p99) + Logf("Total sample count: %v", len(dSorted)) - if p99 > 4*p50 { - failing.Insert("Tail latency is > 4x median latency") + if p50 > limitMedian { + failing.Insert("Median latency should be less than " + limitMedian.String()) } - - if p50 > time.Second*20 { - failing.Insert("Median latency should be less than 20 seconds") + if p99 > limitTail { + failing.Insert("Tail (99 percentile) latency should be less than " + limitTail.String()) } if failing.Len() > 0 { - Fail(strings.Join(failing.List(), "\n")) + errList := strings.Join(failing.List(), "\n") + helpfulInfo := fmt.Sprintf("\n50, 90, 99 percentiles: %v %v %v", p50, p90, p99) + Fail(errList + helpfulInfo) } }) }) func runServiceLatencies(f *Framework, inParallel, total int) (output []time.Duration, err error) { - next := make(chan int, total) - go func() { - for i := 0; i < total; i++ { - next <- i - } - close(next) - }() + cfg := RCConfig{ + Client: f.Client, + Image: "gcr.io/google_containers/pause:1.0", + Name: "svc-latency-rc", + Namespace: f.Namespace.Name, + Replicas: 1, + PollInterval: time.Second, + } + if err := RunRC(cfg); err != nil { + return nil, err + } + defer DeleteRC(f.Client, f.Namespace.Name, cfg.Name) + // Run a single watcher, to reduce the number of API calls we have to + // make; this is to minimize the timing error. It's how kube-proxy + // consumes the endpoints data, so it seems like the right thing to + // test. + endpointQueries := newQuerier() + startEndpointWatcher(f, endpointQueries) + defer close(endpointQueries.stop) + + // run one test and throw it away-- this is to make sure that the pod's + // ready status has propagated. + singleServiceLatency(f, cfg.Name, endpointQueries) + + // These channels are never closed, and each attempt sends on exactly + // one of these channels, so the sum of the things sent over them will + // be exactly total. errs := make(chan error, total) durations := make(chan time.Duration, total) - for i := 0; i < inParallel; i++ { + blocker := make(chan struct{}, inParallel) + for i := 0; i < total; i++ { go func() { defer GinkgoRecover() - for { - i, ok := <-next - if !ok { - return - } - if d, err := singleServiceLatency(f, i); err != nil { - errs <- err - } else { - durations <- d - } + blocker <- struct{}{} + defer func() { <-blocker }() + if d, err := singleServiceLatency(f, cfg.Name, endpointQueries); err != nil { + errs <- err + } else { + durations <- d } }() } @@ -144,58 +177,167 @@ func runServiceLatencies(f *Framework, inParallel, total int) (output []time.Dur return output, nil } -func singleServiceLatency(f *Framework, i int) (time.Duration, error) { - // Make an RC with a single pod. - cfg := RCConfig{ - Client: f.Client, - Image: "gcr.io/google_containers/pause:1.0", - Name: fmt.Sprintf("trial-%v", i), - Namespace: f.Namespace.Name, - Replicas: 1, - PollInterval: time.Second, - } - if err := RunRC(cfg); err != nil { - return 0, err - } - defer DeleteRC(f.Client, f.Namespace.Name, cfg.Name) +type endpointQuery struct { + endpointsName string + endpoints *api.Endpoints + result chan<- struct{} +} - // Now make a service that points to that pod. +type endpointQueries struct { + requests map[string]*endpointQuery + + stop chan struct{} + requestChan chan *endpointQuery + seenChan chan *api.Endpoints +} + +func newQuerier() *endpointQueries { + eq := &endpointQueries{ + requests: map[string]*endpointQuery{}, + + stop: make(chan struct{}, 100), + requestChan: make(chan *endpointQuery), + seenChan: make(chan *api.Endpoints, 100), + } + go eq.join() + return eq +} + +// join merges the incoming streams of requests and added endpoints. It has +// nice properties like: +// * remembering an endpoint if it happens to arrive before it is requested. +// * closing all outstanding requests (returning nil) if it is stopped. +func (eq *endpointQueries) join() { + defer func() { + // Terminate all pending requests, so that no goroutine will + // block indefinitely. + for _, req := range eq.requests { + if req.result != nil { + close(req.result) + } + } + }() + + for { + select { + case <-eq.stop: + return + case req := <-eq.requestChan: + if cur, ok := eq.requests[req.endpointsName]; ok && cur.endpoints != nil { + // We've already gotten the result, so we can + // immediately satisfy this request. + delete(eq.requests, req.endpointsName) + req.endpoints = cur.endpoints + close(req.result) + } else { + // Save this request. + eq.requests[req.endpointsName] = req + } + case got := <-eq.seenChan: + if req, ok := eq.requests[got.Name]; ok { + if req.result != nil { + // Satisfy a request. + delete(eq.requests, got.Name) + req.endpoints = got + close(req.result) + } else { + // We've already recorded a result, but + // haven't gotten the request yet. Only + // keep the first result. + } + } else { + // We haven't gotten the corresponding request + // yet, save this result. + eq.requests[got.Name] = &endpointQuery{ + endpoints: got, + } + } + } + } +} + +// request blocks until the requested endpoint is seen. +func (eq *endpointQueries) request(endpointsName string) *api.Endpoints { + result := make(chan struct{}) + req := &endpointQuery{ + endpointsName: endpointsName, + result: result, + } + eq.requestChan <- req + <-result + return req.endpoints +} + +// marks e as added; does not block. +func (eq *endpointQueries) added(e *api.Endpoints) { + eq.seenChan <- e +} + +// blocks until it has finished syncing. +func startEndpointWatcher(f *Framework, q *endpointQueries) { + _, controller := framework.NewInformer( + &cache.ListWatch{ + ListFunc: func() (runtime.Object, error) { + return f.Client.Endpoints(f.Namespace.Name).List(labels.Everything()) + }, + WatchFunc: func(rv string) (watch.Interface, error) { + return f.Client.Endpoints(f.Namespace.Name).Watch(labels.Everything(), fields.Everything(), rv) + }, + }, + &api.Endpoints{}, + 0, + framework.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + if e, ok := obj.(*api.Endpoints); ok { + if len(e.Subsets) > 0 && len(e.Subsets[0].Addresses) > 0 { + q.added(e) + } + } + }, + UpdateFunc: func(old, cur interface{}) { + if e, ok := cur.(*api.Endpoints); ok { + if len(e.Subsets) > 0 && len(e.Subsets[0].Addresses) > 0 { + q.added(e) + } + } + }, + }, + ) + + go controller.Run(q.stop) + + // Wait for the controller to sync, so that we don't count any warm-up time. + for !controller.HasSynced() { + time.Sleep(100 * time.Millisecond) + } +} + +func singleServiceLatency(f *Framework, name string, q *endpointQueries) (time.Duration, error) { + // Make a service that points to that pod. svc := &api.Service{ ObjectMeta: api.ObjectMeta{ - Name: cfg.Name, + GenerateName: "latency-svc-", }, Spec: api.ServiceSpec{ Ports: []api.ServicePort{{Protocol: api.ProtocolTCP, Port: 80}}, - Selector: map[string]string{"name": cfg.Name}, + Selector: map[string]string{"name": name}, Type: api.ServiceTypeClusterIP, SessionAffinity: api.ServiceAffinityNone, }, } + startTime := time.Now() gotSvc, err := f.Client.Services(f.Namespace.Name).Create(svc) if err != nil { return 0, err } + Logf("Created: %v", gotSvc.Name) + defer f.Client.Services(gotSvc.Namespace).Delete(gotSvc.Name) - // Now time how long it takes for the endpoints to show up. - startTime := time.Now() - defer f.Client.Services(f.Namespace.Name).Delete(gotSvc.Name) - w, err := f.Client.Endpoints(f.Namespace.Name).Watch(labels.Everything(), fields.Set{"metadata.name": cfg.Name}.AsSelector(), gotSvc.ResourceVersion) - if err != nil { - return 0, err + if e := q.request(gotSvc.Name); e == nil { + return 0, fmt.Errorf("Never got a result for endpoint %v", gotSvc.Name) } - defer w.Stop() - - for { - val, ok := <-w.ResultChan() - if !ok { - return 0, fmt.Errorf("watch closed") - } - if e, ok := val.Object.(*api.Endpoints); ok { - if e.Name == cfg.Name && len(e.Subsets) > 0 && len(e.Subsets[0].Addresses) > 0 { - stopTime := time.Now() - return stopTime.Sub(startTime), nil - } - } - } - + stopTime := time.Now() + d := stopTime.Sub(startTime) + Logf("Got endpoints: %v [%v]", gotSvc.Name, d) + return d, nil }