From 4d5ac91f882c7a1abf787b4b05a7f762c0e60884 Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Mon, 17 Oct 2016 08:58:40 +0200 Subject: [PATCH] Add tracing to listing in Cacher --- pkg/storage/cacher.go | 8 +++++++- pkg/storage/watch_cache.go | 6 +++++- pkg/storage/watch_cache_test.go | 8 ++++---- 3 files changed, 16 insertions(+), 6 deletions(-) diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index 2cd9d4483e..473e8d9db8 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -367,7 +367,11 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p return err } + trace := util.NewTrace(fmt.Sprintf("cacher %v: List", c.objectType.String())) + defer trace.LogIfLong(250 * time.Millisecond) + c.ready.wait() + trace.Step("Ready") // List elements from cache, with at least 'listRV'. listPtr, err := meta.GetItemsPtr(listObj) @@ -380,10 +384,11 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p } filter := filterFunction(key, c.keyFunc, pred) - objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV) + objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV, trace) if err != nil { return fmt.Errorf("failed to wait for fresh list: %v", err) } + trace.Step(fmt.Sprintf("Listed %d items from cache", len(objs))) for _, obj := range objs { object, ok := obj.(runtime.Object) if !ok { @@ -393,6 +398,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p listVal.Set(reflect.Append(listVal, reflect.ValueOf(object).Elem())) } } + trace.Step(fmt.Sprintf("Filtered %d items", listVal.Len())) if c.versioner != nil { if err := c.versioner.UpdateList(listObj, readResourceVersion); err != nil { return err diff --git a/pkg/storage/watch_cache.go b/pkg/storage/watch_cache.go index b3f5e0371c..182b61da17 100644 --- a/pkg/storage/watch_cache.go +++ b/pkg/storage/watch_cache.go @@ -27,6 +27,7 @@ import ( "k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/clock" "k8s.io/kubernetes/pkg/watch" ) @@ -206,7 +207,7 @@ func (w *watchCache) List() []interface{} { return w.store.List() } -func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64) ([]interface{}, uint64, error) { +func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, trace *util.Trace) ([]interface{}, uint64, error) { startTime := w.clock.Now() go func() { // Wake us up when the time limit has expired. The docs @@ -228,6 +229,9 @@ func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64) ([]interface{ } w.cond.Wait() } + if trace != nil { + trace.Step("Cache is fresh enough") + } return w.store.List(), w.resourceVersion, nil } diff --git a/pkg/storage/watch_cache_test.go b/pkg/storage/watch_cache_test.go index b5b3c893e1..7843a9d398 100644 --- a/pkg/storage/watch_cache_test.go +++ b/pkg/storage/watch_cache_test.go @@ -248,7 +248,7 @@ func TestWaitUntilFreshAndList(t *testing.T) { store.Add(makeTestPod("bar", 5)) }() - list, resourceVersion, err := store.WaitUntilFreshAndList(5) + list, resourceVersion, err := store.WaitUntilFreshAndList(5, nil) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -278,7 +278,7 @@ func TestWaitUntilFreshAndListTimeout(t *testing.T) { store.Add(makeTestPod("bar", 5)) }() - _, _, err := store.WaitUntilFreshAndList(5) + _, _, err := store.WaitUntilFreshAndList(5, nil) if err == nil { t.Fatalf("unexpected lack of timeout error") } @@ -300,7 +300,7 @@ func TestReflectorForWatchCache(t *testing.T) { store := newTestWatchCache(5) { - _, version, err := store.WaitUntilFreshAndList(0) + _, version, err := store.WaitUntilFreshAndList(0, nil) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -323,7 +323,7 @@ func TestReflectorForWatchCache(t *testing.T) { r.ListAndWatch(wait.NeverStop) { - _, version, err := store.WaitUntilFreshAndList(10) + _, version, err := store.WaitUntilFreshAndList(10, nil) if err != nil { t.Fatalf("unexpected error: %v", err) }