Reduce lock contention in watchcache.

pull/564/head
wojtekt 2019-02-12 14:26:45 +01:00
parent 3c2a4f0362
commit 9452adc5c8
2 changed files with 95 additions and 42 deletions

View File

@ -98,14 +98,17 @@ func (wm watchersMap) addWatcher(w *cacheWatcher, number int) {
wm[number] = w
}
func (wm watchersMap) deleteWatcher(number int) {
delete(wm, number)
func (wm watchersMap) deleteWatcher(number int, done func(*cacheWatcher)) {
if watcher, ok := wm[number]; ok {
delete(wm, number)
done(watcher)
}
}
func (wm watchersMap) terminateAll() {
func (wm watchersMap) terminateAll(done func(*cacheWatcher)) {
for key, watcher := range wm {
delete(wm, key)
watcher.stop()
done(watcher)
}
}
@ -125,24 +128,24 @@ func (i *indexedWatchers) addWatcher(w *cacheWatcher, number int, value string,
}
}
func (i *indexedWatchers) deleteWatcher(number int, value string, supported bool) {
func (i *indexedWatchers) deleteWatcher(number int, value string, supported bool, done func(*cacheWatcher)) {
if supported {
i.valueWatchers[value].deleteWatcher(number)
i.valueWatchers[value].deleteWatcher(number, done)
if len(i.valueWatchers[value]) == 0 {
delete(i.valueWatchers, value)
}
} else {
i.allWatchers.deleteWatcher(number)
i.allWatchers.deleteWatcher(number, done)
}
}
func (i *indexedWatchers) terminateAll(objectType reflect.Type) {
func (i *indexedWatchers) terminateAll(objectType reflect.Type, done func(*cacheWatcher)) {
if len(i.allWatchers) > 0 || len(i.valueWatchers) > 0 {
klog.Warningf("Terminating all watchers from cacher %v", objectType)
}
i.allWatchers.terminateAll()
i.allWatchers.terminateAll(done)
for index, watchers := range i.valueWatchers {
watchers.terminateAll()
watchers.terminateAll(done)
delete(i.valueWatchers, index)
}
}
@ -203,8 +206,19 @@ type Cacher struct {
stopCh chan struct{}
stopWg sync.WaitGroup
// Used to avoid unnecessary allocations in underlying watchers.
// timer is used to avoid unnecessary allocations in underlying watchers.
timer *time.Timer
// dispatching determines whether there is currently dispatching of
// any event in flight.
dispatching bool
// watchersBuffer is a list of watchers potentially interested in currently
// dispatched event.
watchersBuffer []*cacheWatcher
// watchersToStop is a list of watchers that were supposed to be stopped
// during current dispatching, but stopping was deferred to the end of
// dispatching that event to avoid race with closing channels in watchers.
watchersToStop []*cacheWatcher
}
// NewCacherFromConfig creates a new Cacher responsible for servicing WATCH and LIST requests from
@ -650,19 +664,41 @@ func (c *Cacher) dispatchEvents() {
}
func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
c.startDispatching(event)
// Since add() can block, we explicitly add when cacher is unlocked.
for _, watcher := range c.watchersBuffer {
watcher.add(event, c.timer, c.dispatchTimeoutBudget)
}
c.finishDispatching()
}
// startDispatching chooses watchers potentially interested in a given event
// a marks dispatching as true.
func (c *Cacher) startDispatching(event *watchCacheEvent) {
triggerValues, supported := c.triggerValues(event)
c.Lock()
defer c.Unlock()
c.dispatching = true
// We are reusing the slice to avoid memory reallocations in every
// dispatchEvent() call. That may prevent Go GC from freeing items
// from previous phases that are sitting behind the current length
// of the slice, but there is only a limited number of those and the
// gain from avoiding memory allocations is much bigger.
c.watchersBuffer = c.watchersBuffer[:0]
// Iterate over "allWatchers" no matter what the trigger function is.
for _, watcher := range c.watchers.allWatchers {
watcher.add(event, c.timer, c.dispatchTimeoutBudget)
c.watchersBuffer = append(c.watchersBuffer, watcher)
}
if supported {
// Iterate over watchers interested in the given values of the trigger.
for _, triggerValue := range triggerValues {
for _, watcher := range c.watchers.valueWatchers[triggerValue] {
watcher.add(event, c.timer, c.dispatchTimeoutBudget)
c.watchersBuffer = append(c.watchersBuffer, watcher)
}
}
} else {
@ -675,16 +711,38 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
// Iterate over watchers interested in exact values for all values.
for _, watchers := range c.watchers.valueWatchers {
for _, watcher := range watchers {
watcher.add(event, c.timer, c.dispatchTimeoutBudget)
c.watchersBuffer = append(c.watchersBuffer, watcher)
}
}
}
}
// finishDispatching stops all the watchers that were supposed to be
// stopped in the meantime, but it was deferred to avoid closing input
// channels of watchers, as add() may still have writing to it.
// It also marks dispatching as false.
func (c *Cacher) finishDispatching() {
c.Lock()
defer c.Unlock()
c.dispatching = false
for _, watcher := range c.watchersToStop {
watcher.stop()
}
c.watchersToStop = c.watchersToStop[:0]
}
func (c *Cacher) terminateAllWatchers() {
c.Lock()
defer c.Unlock()
c.watchers.terminateAll(c.objectType)
c.watchers.terminateAll(c.objectType, c.stopWatcherThreadUnsafe)
}
func (c *Cacher) stopWatcherThreadUnsafe(watcher *cacheWatcher) {
if c.dispatching {
c.watchersToStop = append(c.watchersToStop, watcher)
} else {
watcher.stop()
}
}
func (c *Cacher) isStopped() bool {
@ -710,20 +768,15 @@ func (c *Cacher) Stop() {
c.stopWg.Wait()
}
func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported bool) func(bool) {
return func(lock bool) {
if lock {
c.Lock()
defer c.Unlock()
} else {
// false is currently passed only if we are forcing watcher to close due
// to its unresponsiveness and blocking other watchers.
// TODO: Get this information in cleaner way.
klog.V(1).Infof("Forcing watcher close due to unresponsiveness: %v", c.objectType.String())
}
func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported bool) func() {
return func() {
c.Lock()
defer c.Unlock()
// It's possible that the watcher is already not in the structure (e.g. in case of
// simultaneous Stop() and terminateAllWatchers(), but it doesn't break anything.
c.watchers.deleteWatcher(index, triggerValue, triggerSupported)
// simultaneous Stop() and terminateAllWatchers(), but it is safe to call stop()
// on a watcher multiple times.
c.watchers.deleteWatcher(index, triggerValue, triggerSupported, c.stopWatcherThreadUnsafe)
}
}
@ -822,11 +875,11 @@ type cacheWatcher struct {
done chan struct{}
filter filterWithAttrsFunc
stopped bool
forget func(bool)
forget func()
versioner storage.Versioner
}
func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter filterWithAttrsFunc, forget func(bool), versioner storage.Versioner) *cacheWatcher {
func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter filterWithAttrsFunc, forget func(), versioner storage.Versioner) *cacheWatcher {
watcher := &cacheWatcher{
input: make(chan *watchCacheEvent, chanSize),
result: make(chan watch.Event, chanSize),
@ -847,8 +900,7 @@ func (c *cacheWatcher) ResultChan() <-chan watch.Event {
// Implements watch.Interface.
func (c *cacheWatcher) Stop() {
c.forget(true)
c.stop()
c.forget()
}
func (c *cacheWatcher) stop() {
@ -888,8 +940,8 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer, budget *ti
// This means that we couldn't send event to that watcher.
// Since we don't want to block on it infinitely,
// we simply terminate it.
c.forget(false)
c.stop()
klog.V(1).Infof("Forcing watcher close due to unresponsiveness: %v", reflect.TypeOf(event.Object).String())
c.forget()
}
budget.returnUnused(timeout - time.Since(startTime))
@ -982,11 +1034,7 @@ func (c *cacheWatcher) process(initEvents []*watchCacheEvent, resourceVersion ui
defer close(c.result)
defer c.Stop()
for {
event, ok := <-c.input
if !ok {
return
}
for event := range c.input {
// only send events newer than resourceVersion
if event.ResourceVersion > resourceVersion {
c.sendWatchCacheEvent(event)

View File

@ -45,12 +45,17 @@ import (
// the writes to cacheWatcher.result channel is blocked.
func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
var lock sync.RWMutex
var w *cacheWatcher
count := 0
filter := func(string, labels.Set, fields.Set) bool { return true }
forget := func(bool) {
forget := func() {
lock.Lock()
defer lock.Unlock()
count++
// forget() has to stop the watcher, as only stopping the watcher
// triggers stopping the process() goroutine which we are in the
// end waiting for in this test.
w.stop()
}
initEvents := []*watchCacheEvent{
{Object: &v1.Pod{}},
@ -58,7 +63,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
}
// set the size of the buffer of w.result to 0, so that the writes to
// w.result is blocked.
w := newCacheWatcher(0, 0, initEvents, filter, forget, testVersioner{})
w = newCacheWatcher(0, 0, initEvents, filter, forget, testVersioner{})
w.Stop()
if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
lock.RLock()
@ -73,7 +78,7 @@ func TestCacheWatcherHandlesFiltering(t *testing.T) {
filter := func(_ string, _ labels.Set, field fields.Set) bool {
return field["spec.nodeName"] == "host"
}
forget := func(bool) {}
forget := func() {}
testCases := []struct {
events []*watchCacheEvent