From e6e43828284c6e83cf2b4658fa377b4cc7dec0c3 Mon Sep 17 00:00:00 2001 From: wojtekt Date: Wed, 17 Apr 2019 12:53:36 +0200 Subject: [PATCH] Reduce contention in watchcache by not calling event handler under lock --- .../apiserver/pkg/storage/cacher/cacher.go | 41 ++++++------ .../pkg/storage/cacher/watch_cache.go | 63 +++++++++++-------- .../pkg/storage/cacher/watch_cache_test.go | 3 +- 3 files changed, 59 insertions(+), 48 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index cb1e11437a..c85f051e18 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -291,11 +291,6 @@ type Cacher struct { // given configuration. func NewCacherFromConfig(config Config) *Cacher { stopCh := make(chan struct{}) - - watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc, config.GetAttrsFunc, config.Versioner) - listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc) - reflectorName := "storage/cacher.go:" + config.ResourcePrefix - obj := config.NewFunc() // Give this error when it is constructed rather than when you get the // first watch item, because it's much easier to track down that way. @@ -303,18 +298,11 @@ func NewCacherFromConfig(config Config) *Cacher { panic("storage codec doesn't seem to match given type: " + err.Error()) } - reflector := cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0) - // Configure reflector's pager to for an appropriate pagination chunk size for fetching data from - // storage. The pager falls back to full list if paginated list calls fail due to an "Expired" error. - reflector.WatchListPageSize = storageWatchListPageSize - clock := clock.RealClock{} cacher := &Cacher{ ready: newReady(), storage: config.Storage, objectType: reflect.TypeOf(obj), - watchCache: watchCache, - reflector: reflector, versioner: config.Versioner, newFunc: config.NewFunc, triggerFunc: config.TriggerPublisherFunc, @@ -337,7 +325,27 @@ func NewCacherFromConfig(config Config) *Cacher { bookmarkWatchers: newTimeBucketWatchers(clock), watchBookmarkEnabled: utilfeature.DefaultFeatureGate.Enabled(features.WatchBookmark), } - watchCache.SetOnEvent(cacher.processEvent) + + // Ensure that timer is stopped. + if !cacher.timer.Stop() { + // Consume triggered (but not yet received) timer event + // so that future reuse does not get a spurious timeout. + <-cacher.timer.C + } + + watchCache := newWatchCache( + config.CacheCapacity, config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner) + listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc) + reflectorName := "storage/cacher.go:" + config.ResourcePrefix + + reflector := cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0) + // Configure reflector's pager to for an appropriate pagination chunk size for fetching data from + // storage. The pager falls back to full list if paginated list calls fail due to an "Expired" error. + reflector.WatchListPageSize = storageWatchListPageSize + + cacher.watchCache = watchCache + cacher.reflector = reflector + go cacher.dispatchEvents() cacher.stopWg.Add(1) @@ -352,13 +360,6 @@ func NewCacherFromConfig(config Config) *Cacher { ) }() - // Ensure that timer is stopped. - if !cacher.timer.Stop() { - // Consume triggered (but not yet received) timer event - // so that future reuse does not get a spurious timeout. - <-cacher.timer.C - } - return cacher } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index 97efdb98cf..332aacd98d 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -125,7 +125,7 @@ type watchCache struct { // This handler is run at the end of every Add/Update/Delete method // and additionally gets the previous value of the object. - onEvent func(*watchCacheEvent) + eventHandler func(*watchCacheEvent) // for testing timeouts. clock clock.Clock @@ -137,6 +137,7 @@ type watchCache struct { func newWatchCache( capacity int, keyFunc func(runtime.Object) (string, error), + eventHandler func(*watchCacheEvent), getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error), versioner storage.Versioner) *watchCache { wc := &watchCache{ @@ -149,6 +150,7 @@ func newWatchCache( store: cache.NewStore(storeElementKey), resourceVersion: 0, listResourceVersion: 0, + eventHandler: eventHandler, clock: clock.RealClock{}, versioner: versioner, } @@ -204,6 +206,8 @@ func (w *watchCache) objectToVersionedRuntimeObject(obj interface{}) (runtime.Ob return object, resourceVersion, nil } +// processEvent is safe as long as there is at most one call to it in flight +// at any point in time. func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error { key, err := w.keyFunc(event.Object) if err != nil { @@ -224,30 +228,41 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd ResourceVersion: resourceVersion, } - // TODO: We should consider moving this lock below after the watchCacheEvent - // is created. In such situation, the only problematic scenario is Replace( - // happening after getting object from store and before acquiring a lock. - // Maybe introduce another lock for this purpose. - w.Lock() - defer w.Unlock() - previous, exists, err := w.store.Get(elem) - if err != nil { + if err := func() error { + // TODO: We should consider moving this lock below after the watchCacheEvent + // is created. In such situation, the only problematic scenario is Replace( + // happening after getting object from store and before acquiring a lock. + // Maybe introduce another lock for this purpose. + w.Lock() + defer w.Unlock() + + previous, exists, err := w.store.Get(elem) + if err != nil { + return err + } + if exists { + previousElem := previous.(*storeElement) + watchCacheEvent.PrevObject = previousElem.Object + watchCacheEvent.PrevObjLabels = previousElem.Labels + watchCacheEvent.PrevObjFields = previousElem.Fields + } + + w.updateCache(watchCacheEvent) + w.resourceVersion = resourceVersion + defer w.cond.Broadcast() + + return updateFunc(elem) + }(); err != nil { return err } - if exists { - previousElem := previous.(*storeElement) - watchCacheEvent.PrevObject = previousElem.Object - watchCacheEvent.PrevObjLabels = previousElem.Labels - watchCacheEvent.PrevObjFields = previousElem.Fields - } - w.updateCache(watchCacheEvent) - w.resourceVersion = resourceVersion - if w.onEvent != nil { - w.onEvent(watchCacheEvent) + // Avoid calling event handler under lock. + // This is safe as long as there is at most one call to processEvent in flight + // at any point in time. + if w.eventHandler != nil { + w.eventHandler(watchCacheEvent) } - w.cond.Broadcast() - return updateFunc(elem) + return nil } // Assumes that lock is already held for write. @@ -397,12 +412,6 @@ func (w *watchCache) SetOnReplace(onReplace func()) { w.onReplace = onReplace } -func (w *watchCache) SetOnEvent(onEvent func(*watchCacheEvent)) { - w.Lock() - defer w.Unlock() - w.onEvent = onEvent -} - func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*watchCacheEvent, error) { size := w.endIndex - w.startIndex var oldest uint64 diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go index b7bfa0bd54..fb265ba929 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go @@ -76,7 +76,8 @@ func newTestWatchCache(capacity int) *watchCache { return labels.Set(pod.Labels), fields.Set{"spec.nodeName": pod.Spec.NodeName}, nil } versioner := etcd.APIObjectVersioner{} - wc := newWatchCache(capacity, keyFunc, getAttrsFunc, versioner) + mockHandler := func(*watchCacheEvent) {} + wc := newWatchCache(capacity, keyFunc, mockHandler, getAttrsFunc, versioner) wc.clock = clock.NewFakeClock(time.Now()) return wc }