Merge pull request #32275 from wojtek-t/split_process_event

Automatic merge from submit-queue

Split dispatching to watchers in Cacher into separate goroutine.

Should help with #32257
pull/6/head
Kubernetes Submit Queue 2016-09-08 07:42:12 -07:00 committed by GitHub
commit 504ccc6f37
1 changed files with 28 additions and 10 deletions

View File

@ -161,6 +161,9 @@ type Cacher struct {
watcherIdx int
watchers indexedWatchers
// Incoming events that should be dispatched to watchers.
incoming chan watchCacheEvent
// Handling graceful termination.
stopLock sync.RWMutex
stopped bool
@ -197,6 +200,8 @@ func NewCacherFromConfig(config CacherConfig) *Cacher {
allWatchers: make(map[int]*cacheWatcher),
valueWatchers: make(map[string]watchersMap),
},
// TODO: Figure out the correct value for the buffer size.
incoming: make(chan watchCacheEvent, 100),
// We need to (potentially) stop both:
// - wait.Until go-routine
// - reflector.ListAndWatch
@ -205,6 +210,7 @@ func NewCacherFromConfig(config CacherConfig) *Cacher {
stopCh: make(chan struct{}),
}
watchCache.SetOnEvent(cacher.processEvent)
go cacher.dispatchEvents()
stopCh := cacher.stopCh
cacher.stopWg.Add(1)
@ -403,14 +409,26 @@ func (c *Cacher) triggerValues(event *watchCacheEvent) ([]string, bool) {
return result, len(result) > 0
}
// TODO: Most probably splitting this method to a separate thread will visibily
// improve throughput of our watch machinery. So what we should do is to:
// - OnEvent handler simply put an element to channel
// - processEvent be another goroutine processing events from that channel
// Additionally, if we make this channel buffered, cacher will be more resistant
// to single watchers being slow - see cacheWatcher::add method.
func (c *Cacher) processEvent(event watchCacheEvent) {
triggerValues, supported := c.triggerValues(&event)
c.incoming <- event
}
func (c *Cacher) dispatchEvents() {
for {
select {
case event, ok := <-c.incoming:
if !ok {
return
}
c.dispatchEvent(&event)
case <-c.stopCh:
return
}
}
}
func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
triggerValues, supported := c.triggerValues(event)
c.Lock()
defer c.Unlock()
@ -614,10 +632,10 @@ func (c *cacheWatcher) stop() {
var timerPool sync.Pool
func (c *cacheWatcher) add(event watchCacheEvent) {
func (c *cacheWatcher) add(event *watchCacheEvent) {
// Try to send the event immediately, without blocking.
select {
case c.input <- event:
case c.input <- *event:
return
default:
}
@ -636,7 +654,7 @@ func (c *cacheWatcher) add(event watchCacheEvent) {
defer timerPool.Put(t)
select {
case c.input <- event:
case c.input <- *event:
stopped := t.Stop()
if !stopped {
// Consume triggered (but not yet received) timer event