mirror of https://github.com/k3s-io/k3s
Merge pull request #35621 from wojtek-t/reduce_watch_cache_lock_contention
Automatic merge from submit-queue Reduce lock contention in watchCachepull/6/head
commit
6fd9acd3e1
|
@ -105,6 +105,7 @@ type watchCache struct {
|
||||||
// store will effectively support LIST operation from the "end of cache
|
// store will effectively support LIST operation from the "end of cache
|
||||||
// history" i.e. from the moment just after the newest cached watched event.
|
// history" i.e. from the moment just after the newest cached watched event.
|
||||||
// It is necessary to effectively allow clients to start watching at now.
|
// It is necessary to effectively allow clients to start watching at now.
|
||||||
|
// NOTE: We assume that <store> is thread-safe.
|
||||||
store cache.Store
|
store cache.Store
|
||||||
|
|
||||||
// ResourceVersion up to which the watchCache is propagated.
|
// ResourceVersion up to which the watchCache is propagated.
|
||||||
|
@ -202,6 +203,10 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
|
||||||
}
|
}
|
||||||
elem := &storeElement{Key: key, Object: event.Object}
|
elem := &storeElement{Key: key, Object: event.Object}
|
||||||
|
|
||||||
|
// TODO: We should consider moving this lock below after the watchCacheEvent
|
||||||
|
// is created. In such situation, the only problematic scenario is Replace(
|
||||||
|
// happening after getting object from store and before acquiring a lock.
|
||||||
|
// Maybe introduce another lock for this purpose.
|
||||||
w.Lock()
|
w.Lock()
|
||||||
defer w.Unlock()
|
defer w.Unlock()
|
||||||
previous, exists, err := w.store.Get(elem)
|
previous, exists, err := w.store.Get(elem)
|
||||||
|
@ -240,8 +245,6 @@ func (w *watchCache) updateCache(resourceVersion uint64, event *watchCacheEvent)
|
||||||
|
|
||||||
// List returns list of pointers to <storeElement> objects.
|
// List returns list of pointers to <storeElement> objects.
|
||||||
func (w *watchCache) List() []interface{} {
|
func (w *watchCache) List() []interface{} {
|
||||||
w.RLock()
|
|
||||||
defer w.RUnlock()
|
|
||||||
return w.store.List()
|
return w.store.List()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -300,8 +303,6 @@ func (w *watchCache) WaitUntilFreshAndGet(resourceVersion uint64, key string, tr
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *watchCache) ListKeys() []string {
|
func (w *watchCache) ListKeys() []string {
|
||||||
w.RLock()
|
|
||||||
defer w.RUnlock()
|
|
||||||
return w.store.ListKeys()
|
return w.store.ListKeys()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -317,15 +318,11 @@ func (w *watchCache) Get(obj interface{}) (interface{}, bool, error) {
|
||||||
return nil, false, fmt.Errorf("couldn't compute key: %v", err)
|
return nil, false, fmt.Errorf("couldn't compute key: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
w.RLock()
|
|
||||||
defer w.RUnlock()
|
|
||||||
return w.store.Get(&storeElement{Key: key, Object: object})
|
return w.store.Get(&storeElement{Key: key, Object: object})
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetByKey returns pointer to <storeElement>.
|
// GetByKey returns pointer to <storeElement>.
|
||||||
func (w *watchCache) GetByKey(key string) (interface{}, bool, error) {
|
func (w *watchCache) GetByKey(key string) (interface{}, bool, error) {
|
||||||
w.RLock()
|
|
||||||
defer w.RUnlock()
|
|
||||||
return w.store.GetByKey(key)
|
return w.store.GetByKey(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue