From 373025539279d3892a1f4ba9384af9bf84c5b4ed Mon Sep 17 00:00:00 2001 From: "Ulfsparre, Tommy" Date: Fri, 9 Sep 2016 00:27:23 +0200 Subject: [PATCH] remove deleted zookeeper nodes --- util/treecache/treecache.go | 44 +++++++++++++++++++++++++++++++++++-- 1 file changed, 42 insertions(+), 2 deletions(-) diff --git a/util/treecache/treecache.go b/util/treecache/treecache.go index 2de4e073d..3e9691f34 100644 --- a/util/treecache/treecache.go +++ b/util/treecache/treecache.go @@ -19,10 +19,31 @@ import ( "strings" "time" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/log" "github.com/samuel/go-zookeeper/zk" ) +var ( + failureCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "prometheus", + Subsystem: "treecache", + Name: "zookeeper_failures_total", + Help: "The total number of ZooKeeper failures.", + }) + numWatchers = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "prometheus", + Subsystem: "treecache", + Name: "watcher_goroutines", + Help: "The current number of watcher goroutines.", + }) +) + +func init() { + prometheus.MustRegister(failureCounter) + prometheus.MustRegister(numWatchers) +} + type ZookeeperLogger struct { } @@ -81,6 +102,7 @@ func (tc *ZookeeperTreeCache) loop(failureMode bool) { retryChan := make(chan struct{}) failure := func() { + failureCounter.Inc() failureMode = true time.AfterFunc(time.Second*10, func() { retryChan <- struct{}{} @@ -129,13 +151,19 @@ func (tc *ZookeeperTreeCache) loop(failureMode bool) { } case <-retryChan: log.Infof("Attempting to resync state with Zookeeper") + previousState := &zookeeperTreeCacheNode{ + children: tc.head.children, + } // Reset root child nodes before traversing the Zookeeper path. tc.head.children = make(map[string]*zookeeperTreeCacheNode) - err := tc.recursiveNodeUpdate(tc.prefix, tc.head) - if err != nil { + + if err := tc.recursiveNodeUpdate(tc.prefix, tc.head); err != nil { log.Errorf("Error during Zookeeper resync: %s", err) + // Revert to our previous state. + tc.head.children = previousState.children failure() } else { + tc.resyncState(tc.prefix, tc.head, previousState) log.Infof("Zookeeper resync successful") failureMode = false } @@ -199,6 +227,7 @@ func (tc *ZookeeperTreeCache) recursiveNodeUpdate(path string, node *zookeeperTr } go func() { + numWatchers.Inc() // Pass up zookeeper events, until the node is deleted. select { case event := <-dataWatcher: @@ -207,10 +236,21 @@ func (tc *ZookeeperTreeCache) recursiveNodeUpdate(path string, node *zookeeperTr node.events <- event case <-node.done: } + numWatchers.Dec() }() return nil } +func (tc *ZookeeperTreeCache) resyncState(path string, currentState, previousState *zookeeperTreeCacheNode) { + for child, previousNode := range previousState.children { + if currentNode, present := currentState.children[child]; present { + tc.resyncState(path+"/"+child, currentNode, previousNode) + } else { + tc.recursiveDelete(path+"/"+child, previousNode) + } + } +} + func (tc *ZookeeperTreeCache) recursiveDelete(path string, node *zookeeperTreeCacheNode) { if !node.stopped { node.done <- struct{}{}