diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index a2a5995db4..0316d7a2c2 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -45,6 +45,11 @@ type CacherConfig struct { // An underlying storage.Versioner. Versioner Versioner + // Whether to serve Lists from in-memory cache. + // + // NOTE: DO NOT SET TO TRUE IN PRODUCTION CODE! + ListFromCache bool + // The Cache will be caching objects of a given Type and assumes that they // are all stored under ResourcePrefix directory in the underlying database. Type interface{} @@ -99,6 +104,11 @@ type Cacher struct { // keyFunc is used to get a key in the underyling storage for a given object. keyFunc func(runtime.Object) (string, error) + + // Whether to serve Lists from in-memory cache. + // + // NOTE: DO NOT SET TO TRUE IN PRODUCTION CODE! + ListFromCache bool } // Create a new Cacher responsible from service WATCH and LIST requests from its @@ -109,14 +119,15 @@ func NewCacher(config CacherConfig) *Cacher { listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc) cacher := &Cacher{ - usable: sync.RWMutex{}, - storage: config.Storage, - watchCache: watchCache, - reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0), - watcherIdx: 0, - watchers: make(map[int]*cacheWatcher), - versioner: config.Versioner, - keyFunc: config.KeyFunc, + usable: sync.RWMutex{}, + storage: config.Storage, + watchCache: watchCache, + reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0), + watcherIdx: 0, + watchers: make(map[int]*cacheWatcher), + versioner: config.Versioner, + keyFunc: config.KeyFunc, + ListFromCache: config.ListFromCache, } cacher.usable.Lock() // See startCaching method for why explanation on it. @@ -220,21 +231,19 @@ func (c *Cacher) GetToList(ctx context.Context, key string, filter FilterFunc, l // Implements storage.Interface. func (c *Cacher) List(ctx context.Context, key string, resourceVersion uint64, filter FilterFunc, listObj runtime.Object) error { - return c.storage.List(ctx, key, resourceVersion, filter, listObj) -} + if !c.ListFromCache { + return c.storage.List(ctx, key, resourceVersion, filter, listObj) + } -// ListFromMemory implements list operation (the same signature as List method) -// but it serves the contents from memory. -// Current we cannot use ListFromMemory() instead of List(), because it only -// guarantees eventual consistency (e.g. it's possible for Get called right after -// Create to return not-exist, before the change is propagate). -// TODO: We may consider changing to use ListFromMemory in the future, but this -// requires wider discussion as an "api semantic change". -func (c *Cacher) ListFromMemory(key string, listObj runtime.Object) error { - // Do NOT allow Watch to start when the underlying structures are not propagated. + // To avoid situation when List is proceesed before the underlying + // watchCache is propagated for the first time, we acquire and immediately + // release the 'usable' lock. + // We don't need to hold it all the time, because watchCache is thread-safe + // and it would complicate already very difficult locking pattern. c.usable.RLock() - defer c.usable.RUnlock() + c.usable.RUnlock() + // List elements from cache, with at least 'resourceVersion'. listPtr, err := runtime.GetItemsPtr(listObj) if err != nil { return err @@ -243,15 +252,15 @@ func (c *Cacher) ListFromMemory(key string, listObj runtime.Object) error { if err != nil || listVal.Kind() != reflect.Slice { return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind()) } - filter := filterFunction(key, c.keyFunc, Everything) + filterFunc := filterFunction(key, c.keyFunc, filter) - objs, resourceVersion := c.watchCache.ListWithVersion() + objs, resourceVersion := c.watchCache.WaitUntilFreshAndList(resourceVersion) for _, obj := range objs { object, ok := obj.(runtime.Object) if !ok { return fmt.Errorf("non runtime.Object returned from storage: %v", obj) } - if filter(object) { + if filterFunc(object) { listVal.Set(reflect.Append(listVal, reflect.ValueOf(object).Elem())) } } diff --git a/pkg/storage/cacher_test.go b/pkg/storage/cacher_test.go index 6e8bdc82ad..1bbbb7dd0a 100644 --- a/pkg/storage/cacher_test.go +++ b/pkg/storage/cacher_test.go @@ -37,7 +37,6 @@ import ( "k8s.io/kubernetes/pkg/tools/etcdtest" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/sets" - "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/watch" "golang.org/x/net/context" @@ -47,8 +46,9 @@ func newTestCacher(client tools.EtcdClient) *storage.Cacher { prefix := "pods" config := storage.CacherConfig{ CacheCapacity: 10, - Versioner: etcdstorage.APIObjectVersioner{}, Storage: etcdstorage.NewEtcdStorage(client, testapi.Default.Codec(), etcdtest.PathPrefix()), + Versioner: etcdstorage.APIObjectVersioner{}, + ListFromCache: true, Type: &api.Pod{}, ResourcePrefix: prefix, KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) }, @@ -65,18 +65,7 @@ func makeTestPod(name string) *api.Pod { } } -func waitForUpToDateCache(cacher *storage.Cacher, resourceVersion uint64) error { - ready := func() (bool, error) { - result, err := cacher.LastSyncResourceVersion() - if err != nil { - return false, err - } - return result == resourceVersion, nil - } - return wait.Poll(10*time.Millisecond, util.ForeverTestTimeout, ready) -} - -func TestListFromMemory(t *testing.T) { +func TestList(t *testing.T) { fakeClient := tools.NewFakeEtcdClient(t) prefixedKey := etcdtest.AddPrefix("pods") fakeClient.ExpectNotFoundGet(prefixedKey) @@ -146,12 +135,9 @@ func TestListFromMemory(t *testing.T) { for _, test := range testCases { fakeClient.WatchResponse <- test } - if err := waitForUpToDateCache(cacher, 5); err != nil { - t.Errorf("watch cache didn't propagated correctly: %v", err) - } result := &api.PodList{} - if err := cacher.ListFromMemory("pods/ns", result); err != nil { + if err := cacher.List(context.TODO(), "pods/ns", 5, storage.Everything, result); err != nil { t.Errorf("unexpected error: %v", err) } if result.ListMeta.ResourceVersion != "5" { diff --git a/pkg/storage/watch_cache.go b/pkg/storage/watch_cache.go index 1b376feec1..afcd0f2c12 100644 --- a/pkg/storage/watch_cache.go +++ b/pkg/storage/watch_cache.go @@ -55,6 +55,10 @@ type watchCacheElement struct { type watchCache struct { sync.RWMutex + // Condition on which lists are waiting for the fresh enough + // resource version. + cond *sync.Cond + // Maximum size of history window. capacity int @@ -84,7 +88,7 @@ type watchCache struct { } func newWatchCache(capacity int) *watchCache { - return &watchCache{ + wc := &watchCache{ capacity: capacity, cache: make([]watchCacheElement, capacity), startIndex: 0, @@ -92,6 +96,8 @@ func newWatchCache(capacity int) *watchCache { store: cache.NewStore(cache.MetaNamespaceKeyFunc), resourceVersion: 0, } + wc.cond = sync.NewCond(wc.RLocker()) + return wc } func (w *watchCache) Add(obj interface{}) error { @@ -169,6 +175,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd } w.updateCache(resourceVersion, watchCacheEvent) w.resourceVersion = resourceVersion + w.cond.Broadcast() return updateFunc(event.Object) } @@ -188,8 +195,11 @@ func (w *watchCache) List() []interface{} { return w.store.List() } -func (w *watchCache) ListWithVersion() ([]interface{}, uint64) { +func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64) ([]interface{}, uint64) { w.RLock() + for w.resourceVersion < resourceVersion { + w.cond.Wait() + } defer w.RUnlock() return w.store.List(), w.resourceVersion } @@ -230,6 +240,7 @@ func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error { if w.onReplace != nil { w.onReplace() } + w.cond.Broadcast() return nil } diff --git a/pkg/storage/watch_cache_test.go b/pkg/storage/watch_cache_test.go index d462e67a60..bfa75f2f20 100644 --- a/pkg/storage/watch_cache_test.go +++ b/pkg/storage/watch_cache_test.go @@ -230,6 +230,24 @@ func TestEvents(t *testing.T) { } } +func TestWaitUntilFreshAndList(t *testing.T) { + store := newWatchCache(3) + + // In background, update the store. + go func() { + store.Add(makeTestPod("foo", 2)) + store.Add(makeTestPod("bar", 5)) + }() + + list, resourceVersion := store.WaitUntilFreshAndList(4) + if resourceVersion != 5 { + t.Errorf("unexpected resourceVersion: %v, expected: 5", resourceVersion) + } + if len(list) != 2 { + t.Errorf("unexpected list returned: %#v", list) + } +} + type testLW struct { ListFunc func() (runtime.Object, error) WatchFunc func(options api.ListOptions) (watch.Interface, error) @@ -244,7 +262,7 @@ func TestReflectorForWatchCache(t *testing.T) { store := newWatchCache(5) { - _, version := store.ListWithVersion() + _, version := store.WaitUntilFreshAndList(0) if version != 0 { t.Errorf("unexpected resource version: %d", version) } @@ -264,7 +282,7 @@ func TestReflectorForWatchCache(t *testing.T) { r.ListAndWatch(util.NeverStop) { - _, version := store.ListWithVersion() + _, version := store.WaitUntilFreshAndList(10) if version != 10 { t.Errorf("unexpected resource version: %d", version) }