From cea9d65a2c9add769b4f5c0473a1ba3ae6ce0d8b Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Fri, 5 Apr 2019 15:36:08 +0200 Subject: [PATCH] Revert "delivery event non blocking firstly" --- .../apiserver/pkg/storage/cacher/cacher.go | 80 +++++-------------- 1 file changed, 21 insertions(+), 59 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 71507d9887..d538531835 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -224,8 +224,6 @@ type Cacher struct { // watchersBuffer is a list of watchers potentially interested in currently // dispatched event. watchersBuffer []*cacheWatcher - // blockedWatchers is a list of watchers whose buffer is currently full. - blockedWatchers []*cacheWatcher // watchersToStop is a list of watchers that were supposed to be stopped // during current dispatching, but stopping was deferred to the end of // dispatching that event to avoid race with closing channels in watchers. @@ -690,39 +688,8 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) { c.startDispatching(event) // Since add() can block, we explicitly add when cacher is unlocked. - - // Dispatching event in nonblocking way first, which make faster watchers - // not be blocked by slower ones. - c.blockedWatchers = c.blockedWatchers[:0] for _, watcher := range c.watchersBuffer { - if !watcher.nonblockingAdd(event) { - c.blockedWatchers = append(c.blockedWatchers, watcher) - } - } - - if len(c.blockedWatchers) > 0 { - // dispatchEvent is called very often, so arrange - // to reuse timers instead of constantly allocating. - startTime := time.Now() - timeout := c.dispatchTimeoutBudget.takeAvailable() - c.timer.Reset(timeout) - - // Make sure every watcher will try to send event without blocking first, - // even if the timer has already expired. - timer := c.timer - for _, watcher := range c.blockedWatchers { - if !watcher.add(event, timer) { - // No time left, clean the timer by set it to nil. - timer = nil - } - } - if !c.timer.Stop() { - // Consume triggered (but not yet received) timer event - // so that future reuse does not get a spurious timeout. - <-c.timer.C - } - - c.dispatchTimeoutBudget.returnUnused(timeout - time.Since(startTime)) + watcher.add(event, c.timer, c.dispatchTimeoutBudget) } c.finishDispatching() @@ -973,23 +940,30 @@ func (c *cacheWatcher) stop() { } } -func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool { +func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer, budget *timeBudget) { + // Try to send the event immediately, without blocking. select { case c.input <- event: - return true + return default: - return false - } -} - -// Nil timer means that add will not block (if it can't send event immediately, it will break the watcher) -func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool { - // Try to send the event immediately, without blocking. - if c.nonblockingAdd(event) { - return true } - closeFunc := func() { + // OK, block sending, but only for up to . + // cacheWatcher.add is called very often, so arrange + // to reuse timers instead of constantly allocating. + startTime := time.Now() + timeout := budget.takeAvailable() + + timer.Reset(timeout) + + select { + case c.input <- event: + if !timer.Stop() { + // Consume triggered (but not yet received) timer event + // so that future reuse does not get a spurious timeout. + <-timer.C + } + case <-timer.C: // This means that we couldn't send event to that watcher. // Since we don't want to block on it infinitely, // we simply terminate it. @@ -997,19 +971,7 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool { c.forget() } - if timer == nil { - closeFunc() - return false - } - - // OK, block sending, but only until timer fires. - select { - case c.input <- event: - return true - case <-timer.C: - closeFunc() - return false - } + budget.returnUnused(timeout - time.Since(startTime)) } // NOTE: sendWatchCacheEvent is assumed to not modify !!!