diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index f0ed90011d..dcd88cbc44 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -403,6 +403,12 @@ func (c *Cacher) triggerValues(event *watchCacheEvent) ([]string, bool) { return result, len(result) > 0 } +// TODO: Most probably splitting this method to a separate thread will visibly +// improve throughput of our watch machinery. So what we should do is to: +// - OnEvent handler simply put an element to channel +// - processEvent be another goroutine processing events from that channel +// Additionally, if we make this channel buffered, cacher will be more resistant +// to single watchers being slow - see cacheWatcher::add method. func (c *Cacher) processEvent(event watchCacheEvent) { triggerValues, supported := c.triggerValues(&event) @@ -619,6 +625,7 @@ func (c *cacheWatcher) add(event watchCacheEvent) { // OK, block sending, but only for up to 5 seconds. // cacheWatcher.add is called very often, so arrange // to reuse timers instead of constantly allocating. + startTime := time.Now() const timeout = 5 * time.Second t, ok := timerPool.Get().(*time.Timer) if ok { @@ -643,6 +650,7 @@ func (c *cacheWatcher) add(event watchCacheEvent) { c.forget(false) c.stop() } + glog.V(2).Infof("cacheWatcher add function blocked processing for %v", time.Since(startTime)) } func (c *cacheWatcher) sendWatchCacheEvent(event watchCacheEvent) { diff --git a/pkg/storage/etcd/etcd_watcher.go b/pkg/storage/etcd/etcd_watcher.go index 66251a9159..b9ea1b3aec 100644 --- a/pkg/storage/etcd/etcd_watcher.go +++ b/pkg/storage/etcd/etcd_watcher.go @@ -19,6 +19,7 @@ package etcd import ( "fmt" "net/http" + "reflect" "sync" "sync/atomic" "time" @@ -107,6 +108,10 @@ type etcdWatcher struct { // Injectable for testing. Send the event down the outgoing channel. emit func(watch.Event) + // HighWaterMarks for performance debugging. 
+ incomingHWM HighWaterMark + outgoingHWM HighWaterMark + cache etcdCache } @@ -150,6 +155,10 @@ func newEtcdWatcher( cancel: nil, } w.emit = func(e watch.Event) { + if curLen := int64(len(w.outgoing)); w.outgoingHWM.Update(curLen) { + // Monitor if this gets backed up, and how much. + glog.V(1).Infof("watch (%v): %v objects queued in outgoing channel.", reflect.TypeOf(e.Object).String(), curLen) + } // Give up on user stop, without this we leak a lot of goroutines in tests. select { case w.outgoing <- e: @@ -262,10 +271,6 @@ func convertRecursiveResponse(node *etcd.Node, response *etcd.Response, incoming incoming <- &copied } -var ( - watchChannelHWM HighWaterMark -) - // translate pulls stuff from etcd, converts, and pushes out the outgoing channel. Meant to be // called as a goroutine. func (w *etcdWatcher) translate() { @@ -308,9 +313,9 @@ func (w *etcdWatcher) translate() { return case res, ok := <-w.etcdIncoming: if ok { - if curLen := int64(len(w.etcdIncoming)); watchChannelHWM.Update(curLen) { + if curLen := int64(len(w.etcdIncoming)); w.incomingHWM.Update(curLen) { // Monitor if this gets backed up, and how much. - glog.V(2).Infof("watch: %v objects queued in channel.", curLen) + glog.V(1).Infof("watch: %v objects queued in incoming channel.", curLen) } w.sendResult(res) } diff --git a/pkg/storage/etcd3/watcher.go b/pkg/storage/etcd3/watcher.go index 02bc31437d..0b3b39baee 100644 --- a/pkg/storage/etcd3/watcher.go +++ b/pkg/storage/etcd3/watcher.go @@ -190,6 +190,10 @@ func (wc *watchChan) processEvent(wg *sync.WaitGroup) { if res == nil { continue } + if len(wc.resultChan) == outgoingBufSize { + glog.Warningf("Fast watcher, slow processing. Number of buffered events: %d."+ + "Probably caused by slow dispatching events to watchers", outgoingBufSize) + } // If user couldn't receive results fast enough, we also block incoming events from watcher. // Because storing events in local will cause more memory usage. 
// The worst case would be closing the fast watcher. @@ -300,7 +304,7 @@ func (wc *watchChan) sendError(err error) { func (wc *watchChan) sendEvent(e *event) { if len(wc.incomingEventChan) == incomingBufSize { - glog.V(2).Infof("Fast watcher, slow processing. Number of buffered events: %d."+ + glog.Warningf("Fast watcher, slow processing. Number of buffered events: %d."+ "Probably caused by slow decoding, user not receiving fast, or other processing logic", incomingBufSize) }