Reduce Pool contention in cacher

pull/564/head
wojtekt 2019-02-06 10:44:57 +01:00
parent 999e2e0ce8
commit c121632360
1 changed files with 20 additions and 17 deletions

View File

@ -186,6 +186,9 @@ type Cacher struct {
stopped bool stopped bool
stopCh chan struct{} stopCh chan struct{}
stopWg sync.WaitGroup stopWg sync.WaitGroup
// Used to avoid unnecessary allocations in underlying watchers.
timer *time.Timer
} }
// NewCacherFromConfig creates a new Cacher responsible for servicing WATCH and LIST requests from // NewCacherFromConfig creates a new Cacher responsible for servicing WATCH and LIST requests from
@ -227,6 +230,7 @@ func NewCacherFromConfig(config Config) *Cacher {
// and there are no guarantees on the order that they will stop. // and there are no guarantees on the order that they will stop.
// So we will be simply closing the channel, and synchronizing on the WaitGroup. // So we will be simply closing the channel, and synchronizing on the WaitGroup.
stopCh: stopCh, stopCh: stopCh,
timer: time.NewTimer(time.Duration(0)),
} }
watchCache.SetOnEvent(cacher.processEvent) watchCache.SetOnEvent(cacher.processEvent)
go cacher.dispatchEvents() go cacher.dispatchEvents()
@ -242,6 +246,14 @@ func NewCacherFromConfig(config Config) *Cacher {
}, time.Second, stopCh, }, time.Second, stopCh,
) )
}() }()
// Ensure that timer is stopped.
if !cacher.timer.Stop() {
// Consume triggered (but not yet received) timer event
// so that future reuse does not get a spurious timeout.
<-cacher.timer.C
}
return cacher return cacher
} }
@ -621,13 +633,13 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
defer c.Unlock() defer c.Unlock()
// Iterate over "allWatchers" no matter what the trigger function is. // Iterate over "allWatchers" no matter what the trigger function is.
for _, watcher := range c.watchers.allWatchers { for _, watcher := range c.watchers.allWatchers {
watcher.add(event, c.dispatchTimeoutBudget) watcher.add(event, c.timer, c.dispatchTimeoutBudget)
} }
if supported { if supported {
// Iterate over watchers interested in the given values of the trigger. // Iterate over watchers interested in the given values of the trigger.
for _, triggerValue := range triggerValues { for _, triggerValue := range triggerValues {
for _, watcher := range c.watchers.valueWatchers[triggerValue] { for _, watcher := range c.watchers.valueWatchers[triggerValue] {
watcher.add(event, c.dispatchTimeoutBudget) watcher.add(event, c.timer, c.dispatchTimeoutBudget)
} }
} }
} else { } else {
@ -640,7 +652,7 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
// Iterate over watchers interested in exact values for all values. // Iterate over watchers interested in exact values for all values.
for _, watchers := range c.watchers.valueWatchers { for _, watchers := range c.watchers.valueWatchers {
for _, watcher := range watchers { for _, watcher := range watchers {
watcher.add(event, c.dispatchTimeoutBudget) watcher.add(event, c.timer, c.dispatchTimeoutBudget)
} }
} }
} }
@ -826,9 +838,7 @@ func (c *cacheWatcher) stop() {
} }
} }
var timerPool sync.Pool func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer, budget *timeBudget) {
func (c *cacheWatcher) add(event *watchCacheEvent, budget *timeBudget) {
// Try to send the event immediately, without blocking. // Try to send the event immediately, without blocking.
select { select {
case c.input <- event: case c.input <- event:
@ -842,23 +852,16 @@ func (c *cacheWatcher) add(event *watchCacheEvent, budget *timeBudget) {
startTime := time.Now() startTime := time.Now()
timeout := budget.takeAvailable() timeout := budget.takeAvailable()
t, ok := timerPool.Get().(*time.Timer) timer.Reset(timeout)
if ok {
t.Reset(timeout)
} else {
t = time.NewTimer(timeout)
}
defer timerPool.Put(t)
select { select {
case c.input <- event: case c.input <- event:
stopped := t.Stop() if !timer.Stop() {
if !stopped {
// Consume triggered (but not yet received) timer event // Consume triggered (but not yet received) timer event
// so that future reuse does not get a spurious timeout. // so that future reuse does not get a spurious timeout.
<-t.C <-timer.C
} }
case <-t.C: case <-timer.C:
// This means that we couldn't send event to that watcher. // This means that we couldn't send event to that watcher.
// Since we don't want to block on it infinitely, // Since we don't want to block on it infinitely,
// we simply terminate it. // we simply terminate it.