diff --git a/discovery/zookeeper/zookeeper.go b/discovery/zookeeper/zookeeper.go index ee2785721..019af105f 100644 --- a/discovery/zookeeper/zookeeper.go +++ b/discovery/zookeeper/zookeeper.go @@ -108,8 +108,9 @@ type Discovery struct { sources map[string]*targetgroup.Group - updates chan treecache.ZookeeperTreeCacheEvent - treeCaches []*treecache.ZookeeperTreeCache + updates chan treecache.ZookeeperTreeCacheEvent + pathUpdates []chan treecache.ZookeeperTreeCacheEvent + treeCaches []*treecache.ZookeeperTreeCache parse func(data []byte, path string) (model.LabelSet, error) logger log.Logger @@ -155,7 +156,9 @@ func NewDiscovery( logger: logger, } for _, path := range paths { - sd.treeCaches = append(sd.treeCaches, treecache.NewZookeeperTreeCache(conn, path, updates, logger)) + pathUpdate := make(chan treecache.ZookeeperTreeCacheEvent) + sd.pathUpdates = append(sd.pathUpdates, pathUpdate) + sd.treeCaches = append(sd.treeCaches, treecache.NewZookeeperTreeCache(conn, path, pathUpdate, logger)) } return sd, nil } @@ -166,12 +169,26 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { for _, tc := range d.treeCaches { tc.Stop() } - // Drain event channel in case the treecache leaks goroutines otherwise. - for range d.updates { + for _, pathUpdate := range d.pathUpdates { + // Drain event channel in case the treecache leaks goroutines otherwise. + for range pathUpdate { + } } d.conn.Close() }() + for _, pathUpdate := range d.pathUpdates { + go func(update chan treecache.ZookeeperTreeCacheEvent) { + for event := range update { + select { + case d.updates <- event: + case <-ctx.Done(): + return + } + } + }(pathUpdate) + } + for { select { case <-ctx.Done():