mirror of https://github.com/k3s-io/k3s
Merge pull request #76702 from wojtek-t/reduce_watchcache_contention
Reduce contention in watchcache by not calling event handler under lock
Tag: k3s-v1.15.3
commit 9a1572b70a
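In short: the watch cache used to invoke its event handler while still holding the cache's write lock, so a slow handler stalled every concurrent reader and writer; this commit moves the handler call outside the critical section. A minimal sketch of the pattern, with illustrative names rather than the upstream API:

package main

import (
	"fmt"
	"sync"
)

// event and cache are illustrative stand-ins, not the upstream types.
type event struct{ key string }

type cache struct {
	mu      sync.Mutex
	items   map[string]event
	handler func(event) // potentially slow; must not run under mu
}

// process mirrors the shape of watchCache.processEvent after this change:
// mutate state inside a lock-scoped closure, then call the handler unlocked.
// Safe as long as at most one process call is in flight at a time.
func (c *cache) process(e event) {
	func() {
		c.mu.Lock()
		defer c.mu.Unlock()
		c.items[e.key] = e
	}()
	if c.handler != nil {
		c.handler(e) // runs outside the critical section
	}
}

func main() {
	c := &cache{
		items:   map[string]event{},
		handler: func(e event) { fmt.Println("delivered", e.key) },
	}
	c.process(event{key: "pods/a"})
}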
@@ -291,11 +291,6 @@ type Cacher struct {
 // given configuration.
 func NewCacherFromConfig(config Config) *Cacher {
 	stopCh := make(chan struct{})
-
-	watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc, config.GetAttrsFunc, config.Versioner)
-	listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
-	reflectorName := "storage/cacher.go:" + config.ResourcePrefix
-
 	obj := config.NewFunc()
 	// Give this error when it is constructed rather than when you get the
 	// first watch item, because it's much easier to track down that way.
@@ -303,18 +298,11 @@ func NewCacherFromConfig(config Config) *Cacher {
 		panic("storage codec doesn't seem to match given type: " + err.Error())
 	}
 
-	reflector := cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0)
-	// Configure reflector's pager to use an appropriate pagination chunk size for fetching data from
-	// storage. The pager falls back to full list if paginated list calls fail due to an "Expired" error.
-	reflector.WatchListPageSize = storageWatchListPageSize
-
 	clock := clock.RealClock{}
 	cacher := &Cacher{
 		ready:          newReady(),
 		storage:        config.Storage,
 		objectType:     reflect.TypeOf(obj),
-		watchCache:     watchCache,
-		reflector:      reflector,
 		versioner:      config.Versioner,
 		newFunc:        config.NewFunc,
 		triggerFunc:    config.TriggerPublisherFunc,
@@ -337,7 +325,27 @@ func NewCacherFromConfig(config Config) *Cacher {
 		bookmarkWatchers:     newTimeBucketWatchers(clock),
 		watchBookmarkEnabled: utilfeature.DefaultFeatureGate.Enabled(features.WatchBookmark),
 	}
-	watchCache.SetOnEvent(cacher.processEvent)
+
+	// Ensure that timer is stopped.
+	if !cacher.timer.Stop() {
+		// Consume triggered (but not yet received) timer event
+		// so that future reuse does not get a spurious timeout.
+		<-cacher.timer.C
+	}
+
+	watchCache := newWatchCache(
+		config.CacheCapacity, config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner)
+	listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
+	reflectorName := "storage/cacher.go:" + config.ResourcePrefix
+
+	reflector := cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0)
+	// Configure reflector's pager to use an appropriate pagination chunk size for fetching data from
+	// storage. The pager falls back to full list if paginated list calls fail due to an "Expired" error.
+	reflector.WatchListPageSize = storageWatchListPageSize
+
+	cacher.watchCache = watchCache
+	cacher.reflector = reflector
 
 	go cacher.dispatchEvents()
 
 	cacher.stopWg.Add(1)
@@ -352,13 +360,6 @@ func NewCacherFromConfig(config Config) *Cacher {
 		)
 	}()
 
-	// Ensure that timer is stopped.
-	if !cacher.timer.Stop() {
-		// Consume triggered (but not yet received) timer event
-		// so that future reuse does not get a spurious timeout.
-		<-cacher.timer.C
-	}
-
 	return cacher
 }
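Note the reordering in the hunks above: the handler passed to newWatchCache is now cacher.processEvent, and that method value can only be formed once cacher exists, so the watchCache and reflector construction moves below the Cacher literal and is wired in by plain field assignment (the SetOnEvent indirection disappears). A hedged sketch of that dependency, using hypothetical trimmed-down types:

package main

import "fmt"

type watchCacheEvent struct{ note string }

// Trimmed-down stand-ins for the real types.
type watchCache struct{ eventHandler func(*watchCacheEvent) }

// The handler is now a constructor argument instead of a later SetOnEvent call.
func newWatchCache(eventHandler func(*watchCacheEvent)) *watchCache {
	return &watchCache{eventHandler: eventHandler}
}

type cacher struct{ watchCache *watchCache }

func (c *cacher) processEvent(e *watchCacheEvent) { fmt.Println("dispatch:", e.note) }

func main() {
	c := &cacher{}
	// c.processEvent is a method value bound to c, so c must exist first;
	// hence the watchCache construction moves below the cacher construction.
	c.watchCache = newWatchCache(c.processEvent)
	c.watchCache.eventHandler(&watchCacheEvent{note: "pod added"})
}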
@@ -125,7 +125,7 @@ type watchCache struct {
 
 	// This handler is run at the end of every Add/Update/Delete method
 	// and additionally gets the previous value of the object.
-	onEvent func(*watchCacheEvent)
+	eventHandler func(*watchCacheEvent)
 
 	// for testing timeouts.
 	clock clock.Clock
@@ -137,6 +137,7 @@ type watchCache struct {
 func newWatchCache(
 	capacity int,
 	keyFunc func(runtime.Object) (string, error),
+	eventHandler func(*watchCacheEvent),
 	getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error),
 	versioner storage.Versioner) *watchCache {
 	wc := &watchCache{
@@ -149,6 +150,7 @@ func newWatchCache(
 		store:               cache.NewStore(storeElementKey),
 		resourceVersion:     0,
 		listResourceVersion: 0,
+		eventHandler:        eventHandler,
 		clock:               clock.RealClock{},
 		versioner:           versioner,
 	}
@@ -204,6 +206,8 @@ func (w *watchCache) objectToVersionedRuntimeObject(obj interface{}) (runtime.Object, uint64, error) {
 	return object, resourceVersion, nil
 }
 
+// processEvent is safe as long as there is at most one call to it in flight
+// at any point in time.
 func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
 	key, err := w.keyFunc(event.Object)
 	if err != nil {
@@ -224,12 +228,14 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
 		ResourceVersion: resourceVersion,
 	}
 
+	if err := func() error {
+		// TODO: We should consider moving this lock below after the watchCacheEvent
+		// is created. In such situation, the only problematic scenario is Replace(
+		// happening after getting object from store and before acquiring a lock.
+		// Maybe introduce another lock for this purpose.
 		w.Lock()
 		defer w.Unlock()
 
 		previous, exists, err := w.store.Get(elem)
 		if err != nil {
 			return err
@@ -240,14 +246,23 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
 			watchCacheEvent.PrevObjLabels = previousElem.Labels
 			watchCacheEvent.PrevObjFields = previousElem.Fields
 		}
 
 		w.updateCache(watchCacheEvent)
 		w.resourceVersion = resourceVersion
+		defer w.cond.Broadcast()
 
-	if w.onEvent != nil {
-		w.onEvent(watchCacheEvent)
-	}
-	w.cond.Broadcast()
-	return updateFunc(elem)
+		return updateFunc(elem)
+	}(); err != nil {
+		return err
+	}
+
+	// Avoid calling event handler under lock.
+	// This is safe as long as there is at most one call to processEvent in flight
+	// at any point in time.
+	if w.eventHandler != nil {
+		w.eventHandler(watchCacheEvent)
+	}
 	return nil
 }
 
 // Assumes that lock is already held for write.
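The point of the closure-plus-unlocked-handler shape above: previously the handler ran under w.Lock(), so every concurrent read of the watch cache waited out the whole handler call. A contrived, self-contained timing sketch (hypothetical types, not the upstream code) showing the difference:

package main

import (
	"fmt"
	"sync"
	"time"
)

// store is an illustrative stand-in for the watch cache.
type store struct {
	mu sync.RWMutex
	v  int
}

func (s *store) get() int {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.v
}

// processUnderLock models the old behavior: the handler runs while the
// write lock is held, so every reader stalls for the handler's duration.
func (s *store) processUnderLock(handler func(int)) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.v++
	handler(s.v)
}

// processOutsideLock models the new behavior: state changes happen under
// the lock, the handler runs after it is released. Safe while at most one
// process call is in flight at a time.
func (s *store) processOutsideLock(handler func(int)) {
	s.mu.Lock()
	s.v++
	v := s.v
	s.mu.Unlock()
	handler(v)
}

func main() {
	s := &store{}
	slow := func(int) { time.Sleep(100 * time.Millisecond) }

	go s.processUnderLock(slow)
	time.Sleep(time.Millisecond) // let the writer take the lock
	t := time.Now()
	_ = s.get() // blocks until the slow handler finishes
	fmt.Println("read with handler under lock waited", time.Since(t))

	go s.processOutsideLock(slow)
	time.Sleep(time.Millisecond)
	t = time.Now()
	_ = s.get() // returns almost immediately
	fmt.Println("read with handler outside lock waited", time.Since(t))
}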
@@ -397,12 +412,6 @@ func (w *watchCache) SetOnReplace(onReplace func()) {
 	w.onReplace = onReplace
 }
 
-func (w *watchCache) SetOnEvent(onEvent func(*watchCacheEvent)) {
-	w.Lock()
-	defer w.Unlock()
-	w.onEvent = onEvent
-}
-
 func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*watchCacheEvent, error) {
 	size := w.endIndex - w.startIndex
 	var oldest uint64
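With the handler injected through newWatchCache, nothing calls SetOnEvent anymore, so the hunk above deletes the setter along with the lock round trip it needed; the onEvent to eventHandler field rename in the earlier hunks completes the switch from a mutable callback to constructor-time wiring.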
@@ -76,7 +76,8 @@ func newTestWatchCache(capacity int) *watchCache {
 		return labels.Set(pod.Labels), fields.Set{"spec.nodeName": pod.Spec.NodeName}, nil
 	}
 	versioner := etcd.APIObjectVersioner{}
-	wc := newWatchCache(capacity, keyFunc, getAttrsFunc, versioner)
+	mockHandler := func(*watchCacheEvent) {}
+	wc := newWatchCache(capacity, keyFunc, mockHandler, getAttrsFunc, versioner)
 	wc.clock = clock.NewFakeClock(time.Now())
 	return wc
 }
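Because newWatchCache now requires a handler argument, the test fixture passes a no-op mockHandler; a test that wanted to assert on delivered events could instead pass a closure that records each *watchCacheEvent it receives.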