|
|
|
@ -223,6 +223,7 @@ func NewZookeeperTreeCache(conn *zk.Conn, path string, events chan zookeeperTree
|
|
|
|
|
tc.head = &zookeeperTreeCacheNode{ |
|
|
|
|
events: make(chan zk.Event), |
|
|
|
|
children: map[string]*zookeeperTreeCacheNode{}, |
|
|
|
|
stopped: true, |
|
|
|
|
} |
|
|
|
|
err := tc.recursiveNodeUpdate(path, tc.head) |
|
|
|
|
if err != nil { |
|
|
|
@ -279,16 +280,20 @@ func (tc *zookeeperTreeCache) loop(failureMode bool) {
|
|
|
|
|
if err != nil { |
|
|
|
|
log.Errorf("Error during processing of Zookeeper event: %s", err) |
|
|
|
|
failure() |
|
|
|
|
} else if tc.head.data == nil { |
|
|
|
|
log.Errorf("Error during processing of Zookeeper event: path %s no longer exists", tc.prefix) |
|
|
|
|
failure() |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
case <-retryChan: |
|
|
|
|
log.Infof("Attempting to resync state with Zookeeper") |
|
|
|
|
err := tc.recursiveNodeUpdate(tc.prefix, tc.head) |
|
|
|
|
if err == nil { |
|
|
|
|
failureMode = false |
|
|
|
|
} else { |
|
|
|
|
if err != nil { |
|
|
|
|
log.Errorf("Error during Zookeeper resync: %s", err) |
|
|
|
|
failure() |
|
|
|
|
} else { |
|
|
|
|
log.Infof("Zookeeper resync successful") |
|
|
|
|
failureMode = false |
|
|
|
|
} |
|
|
|
|
case <-tc.stop: |
|
|
|
|
close(tc.events) |
|
|
|
@ -301,6 +306,9 @@ func (tc *zookeeperTreeCache) recursiveNodeUpdate(path string, node *zookeeperTr
|
|
|
|
|
data, _, dataWatcher, err := tc.conn.GetW(path) |
|
|
|
|
if err == zk.ErrNoNode { |
|
|
|
|
tc.recursiveDelete(path, node) |
|
|
|
|
if node == tc.head { |
|
|
|
|
return fmt.Errorf("path %s does not exist", path) |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
} else if err != nil { |
|
|
|
|
return err |
|
|
|
|