Merge pull request #76196 from kubernetes/revert-75547-improve_the_efficiency_of_delivery_watch_events

Revert "delivery event non blocking firstly"
k3s-v1.15.3
Kubernetes Prow Robot 2019-04-05 10:21:44 -07:00 committed by GitHub
commit f1693efe37
1 changed file with 21 additions and 59 deletions

@@ -224,8 +224,6 @@ type Cacher struct {
     // watchersBuffer is a list of watchers potentially interested in currently
     // dispatched event.
     watchersBuffer []*cacheWatcher
-    // blockedWatchers is a list of watchers whose buffer is currently full.
-    blockedWatchers []*cacheWatcher
     // watchersToStop is a list of watchers that were supposed to be stopped
     // during current dispatching, but stopping was deferred to the end of
     // dispatching that event to avoid race with closing channels in watchers.
@@ -690,39 +688,8 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
     c.startDispatching(event)
     // Since add() can block, we explicitly add when cacher is unlocked.
-    // Dispatching event in nonblocking way first, which make faster watchers
-    // not be blocked by slower ones.
-    c.blockedWatchers = c.blockedWatchers[:0]
     for _, watcher := range c.watchersBuffer {
-        if !watcher.nonblockingAdd(event) {
-            c.blockedWatchers = append(c.blockedWatchers, watcher)
-        }
-    }
-    if len(c.blockedWatchers) > 0 {
-        // dispatchEvent is called very often, so arrange
-        // to reuse timers instead of constantly allocating.
-        startTime := time.Now()
-        timeout := c.dispatchTimeoutBudget.takeAvailable()
-        c.timer.Reset(timeout)
-        // Make sure every watcher will try to send event without blocking first,
-        // even if the timer has already expired.
-        timer := c.timer
-        for _, watcher := range c.blockedWatchers {
-            if !watcher.add(event, timer) {
-                // No time left, clean the timer by set it to nil.
-                timer = nil
-            }
-        }
-        if !c.timer.Stop() {
-            // Consume triggered (but not yet received) timer event
-            // so that future reuse does not get a spurious timeout.
-            <-c.timer.C
-        }
-        c.dispatchTimeoutBudget.returnUnused(timeout - time.Since(startTime))
+        watcher.add(event, c.timer, c.dispatchTimeoutBudget)
     }
     c.finishDispatching()
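
The hunk above removes the two-phase delivery that #75547 introduced: every watcher first gets a non-blocking send, and only the watchers whose buffers are full then share a single timer drawn from the dispatch time budget. Below is a minimal, self-contained sketch of that pattern, not the cacher's real code: the watcher type, string events, channel sizes, and the 50ms timeout are illustrative stand-ins.

package main

import (
	"fmt"
	"time"
)

type watcher struct {
	input chan string
}

// nonblockingAdd delivers the event only if the watcher's buffer has room.
func (w *watcher) nonblockingAdd(event string) bool {
	select {
	case w.input <- event:
		return true
	default:
		return false
	}
}

// add retries the non-blocking send, then blocks until the shared timer fires.
// A nil timer means "no time left": give up immediately (the real cacher would
// terminate the watcher at this point).
func (w *watcher) add(event string, timer *time.Timer) bool {
	if w.nonblockingAdd(event) {
		return true
	}
	if timer == nil {
		return false
	}
	select {
	case w.input <- event:
		return true
	case <-timer.C:
		return false
	}
}

// dispatch is the two-phase pattern removed by this revert.
func dispatch(event string, watchers []*watcher, timer *time.Timer, timeout time.Duration) {
	// Phase 1: non-blocking delivery to everyone; remember who was full.
	var blocked []*watcher
	for _, w := range watchers {
		if !w.nonblockingAdd(event) {
			blocked = append(blocked, w)
		}
	}
	if len(blocked) == 0 {
		return
	}
	// Phase 2: only the blocked watchers compete for one shared timeout.
	timer.Reset(timeout)
	t := timer
	for _, w := range blocked {
		if !w.add(event, t) {
			// The timer fired: remaining blocked watchers get no extra time.
			t = nil
		}
	}
	if t != nil && !t.Stop() {
		// Drain a fired-but-unread timer so the next Reset is not spurious.
		<-t.C
	}
}

func main() {
	fast := &watcher{input: make(chan string, 1)}
	slow := &watcher{input: make(chan string)} // unbuffered: never ready
	timer := time.NewTimer(time.Hour)
	timer.Stop()

	dispatch("event-1", []*watcher{fast, slow}, timer, 50*time.Millisecond)
	fmt.Println("fast watcher received:", <-fast.input)
}

As the removed comment puts it, dispatching "in nonblocking way first" keeps faster watchers from being blocked by slower ones: every watcher gets its non-blocking attempt before any watcher is allowed to block the dispatch loop.
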
@@ -973,23 +940,30 @@ func (c *cacheWatcher) stop() {
     }
 }
-func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool {
+func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer, budget *timeBudget) {
+    // Try to send the event immediately, without blocking.
     select {
     case c.input <- event:
-        return true
+        return
     default:
-        return false
     }
-}
-// Nil timer means that add will not block (if it can't send event immediately, it will break the watcher)
-func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool {
-    // Try to send the event immediately, without blocking.
-    if c.nonblockingAdd(event) {
-        return true
-    }
-    closeFunc := func() {
+    // OK, block sending, but only for up to <timeout>.
+    // cacheWatcher.add is called very often, so arrange
+    // to reuse timers instead of constantly allocating.
+    startTime := time.Now()
+    timeout := budget.takeAvailable()
+    timer.Reset(timeout)
+    select {
+    case c.input <- event:
+        if !timer.Stop() {
+            // Consume triggered (but not yet received) timer event
+            // so that future reuse does not get a spurious timeout.
+            <-timer.C
+        }
+    case <-timer.C:
         // This means that we couldn't send event to that watcher.
         // Since we don't want to block on it infinitely,
         // we simply terminate it.
@@ -997,19 +971,7 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool {
         c.forget()
     }
-    if timer == nil {
-        closeFunc()
-        return false
-    }
-    // OK, block sending, but only until timer fires.
-    select {
-    case c.input <- event:
-        return true
-    case <-timer.C:
-        closeFunc()
-        return false
-    }
+    budget.returnUnused(timeout - time.Since(startTime))
 }
 // NOTE: sendWatchCacheEvent is assumed to not modify <event> !!!
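
The last two hunks restore the pre-#75547 shape of cacheWatcher.add: try a non-blocking send, then block for at most whatever is left in a shared time budget, reusing one timer across calls (Reset, then Stop and drain on success) and handing the unspent time back to the budget. The sketch below illustrates that flow under simplifying assumptions: budget and watcher are stand-ins for the cacher's timeBudget and cacheWatcher, and on timeout the real code forgets the watcher instead of merely returning false.

package main

import (
	"fmt"
	"sync"
	"time"
)

// budget is a crude stand-in for a time budget: callers take whatever is
// currently available and give back whatever they did not use.
type budget struct {
	mu        sync.Mutex
	available time.Duration
}

func (b *budget) takeAvailable() time.Duration {
	b.mu.Lock()
	defer b.mu.Unlock()
	d := b.available
	b.available = 0
	return d
}

func (b *budget) returnUnused(unused time.Duration) {
	if unused <= 0 {
		return
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	b.available += unused
}

type watcher struct {
	input chan string
}

// add mirrors the restored flow: fast non-blocking path, then a bounded
// blocking path whose duration comes out of the shared budget.
func (w *watcher) add(event string, timer *time.Timer, b *budget) bool {
	select {
	case w.input <- event:
		return true
	default:
	}

	startTime := time.Now()
	timeout := b.takeAvailable()
	timer.Reset(timeout)

	delivered := false
	select {
	case w.input <- event:
		if !timer.Stop() {
			// The timer fired while we were sending; drain it so the next
			// Reset does not see a stale tick.
			<-timer.C
		}
		delivered = true
	case <-timer.C:
		// Out of time: the real cacher terminates (forgets) the watcher here.
	}
	b.returnUnused(timeout - time.Since(startTime))
	return delivered
}

func main() {
	w := &watcher{input: make(chan string, 1)}
	b := &budget{available: 100 * time.Millisecond}
	timer := time.NewTimer(time.Hour)
	timer.Stop()

	fmt.Println(w.add("first", timer, b))  // true: fits in the buffer without blocking
	fmt.Println(w.add("second", timer, b)) // false: buffer full and the budget runs out
}

Reusing one timer and one budget across calls is what the restored comment calls out ("cacheWatcher.add is called very often, so arrange to reuse timers instead of constantly allocating"); the difference after the revert is that each full watcher takes its own slice of the budget inside dispatchEvent, instead of all blocked watchers sharing a single timer as in the removed version.
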