mirror of https://github.com/k3s-io/k3s
Merge pull request #73846 from wojtek-t/avoid_sync_pool
Reduce contention in cacher by eliminating sync.Pool
pull/564/head
commit 26af19b3a4
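In outline, as read from the hunks below: instead of taking a timer from a package-level sync.Pool on every cacheWatcher.add call, the Cacher now owns a single *time.Timer. It is created already stopped in NewCacherFromConfig and handed by dispatchEvent to each watcher.add call, which Resets it for the duration of one bounded send. The defer c.Unlock() visible in the dispatchEvent hunk suggests dispatch runs under the Cacher lock, so only one add call touches the shared timer at a time.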
@@ -186,6 +186,9 @@ type Cacher struct {
 	stopped bool
 	stopCh  chan struct{}
 	stopWg  sync.WaitGroup
+
+	// Used to avoid unnecessary allocations in underlying watchers.
+	timer *time.Timer
 }
 
 // NewCacherFromConfig creates a new Cacher responsible for servicing WATCH and LIST requests from
@@ -227,6 +230,7 @@ func NewCacherFromConfig(config Config) *Cacher {
 		// and there are no guarantees on the order that they will stop.
 		// So we will be simply closing the channel, and synchronizing on the WaitGroup.
 		stopCh: stopCh,
+		timer:  time.NewTimer(time.Duration(0)),
 	}
 	watchCache.SetOnEvent(cacher.processEvent)
 	go cacher.dispatchEvents()
@@ -242,6 +246,14 @@ func NewCacherFromConfig(config Config) *Cacher {
 			}, time.Second, stopCh,
 		)
 	}()
+
+	// Ensure that timer is stopped.
+	if !cacher.timer.Stop() {
+		// Consume triggered (but not yet received) timer event
+		// so that future reuse does not get a spurious timeout.
+		<-cacher.timer.C
+	}
+
 	return cacher
 }
 
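A minimal standalone sketch of the initialization idiom in the hunk above, under our own names (newIdleTimer and main are illustrative, not part of the Kubernetes code): time.NewTimer(0) fires almost immediately, so the constructor stops it and drains its channel, leaving an idle timer that later callers can safely Reset.

package main

import (
	"fmt"
	"time"
)

// newIdleTimer returns a timer that is stopped and whose channel has been
// drained, so a later Reset starts a fresh countdown with no stale tick
// pending. The helper name is ours; it mirrors the NewCacherFromConfig hunk.
func newIdleTimer() *time.Timer {
	t := time.NewTimer(0) // a zero-duration timer fires (almost) immediately
	if !t.Stop() {
		// Stop reports false once the timer has already fired, so consume
		// the pending tick to avoid a spurious timeout on first reuse.
		<-t.C
	}
	return t
}

func main() {
	t := newIdleTimer()
	t.Reset(50 * time.Millisecond)
	<-t.C // exactly one tick, 50ms after Reset, with nothing stale before it
	fmt.Println("timer fired once after Reset")
}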
@@ -621,13 +633,13 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
 	defer c.Unlock()
 	// Iterate over "allWatchers" no matter what the trigger function is.
 	for _, watcher := range c.watchers.allWatchers {
-		watcher.add(event, c.dispatchTimeoutBudget)
+		watcher.add(event, c.timer, c.dispatchTimeoutBudget)
 	}
 	if supported {
 		// Iterate over watchers interested in the given values of the trigger.
 		for _, triggerValue := range triggerValues {
 			for _, watcher := range c.watchers.valueWatchers[triggerValue] {
-				watcher.add(event, c.dispatchTimeoutBudget)
+				watcher.add(event, c.timer, c.dispatchTimeoutBudget)
 			}
 		}
 	} else {
@@ -640,7 +652,7 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
 		// Iterate over watchers interested in exact values for all values.
 		for _, watchers := range c.watchers.valueWatchers {
 			for _, watcher := range watchers {
-				watcher.add(event, c.dispatchTimeoutBudget)
+				watcher.add(event, c.timer, c.dispatchTimeoutBudget)
 			}
 		}
 	}
@@ -826,9 +838,7 @@ func (c *cacheWatcher) stop() {
 	}
 }
 
-var timerPool sync.Pool
-
-func (c *cacheWatcher) add(event *watchCacheEvent, budget *timeBudget) {
+func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer, budget *timeBudget) {
 	// Try to send the event immediately, without blocking.
 	select {
 	case c.input <- event:
@@ -842,23 +852,16 @@ func (c *cacheWatcher) add(event *watchCacheEvent, budget *timeBudget) {
 	startTime := time.Now()
 	timeout := budget.takeAvailable()
 
-	t, ok := timerPool.Get().(*time.Timer)
-	if ok {
-		t.Reset(timeout)
-	} else {
-		t = time.NewTimer(timeout)
-	}
-	defer timerPool.Put(t)
+	timer.Reset(timeout)
 
 	select {
 	case c.input <- event:
-		stopped := t.Stop()
-		if !stopped {
+		if !timer.Stop() {
 			// Consume triggered (but not yet received) timer event
 			// so that future reuse does not get a spurious timeout.
-			<-t.C
+			<-timer.C
 		}
-	case <-t.C:
+	case <-timer.C:
 		// This means that we couldn't send event to that watcher.
 		// Since we don't want to block on it infinitely,
 		// we simply terminate it.
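A rough, self-contained sketch of the pattern in the hunk above, with our own names (sendWithTimeout and the channels in main are illustrative, not the actual cacheWatcher code): the caller passes in one long-lived timer, the sender Resets it to the available budget, and each select arm is responsible for leaving the timer drained so the next caller can Reset it again.

package main

import (
	"fmt"
	"time"
)

// sendWithTimeout tries to deliver v on ch within timeout, reusing the shared
// timer instead of allocating (or pooling) one per call. It reports whether
// the value was delivered. The timer must be idle (stopped and drained) when
// passed in, and is left idle again on return.
func sendWithTimeout(ch chan<- int, v int, timer *time.Timer, timeout time.Duration) bool {
	timer.Reset(timeout)
	select {
	case ch <- v:
		if !timer.Stop() {
			// The timer fired while we were sending; consume the tick so the
			// next Reset does not see a stale timeout.
			<-timer.C
		}
		return true
	case <-timer.C:
		// Could not deliver within the budget; receiving the tick already
		// left the channel empty, so the timer is ready for reuse.
		return false
	}
}

func main() {
	// Start from an idle, drained timer, as in the constructor sketch earlier.
	timer := time.NewTimer(0)
	if !timer.Stop() {
		<-timer.C
	}

	fast := make(chan int, 1) // buffered: the send succeeds immediately
	slow := make(chan int)    // unbuffered with no receiver: the send times out

	fmt.Println(sendWithTimeout(fast, 1, timer, 10*time.Millisecond)) // true
	fmt.Println(sendWithTimeout(slow, 2, timer, 10*time.Millisecond)) // false
}

Compared with the timerPool version it replaces, this trades a pool Get/Put on every call for one timer reused serially, which only works when calls are serialized, as the dispatchEvent locking above appears to ensure.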