mirror of https://github.com/k3s-io/k3s
Add tracing to listing in Cacher
parent
609b9e5124
commit
4d5ac91f88
|
@ -367,7 +367,11 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
|
|||
return err
|
||||
}
|
||||
|
||||
trace := util.NewTrace(fmt.Sprintf("cacher %v: List", c.objectType.String()))
|
||||
defer trace.LogIfLong(250 * time.Millisecond)
|
||||
|
||||
c.ready.wait()
|
||||
trace.Step("Ready")
|
||||
|
||||
// List elements from cache, with at least 'listRV'.
|
||||
listPtr, err := meta.GetItemsPtr(listObj)
|
||||
|
@ -380,10 +384,11 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
|
|||
}
|
||||
filter := filterFunction(key, c.keyFunc, pred)
|
||||
|
||||
objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV)
|
||||
objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV, trace)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to wait for fresh list: %v", err)
|
||||
}
|
||||
trace.Step(fmt.Sprintf("Listed %d items from cache", len(objs)))
|
||||
for _, obj := range objs {
|
||||
object, ok := obj.(runtime.Object)
|
||||
if !ok {
|
||||
|
@ -393,6 +398,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
|
|||
listVal.Set(reflect.Append(listVal, reflect.ValueOf(object).Elem()))
|
||||
}
|
||||
}
|
||||
trace.Step(fmt.Sprintf("Filtered %d items", listVal.Len()))
|
||||
if c.versioner != nil {
|
||||
if err := c.versioner.UpdateList(listObj, readResourceVersion); err != nil {
|
||||
return err
|
||||
|
|
|
@ -27,6 +27,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/api/meta"
|
||||
"k8s.io/kubernetes/pkg/client/cache"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
"k8s.io/kubernetes/pkg/util/clock"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
)
|
||||
|
@ -206,7 +207,7 @@ func (w *watchCache) List() []interface{} {
|
|||
return w.store.List()
|
||||
}
|
||||
|
||||
func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64) ([]interface{}, uint64, error) {
|
||||
func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, trace *util.Trace) ([]interface{}, uint64, error) {
|
||||
startTime := w.clock.Now()
|
||||
go func() {
|
||||
// Wake us up when the time limit has expired. The docs
|
||||
|
@ -228,6 +229,9 @@ func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64) ([]interface{
|
|||
}
|
||||
w.cond.Wait()
|
||||
}
|
||||
if trace != nil {
|
||||
trace.Step("Cache is fresh enough")
|
||||
}
|
||||
return w.store.List(), w.resourceVersion, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -248,7 +248,7 @@ func TestWaitUntilFreshAndList(t *testing.T) {
|
|||
store.Add(makeTestPod("bar", 5))
|
||||
}()
|
||||
|
||||
list, resourceVersion, err := store.WaitUntilFreshAndList(5)
|
||||
list, resourceVersion, err := store.WaitUntilFreshAndList(5, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
@ -278,7 +278,7 @@ func TestWaitUntilFreshAndListTimeout(t *testing.T) {
|
|||
store.Add(makeTestPod("bar", 5))
|
||||
}()
|
||||
|
||||
_, _, err := store.WaitUntilFreshAndList(5)
|
||||
_, _, err := store.WaitUntilFreshAndList(5, nil)
|
||||
if err == nil {
|
||||
t.Fatalf("unexpected lack of timeout error")
|
||||
}
|
||||
|
@ -300,7 +300,7 @@ func TestReflectorForWatchCache(t *testing.T) {
|
|||
store := newTestWatchCache(5)
|
||||
|
||||
{
|
||||
_, version, err := store.WaitUntilFreshAndList(0)
|
||||
_, version, err := store.WaitUntilFreshAndList(0, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
@ -323,7 +323,7 @@ func TestReflectorForWatchCache(t *testing.T) {
|
|||
r.ListAndWatch(wait.NeverStop)
|
||||
|
||||
{
|
||||
_, version, err := store.WaitUntilFreshAndList(10)
|
||||
_, version, err := store.WaitUntilFreshAndList(10, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue