From b4d7ce1370fbd6af0786d28882f13fb5830b25a6 Mon Sep 17 00:00:00 2001
From: Fabian Reinartz
Date: Fri, 18 Nov 2016 10:55:29 +0100
Subject: [PATCH 1/2] discovery: respect context cancellation everywhere

This also removes closing of the target group channel everywhere as the
context cancels across all stages and we don't care about draining all
events once that happened.
---
 retrieval/discovery/azure.go                 |  6 +-
 retrieval/discovery/consul/consul.go         | 13 +---
 retrieval/discovery/dns/dns.go               |  2 -
 retrieval/discovery/ec2.go                   | 17 +++--
 retrieval/discovery/file.go                  | 68 ++++++++++----------
 retrieval/discovery/file_test.go             | 16 +++--
 retrieval/discovery/gce.go                   | 14 ++--
 retrieval/discovery/kubernetes/kubernetes.go |  2 -
 retrieval/discovery/marathon/marathon.go     |  2 -
 9 files changed, 70 insertions(+), 70 deletions(-)

diff --git a/retrieval/discovery/azure.go b/retrieval/discovery/azure.go
index 2153505a5..cec54af86 100644
--- a/retrieval/discovery/azure.go
+++ b/retrieval/discovery/azure.go
@@ -81,7 +81,6 @@ func NewAzureDiscovery(cfg *config.AzureSDConfig) *AzureDiscovery {
 
 // Run implements the TargetProvider interface.
 func (ad *AzureDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
-    defer close(ch)
     ticker := time.NewTicker(ad.interval)
     defer ticker.Stop()
 
@@ -96,7 +95,10 @@ func (ad *AzureDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGro
         if err != nil {
             log.Errorf("unable to refresh during Azure discovery: %s", err)
         } else {
-            ch <- []*config.TargetGroup{tg}
+            select {
+            case <-ctx.Done():
+            case ch <- []*config.TargetGroup{tg}:
+            }
         }
 
         select {
diff --git a/retrieval/discovery/consul/consul.go b/retrieval/discovery/consul/consul.go
index 114a1ece4..e81749b1c 100644
--- a/retrieval/discovery/consul/consul.go
+++ b/retrieval/discovery/consul/consul.go
@@ -18,7 +18,6 @@ import (
     "net"
     "strconv"
     "strings"
-    "sync"
     "time"
 
     consul "github.com/hashicorp/consul/api"
@@ -133,12 +132,6 @@ func (cd *Discovery) shouldWatch(name string) bool {
 
 // Run implements the TargetProvider interface.
 func (cd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
-    var wg sync.WaitGroup
-    defer func() {
-        wg.Wait()
-        close(ch)
-    }()
-
     // Watched services and their cancelation functions.
     services := map[string]func(){}
 
@@ -204,11 +197,7 @@ func (cd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
             }
 
             wctx, cancel := context.WithCancel(ctx)
-            wg.Add(1)
-            go func() {
-                srv.watch(wctx, ch)
-                wg.Done()
-            }()
+            go srv.watch(wctx, ch)
             services[name] = cancel
         }
 
diff --git a/retrieval/discovery/dns/dns.go b/retrieval/discovery/dns/dns.go
index b0c9aca97..ca9ed56fc 100644
--- a/retrieval/discovery/dns/dns.go
+++ b/retrieval/discovery/dns/dns.go
@@ -90,8 +90,6 @@ func NewDiscovery(conf *config.DNSSDConfig) *Discovery {
 
 // Run implements the TargetProvider interface.
 func (dd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
-    defer close(ch)
-
     ticker := time.NewTicker(dd.interval)
     defer ticker.Stop()
 
diff --git a/retrieval/discovery/ec2.go b/retrieval/discovery/ec2.go
index 65aa7bac0..1d3d8e112 100644
--- a/retrieval/discovery/ec2.go
+++ b/retrieval/discovery/ec2.go
@@ -93,8 +93,6 @@ func NewEC2Discovery(conf *config.EC2SDConfig) *EC2Discovery {
 
 // Run implements the TargetProvider interface.
 func (ed *EC2Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
-    defer close(ch)
-
     ticker := time.NewTicker(ed.interval)
     defer ticker.Stop()
 
@@ -103,7 +101,11 @@ func (ed *EC2Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup
     if err != nil {
         log.Error(err)
     } else {
-        ch <- []*config.TargetGroup{tg}
+        select {
+        case ch <- []*config.TargetGroup{tg}:
+        case <-ctx.Done():
+            return
+        }
     }
 
     for {
@@ -112,8 +114,13 @@ func (ed *EC2Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup
             tg, err := ed.refresh()
             if err != nil {
                 log.Error(err)
-            } else {
-                ch <- []*config.TargetGroup{tg}
+                continue
+            }
+
+            select {
+            case ch <- []*config.TargetGroup{tg}:
+            case <-ctx.Done():
+                return
             }
         case <-ctx.Done():
             return
diff --git a/retrieval/discovery/file.go b/retrieval/discovery/file.go
index ba7b9be91..ba1ae5b0c 100644
--- a/retrieval/discovery/file.go
+++ b/retrieval/discovery/file.go
@@ -109,7 +109,6 @@ func (fd *FileDiscovery) watchFiles() {
 
 // Run implements the TargetProvider interface.
 func (fd *FileDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
-    defer close(ch)
     defer fd.stop()
 
     watcher, err := fsnotify.NewWatcher()
@@ -119,47 +118,40 @@ func (fd *FileDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGrou
     }
     fd.watcher = watcher
 
-    fd.refresh(ch)
+    fd.refresh(ctx, ch)
 
     ticker := time.NewTicker(fd.interval)
     defer ticker.Stop()
 
     for {
-        // Stopping has priority over refreshing. Thus we wrap the actual select
-        // clause to always catch done signals.
         select {
         case <-ctx.Done():
             return
-        default:
-            select {
-            case <-ctx.Done():
-                return
 
-            case event := <-fd.watcher.Events:
-                // fsnotify sometimes sends a bunch of events without name or operation.
-                // It's unclear what they are and why they are sent - filter them out.
-                if len(event.Name) == 0 {
-                    break
-                }
-                // Everything but a chmod requires rereading.
-                if event.Op^fsnotify.Chmod == 0 {
-                    break
-                }
-                // Changes to a file can spawn various sequences of events with
-                // different combinations of operations. For all practical purposes
-                // this is inaccurate.
-                // The most reliable solution is to reload everything if anything happens.
-                fd.refresh(ch)
+        case event := <-fd.watcher.Events:
+            // fsnotify sometimes sends a bunch of events without name or operation.
+            // It's unclear what they are and why they are sent - filter them out.
+            if len(event.Name) == 0 {
+                break
+            }
+            // Everything but a chmod requires rereading.
+            if event.Op^fsnotify.Chmod == 0 {
+                break
+            }
+            // Changes to a file can spawn various sequences of events with
+            // different combinations of operations. For all practical purposes
+            // this is inaccurate.
+            // The most reliable solution is to reload everything if anything happens.
+            fd.refresh(ctx, ch)
 
-            case <-ticker.C:
-                // Setting a new watch after an update might fail. Make sure we don't lose
-                // those files forever.
-                fd.refresh(ch)
+        case <-ticker.C:
+            // Setting a new watch after an update might fail. Make sure we don't lose
+            // those files forever.
+            fd.refresh(ctx, ch)
 
-            case err := <-fd.watcher.Errors:
-                if err != nil {
-                    log.Errorf("Error on file watch: %s", err)
-                }
+        case err := <-fd.watcher.Errors:
+            if err != nil {
+                log.Errorf("Error on file watch: %s", err)
             }
         }
     }
@@ -193,7 +185,7 @@ func (fd *FileDiscovery) stop() {
 
 // refresh reads all files matching the discovery's patterns and sends the respective
 // updated target groups through the channel.
-func (fd *FileDiscovery) refresh(ch chan<- []*config.TargetGroup) {
+func (fd *FileDiscovery) refresh(ctx context.Context, ch chan<- []*config.TargetGroup) {
     t0 := time.Now()
     defer func() {
         fileSDScanDuration.Observe(time.Since(t0).Seconds())
@@ -209,7 +201,11 @@ func (fd *FileDiscovery) refresh(ch chan<- []*config.TargetGroup) {
             ref[p] = fd.lastRefresh[p]
             continue
         }
-        ch <- tgroups
+        select {
+        case ch <- tgroups:
+        case <-ctx.Done():
+            return
+        }
         ref[p] = len(tgroups)
     }
 
@@ -218,8 +214,10 @@ func (fd *FileDiscovery) refresh(ch chan<- []*config.TargetGroup) {
         m, ok := ref[f]
         if !ok || n > m {
             for i := m; i < n; i++ {
-                ch <- []*config.TargetGroup{
-                    {Source: fileSource(f, i)},
+                select {
+                case ch <- []*config.TargetGroup{{Source: fileSource(f, i)}}:
+                case <-ctx.Done():
+                    return
                 }
             }
         }
diff --git a/retrieval/discovery/file_test.go b/retrieval/discovery/file_test.go
index 10ff49dba..975189067 100644
--- a/retrieval/discovery/file_test.go
+++ b/retrieval/discovery/file_test.go
@@ -106,11 +106,17 @@ retry:
     // not try to make sense of it all...
     drained := make(chan struct{})
     go func() {
-        for tgs := range ch {
-            // Below we will change the file to a bad syntax. Previously extracted target
-            // groups must not be deleted via sending an empty target group.
-            if len(tgs[0].Targets) == 0 {
-                t.Errorf("Unexpected empty target groups received: %s", tgs)
+    Loop:
+        for {
+            select {
+            case tgs := <-ch:
+                // Below we will change the file to a bad syntax. Previously extracted target
+                // groups must not be deleted via sending an empty target group.
+                if len(tgs[0].Targets) == 0 {
+                    t.Errorf("Unexpected empty target groups received: %s", tgs)
+                }
+            case <-time.After(500 * time.Millisecond):
+                break Loop
             }
         }
         close(drained)
diff --git a/retrieval/discovery/gce.go b/retrieval/discovery/gce.go
index d098c5608..c0f8e311d 100644
--- a/retrieval/discovery/gce.go
+++ b/retrieval/discovery/gce.go
@@ -108,14 +108,15 @@ func NewGCEDiscovery(conf *config.GCESDConfig) (*GCEDiscovery, error) {
 
 // Run implements the TargetProvider interface.
 func (gd *GCEDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
-    defer close(ch)
-
     // Get an initial set right away.
     tg, err := gd.refresh()
     if err != nil {
         log.Error(err)
     } else {
-        ch <- []*config.TargetGroup{tg}
+        select {
+        case ch <- []*config.TargetGroup{tg}:
+        case <-ctx.Done():
+        }
     }
 
     ticker := time.NewTicker(gd.interval)
@@ -127,8 +128,11 @@ func (gd *GCEDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup
             tg, err := gd.refresh()
             if err != nil {
                 log.Error(err)
-            } else {
-                ch <- []*config.TargetGroup{tg}
+                continue
+            }
+            select {
+            case ch <- []*config.TargetGroup{tg}:
+            case <-ctx.Done():
+            }
         case <-ctx.Done():
             return
diff --git a/retrieval/discovery/kubernetes/kubernetes.go b/retrieval/discovery/kubernetes/kubernetes.go
index b289c2a98..3d2d7a54d 100644
--- a/retrieval/discovery/kubernetes/kubernetes.go
+++ b/retrieval/discovery/kubernetes/kubernetes.go
@@ -107,8 +107,6 @@ const resyncPeriod = 10 * time.Minute
 
 // Run implements the TargetProvider interface.
 func (k *Kubernetes) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
-    defer close(ch)
-
     rclient := k.client.Core().GetRESTClient()
 
     switch k.role {
diff --git a/retrieval/discovery/marathon/marathon.go b/retrieval/discovery/marathon/marathon.go
index 5e97763a1..6f9668b66 100644
--- a/retrieval/discovery/marathon/marathon.go
+++ b/retrieval/discovery/marathon/marathon.go
@@ -103,8 +103,6 @@ func NewDiscovery(conf *config.MarathonSDConfig) (*Discovery, error) {
 
 // Run implements the TargetProvider interface.
 func (md *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
-    defer close(ch)
-
     for {
         select {
         case <-ctx.Done():

From a1eec447a4d214874b98c7214b3e7f6071d588aa Mon Sep 17 00:00:00 2001
From: Fabian Reinartz
Date: Fri, 18 Nov 2016 11:20:28 +0100
Subject: [PATCH 2/2] discovery: fix+consolidate Zookeeper discoveries

---
 retrieval/discovery/nerve.go                 | 144 ------------
 .../discovery/{serverset.go => zookeeper.go} | 207 ++++++++++--------
 2 files changed, 120 insertions(+), 231 deletions(-)
 delete mode 100644 retrieval/discovery/nerve.go
 rename retrieval/discovery/{serverset.go => zookeeper.go} (54%)

diff --git a/retrieval/discovery/nerve.go b/retrieval/discovery/nerve.go
deleted file mode 100644
index ed9c13acf..000000000
--- a/retrieval/discovery/nerve.go
+++ /dev/null
@@ -1,144 +0,0 @@
-// Copyright 2015 The Prometheus Authors
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package discovery
-
-import (
-    "encoding/json"
-    "fmt"
-    "net"
-    "sync"
-    "time"
-
-    "github.com/prometheus/common/model"
-    "github.com/samuel/go-zookeeper/zk"
-    "golang.org/x/net/context"
-
-    "github.com/prometheus/prometheus/config"
-    "github.com/prometheus/prometheus/util/treecache"
-)
-
-const (
-    nerveLabelPrefix         = model.MetaLabelPrefix + "nerve_"
-    nervePathLabel           = nerveLabelPrefix + "path"
-    nerveEndpointLabelPrefix = nerveLabelPrefix + "endpoint"
-)
-
-type nerveMember struct {
-    Host string `json:"host"`
-    Port int    `json:"port"`
-    Name string `json:"name"`
-}
-
-// NerveDiscovery retrieves target information from a Nerve server
-// and updates them via watches.
-type NerveDiscovery struct {
-    conf       *config.NerveSDConfig
-    conn       *zk.Conn
-    mu         sync.RWMutex
-    sources    map[string]*config.TargetGroup
-    sdUpdates  *chan<- []*config.TargetGroup
-    updates    chan treecache.ZookeeperTreeCacheEvent
-    treeCaches []*treecache.ZookeeperTreeCache
-}
-
-// NewNerveDiscovery returns a new NerveDiscovery for the given config.
-func NewNerveDiscovery(conf *config.NerveSDConfig) *NerveDiscovery {
-    conn, _, err := zk.Connect(conf.Servers, time.Duration(conf.Timeout))
-    conn.SetLogger(treecache.ZookeeperLogger{})
-    if err != nil {
-        return nil
-    }
-    updates := make(chan treecache.ZookeeperTreeCacheEvent)
-    sd := &NerveDiscovery{
-        conf:    conf,
-        conn:    conn,
-        updates: updates,
-        sources: map[string]*config.TargetGroup{},
-    }
-    go sd.processUpdates()
-    for _, path := range conf.Paths {
-        sd.treeCaches = append(sd.treeCaches, treecache.NewZookeeperTreeCache(conn, path, updates))
-    }
-    return sd
-}
-
-func (sd *NerveDiscovery) processUpdates() {
-    defer sd.conn.Close()
-    for event := range sd.updates {
-        tg := &config.TargetGroup{
-            Source: event.Path,
-        }
-        sd.mu.Lock()
-        if event.Data != nil {
-            labelSet, err := parseNerveMember(*event.Data, event.Path)
-            if err == nil {
-                tg.Targets = []model.LabelSet{*labelSet}
-                sd.sources[event.Path] = tg
-            } else {
-                delete(sd.sources, event.Path)
-            }
-        } else {
-            delete(sd.sources, event.Path)
-        }
-        sd.mu.Unlock()
-        if sd.sdUpdates != nil {
-            *sd.sdUpdates <- []*config.TargetGroup{tg}
-        }
-    }
-
-    if sd.sdUpdates != nil {
-        close(*sd.sdUpdates)
-    }
-}
-
-// Run implements the TargetProvider interface.
-func (sd *NerveDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
-    // Send on everything we have seen so far.
-    sd.mu.Lock()
-
-    all := make([]*config.TargetGroup, 0, len(sd.sources))
-
-    for _, tg := range sd.sources {
-        all = append(all, tg)
-    }
-    ch <- all
-
-    // Tell processUpdates to send future updates.
-    sd.sdUpdates = &ch
-    sd.mu.Unlock()
-
-    <-ctx.Done()
-    for _, tc := range sd.treeCaches {
-        tc.Stop()
-    }
-}
-
-func parseNerveMember(data []byte, path string) (*model.LabelSet, error) {
-    member := nerveMember{}
-    err := json.Unmarshal(data, &member)
-    if err != nil {
-        return nil, fmt.Errorf("error unmarshaling nerve member %q: %s", path, err)
-    }
-
-    labels := model.LabelSet{}
-    labels[nervePathLabel] = model.LabelValue(path)
-    labels[model.AddressLabel] = model.LabelValue(
-        net.JoinHostPort(member.Host, fmt.Sprintf("%d", member.Port)))
-
-    labels[nerveEndpointLabelPrefix+"_host"] = model.LabelValue(member.Host)
-    labels[nerveEndpointLabelPrefix+"_port"] = model.LabelValue(fmt.Sprintf("%d", member.Port))
-    labels[nerveEndpointLabelPrefix+"_name"] = model.LabelValue(member.Name)
-
-    return &labels, nil
-}
diff --git a/retrieval/discovery/serverset.go b/retrieval/discovery/zookeeper.go
similarity index 54%
rename from retrieval/discovery/serverset.go
rename to retrieval/discovery/zookeeper.go
index 1b75c70da..31f78845f 100644
--- a/retrieval/discovery/serverset.go
+++ b/retrieval/discovery/zookeeper.go
@@ -18,7 +18,6 @@ import (
     "fmt"
     "net"
     "strconv"
-    "sync"
     "time"
 
     "github.com/prometheus/common/model"
@@ -30,6 +29,93 @@ import (
+type ZookeeperDiscovery struct {
+    conn *zk.Conn
+
+    sources map[string]*config.TargetGroup
+
+    updates    chan treecache.ZookeeperTreeCacheEvent
+    treeCaches []*treecache.ZookeeperTreeCache
+
+    parse func(data []byte, path string) (model.LabelSet, error)
+}
+
+// NewNerveDiscovery returns a new NerveDiscovery for the given config.
+func NewNerveDiscovery(conf *config.NerveSDConfig) *ZookeeperDiscovery {
+    return NewZookeeperDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, parseNerveMember)
+}
+
+// NewServersetDiscovery returns a new ServersetDiscovery for the given config.
+func NewServersetDiscovery(conf *config.ServersetSDConfig) *ZookeeperDiscovery {
+    return NewZookeeperDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, parseServersetMember)
+}
+
+// NewZookeeperDiscovery returns a new discovery reading from the given Zookeeper
+// servers, using the given parse function to extract targets.
+func NewZookeeperDiscovery(
+    srvs []string,
+    timeout time.Duration,
+    paths []string,
+    pf func(data []byte, path string) (model.LabelSet, error),
+) *ZookeeperDiscovery {
+    conn, _, err := zk.Connect(srvs, time.Duration(timeout))
+    conn.SetLogger(treecache.ZookeeperLogger{})
+    if err != nil {
+        return nil
+    }
+    updates := make(chan treecache.ZookeeperTreeCacheEvent)
+    sd := &ZookeeperDiscovery{
+        conn:    conn,
+        updates: updates,
+        sources: map[string]*config.TargetGroup{},
+        parse:   pf,
+    }
+    for _, path := range paths {
+        sd.treeCaches = append(sd.treeCaches, treecache.NewZookeeperTreeCache(conn, path, updates))
+    }
+    return sd
+}
+
+// Run implements the TargetProvider interface.
+func (sd *ZookeeperDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
+    defer func() {
+        for _, tc := range sd.treeCaches {
+            tc.Stop()
+        }
+        // Drain event channel in case the treecache leaks goroutines otherwise.
+        for range sd.updates {
+        }
+        sd.conn.Close()
+    }()
+
+    for {
+        select {
+        case <-ctx.Done():
+            return
+        case event := <-sd.updates:
+            tg := &config.TargetGroup{
+                Source: event.Path,
+            }
+            if event.Data != nil {
+                labelSet, err := sd.parse(*event.Data, event.Path)
+                if err == nil {
+                    tg.Targets = []model.LabelSet{labelSet}
+                    sd.sources[event.Path] = tg
+                } else {
+                    delete(sd.sources, event.Path)
+                }
+            } else {
+                delete(sd.sources, event.Path)
+            }
+            select {
+            case <-ctx.Done():
+                return
+            case ch <- []*config.TargetGroup{tg}:
+            }
+        }
+    }
+}
+
 const (
     serversetLabelPrefix = model.MetaLabelPrefix + "serverset_"
     serversetStatusLabel = serversetLabelPrefix + "status"
@@ -50,91 +136,7 @@ type serversetEndpoint struct {
     Host string
     Port int
 }
 
-// ServersetDiscovery retrieves target information from a Serverset server
-// and updates them via watches.
-type ServersetDiscovery struct {
-    conf       *config.ServersetSDConfig
-    conn       *zk.Conn
-    mu         sync.RWMutex
-    sources    map[string]*config.TargetGroup
-    sdUpdates  *chan<- []*config.TargetGroup
-    updates    chan treecache.ZookeeperTreeCacheEvent
-    treeCaches []*treecache.ZookeeperTreeCache
-}
-
-// NewServersetDiscovery returns a new ServersetDiscovery for the given config.
-func NewServersetDiscovery(conf *config.ServersetSDConfig) *ServersetDiscovery {
-    conn, _, err := zk.Connect(conf.Servers, time.Duration(conf.Timeout))
-    conn.SetLogger(treecache.ZookeeperLogger{})
-    if err != nil {
-        return nil
-    }
-    updates := make(chan treecache.ZookeeperTreeCacheEvent)
-    sd := &ServersetDiscovery{
-        conf:    conf,
-        conn:    conn,
-        updates: updates,
-        sources: map[string]*config.TargetGroup{},
-    }
-    go sd.processUpdates()
-    for _, path := range conf.Paths {
-        sd.treeCaches = append(sd.treeCaches, treecache.NewZookeeperTreeCache(conn, path, updates))
-    }
-    return sd
-}
-
-func (sd *ServersetDiscovery) processUpdates() {
-    defer sd.conn.Close()
-    for event := range sd.updates {
-        tg := &config.TargetGroup{
-            Source: event.Path,
-        }
-        sd.mu.Lock()
-        if event.Data != nil {
-            labelSet, err := parseServersetMember(*event.Data, event.Path)
-            if err == nil {
-                tg.Targets = []model.LabelSet{*labelSet}
-                sd.sources[event.Path] = tg
-            } else {
-                delete(sd.sources, event.Path)
-            }
-        } else {
-            delete(sd.sources, event.Path)
-        }
-        sd.mu.Unlock()
-        if sd.sdUpdates != nil {
-            *sd.sdUpdates <- []*config.TargetGroup{tg}
-        }
-    }
-
-    if sd.sdUpdates != nil {
-        close(*sd.sdUpdates)
-    }
-}
-
-// Run implements the TargetProvider interface.
-func (sd *ServersetDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
-    // Send on everything we have seen so far.
-    sd.mu.Lock()
-
-    all := make([]*config.TargetGroup, 0, len(sd.sources))
-
-    for _, tg := range sd.sources {
-        all = append(all, tg)
-    }
-    ch <- all
-
-    // Tell processUpdates to send future updates.
-    sd.sdUpdates = &ch
-    sd.mu.Unlock()
-
-    <-ctx.Done()
-    for _, tc := range sd.treeCaches {
-        tc.Stop()
-    }
-}
-
-func parseServersetMember(data []byte, path string) (*model.LabelSet, error) {
+func parseServersetMember(data []byte, path string) (model.LabelSet, error) {
     member := serversetMember{}
 
     if err := json.Unmarshal(data, &member); err != nil {
@@ -161,5 +163,36 @@ func parseServersetMember(data []byte, path string) (*model.LabelSet, error) {
     labels[serversetStatusLabel] = model.LabelValue(member.Status)
     labels[serversetShardLabel] = model.LabelValue(strconv.Itoa(member.Shard))
 
-    return &labels, nil
+    return labels, nil
+}
+
+const (
+    nerveLabelPrefix         = model.MetaLabelPrefix + "nerve_"
+    nervePathLabel           = nerveLabelPrefix + "path"
+    nerveEndpointLabelPrefix = nerveLabelPrefix + "endpoint"
+)
+
+type nerveMember struct {
+    Host string `json:"host"`
+    Port int    `json:"port"`
+    Name string `json:"name"`
+}
+
+func parseNerveMember(data []byte, path string) (model.LabelSet, error) {
+    member := nerveMember{}
+    err := json.Unmarshal(data, &member)
+    if err != nil {
+        return nil, fmt.Errorf("error unmarshaling nerve member %q: %s", path, err)
+    }
+
+    labels := model.LabelSet{}
+    labels[nervePathLabel] = model.LabelValue(path)
+    labels[model.AddressLabel] = model.LabelValue(
+        net.JoinHostPort(member.Host, fmt.Sprintf("%d", member.Port)))
+
+    labels[nerveEndpointLabelPrefix+"_host"] = model.LabelValue(member.Host)
+    labels[nerveEndpointLabelPrefix+"_port"] = model.LabelValue(fmt.Sprintf("%d", member.Port))
+    labels[nerveEndpointLabelPrefix+"_name"] = model.LabelValue(member.Name)
+
+    return labels, nil
 }
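
For readers skimming the series: the essence of the first patch is one send pattern repeated across all providers, shown below as a minimal, self-contained sketch. It is not part of the patches; the exampleDiscovery type, its refresh method, and the reduced TargetGroup struct are hypothetical stand-ins for the real providers and config.TargetGroup. The point is that Run never closes its output channel and that every send also selects on ctx.Done(), so a provider returns promptly once its context is cancelled instead of blocking on a receiver that has gone away.

// Sketch only, not from the patch. Hypothetical names: exampleDiscovery,
// refresh, TargetGroup (stand-in for config.TargetGroup).
package example

import (
	"context"
	"time"
)

// TargetGroup is a reduced stand-in for config.TargetGroup.
type TargetGroup struct {
	Source string
}

type exampleDiscovery struct {
	interval time.Duration
}

// refresh pretends to produce one fresh target group per tick.
func (d *exampleDiscovery) refresh() *TargetGroup {
	return &TargetGroup{Source: "example"}
}

// Run emits target groups until ctx is cancelled. The channel is never
// closed by the sender; each send competes with ctx.Done(), so cancellation
// is the only termination signal and it always wins promptly.
func (d *exampleDiscovery) Run(ctx context.Context, ch chan<- []*TargetGroup) {
	ticker := time.NewTicker(d.interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			tg := d.refresh()
			select {
			case ch <- []*TargetGroup{tg}:
			case <-ctx.Done():
				return
			}
		}
	}
}

The second patch builds on the same pattern and additionally injects a parse function (parseServersetMember or parseNerveMember) into the single shared ZookeeperDiscovery, so the two Zookeeper-backed mechanisms differ only in how a node's JSON payload is turned into a label set.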