From d88aea7e6fdfd7a6ecf1e9a7ff2e6c0d964788b5 Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Thu, 8 Oct 2015 18:06:58 +0200 Subject: [PATCH] Fix SD mechanism source prefix handling. The prefixed target provider changed a pointerized target group that was reused in the wrapped target provider, causing an ever-increasing chain of source prefixes in target groups from the Consul target provider. We now make this bug generally impossible by switching the target group channel from pointer to value type and thus ensuring that target groups are copied before being passed on to other parts of the system. I tried to not let the depointerization leak too far outside of the channel handling (both upstream and downstream) because I tried that initially and caused some nasty bugs, which I want to minimize. Fixes https://github.com/prometheus/prometheus/issues/1083 --- retrieval/discovery/consul.go | 13 ++++---- retrieval/discovery/dns.go | 8 ++--- retrieval/discovery/ec2.go | 6 ++-- retrieval/discovery/file.go | 8 ++--- retrieval/discovery/file_test.go | 2 +- retrieval/discovery/kubernetes/discovery.go | 36 ++++++++++++++------- retrieval/discovery/marathon.go | 8 ++--- retrieval/discovery/marathon_test.go | 4 +-- retrieval/discovery/serverset.go | 8 ++--- retrieval/helpers_test.go | 4 +-- retrieval/targetmanager.go | 27 ++++++---------- retrieval/targetmanager_test.go | 6 ++-- 12 files changed, 67 insertions(+), 63 deletions(-) diff --git a/retrieval/discovery/consul.go b/retrieval/discovery/consul.go index 3e2f856f0..172c760a3 100644 --- a/retrieval/discovery/consul.go +++ b/retrieval/discovery/consul.go @@ -66,7 +66,7 @@ type ConsulDiscovery struct { // consulService contains data belonging to the same service. type consulService struct { name string - tgroup *config.TargetGroup + tgroup config.TargetGroup lastIndex uint64 removed bool running bool @@ -143,7 +143,7 @@ func (cd *ConsulDiscovery) Sources() []string { } // Run implements the TargetProvider interface. -func (cd *ConsulDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { +func (cd *ConsulDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { defer close(ch) defer cd.stop() @@ -159,7 +159,7 @@ func (cd *ConsulDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct close(srv.done) // Send clearing update. - ch <- &config.TargetGroup{Source: srv.name} + ch <- config.TargetGroup{Source: srv.name} break } // Launch watcher for the service. @@ -219,9 +219,8 @@ func (cd *ConsulDiscovery) watchServices(update chan<- *consulService, done <-ch srv, ok := cd.services[name] if !ok { srv = &consulService{ - name: name, - tgroup: &config.TargetGroup{}, - done: make(chan struct{}), + name: name, + done: make(chan struct{}), } srv.tgroup.Source = name cd.services[name] = srv @@ -246,7 +245,7 @@ func (cd *ConsulDiscovery) watchServices(update chan<- *consulService, done <-ch // watchService retrieves updates about srv from Consul's service endpoint. // On a potential update the resulting target group is sent to ch. -func (cd *ConsulDiscovery) watchService(srv *consulService, ch chan<- *config.TargetGroup) { +func (cd *ConsulDiscovery) watchService(srv *consulService, ch chan<- config.TargetGroup) { catalog := cd.client.Catalog() for { nodes, meta, err := catalog.Service(srv.name, "", &consul.QueryOptions{ diff --git a/retrieval/discovery/dns.go b/retrieval/discovery/dns.go index 7e4cb235b..40a8841e1 100644 --- a/retrieval/discovery/dns.go +++ b/retrieval/discovery/dns.go @@ -91,7 +91,7 @@ func NewDNSDiscovery(conf *config.DNSSDConfig) *DNSDiscovery { } // Run implements the TargetProvider interface. -func (dd *DNSDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { +func (dd *DNSDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { defer close(ch) ticker := time.NewTicker(dd.interval) @@ -119,7 +119,7 @@ func (dd *DNSDiscovery) Sources() []string { return srcs } -func (dd *DNSDiscovery) refreshAll(ch chan<- *config.TargetGroup) { +func (dd *DNSDiscovery) refreshAll(ch chan<- config.TargetGroup) { var wg sync.WaitGroup wg.Add(len(dd.names)) for _, name := range dd.names { @@ -133,7 +133,7 @@ func (dd *DNSDiscovery) refreshAll(ch chan<- *config.TargetGroup) { wg.Wait() } -func (dd *DNSDiscovery) refresh(name string, ch chan<- *config.TargetGroup) error { +func (dd *DNSDiscovery) refresh(name string, ch chan<- config.TargetGroup) error { response, err := lookupAll(name, dd.qtype) dnsSDLookupsCount.Inc() if err != nil { @@ -141,7 +141,7 @@ func (dd *DNSDiscovery) refresh(name string, ch chan<- *config.TargetGroup) erro return err } - tg := &config.TargetGroup{} + var tg config.TargetGroup for _, record := range response.Answer { target := model.LabelValue("") switch addr := record.(type) { diff --git a/retrieval/discovery/ec2.go b/retrieval/discovery/ec2.go index 795355509..193a68ab7 100644 --- a/retrieval/discovery/ec2.go +++ b/retrieval/discovery/ec2.go @@ -62,7 +62,7 @@ func NewEC2Discovery(conf *config.EC2SDConfig) *EC2Discovery { } // Run implements the TargetProvider interface. -func (ed *EC2Discovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { +func (ed *EC2Discovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { defer close(ch) ticker := time.NewTicker(ed.interval) @@ -73,7 +73,7 @@ func (ed *EC2Discovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) if err != nil { log.Error(err) } else { - ch <- tg + ch <- *tg } for { @@ -83,7 +83,7 @@ func (ed *EC2Discovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) if err != nil { log.Error(err) } else { - ch <- tg + ch <- *tg } case <-done: return diff --git a/retrieval/discovery/file.go b/retrieval/discovery/file.go index 44dc240aa..eb0411b30 100644 --- a/retrieval/discovery/file.go +++ b/retrieval/discovery/file.go @@ -103,7 +103,7 @@ func (fd *FileDiscovery) watchFiles() { } // Run implements the TargetProvider interface. -func (fd *FileDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { +func (fd *FileDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { defer close(ch) defer fd.stop() @@ -188,7 +188,7 @@ func (fd *FileDiscovery) stop() { // refresh reads all files matching the discovery's patterns and sends the respective // updated target groups through the channel. -func (fd *FileDiscovery) refresh(ch chan<- *config.TargetGroup) { +func (fd *FileDiscovery) refresh(ch chan<- config.TargetGroup) { ref := map[string]int{} for _, p := range fd.listFiles() { tgroups, err := readFile(p) @@ -199,7 +199,7 @@ func (fd *FileDiscovery) refresh(ch chan<- *config.TargetGroup) { continue } for _, tg := range tgroups { - ch <- tg + ch <- *tg } ref[p] = len(tgroups) } @@ -208,7 +208,7 @@ func (fd *FileDiscovery) refresh(ch chan<- *config.TargetGroup) { m, ok := ref[f] if !ok || n > m { for i := m; i < n; i++ { - ch <- &config.TargetGroup{Source: fileSource(f, i)} + ch <- config.TargetGroup{Source: fileSource(f, i)} } } } diff --git a/retrieval/discovery/file_test.go b/retrieval/discovery/file_test.go index 0335013b1..74270ad02 100644 --- a/retrieval/discovery/file_test.go +++ b/retrieval/discovery/file_test.go @@ -26,7 +26,7 @@ func testFileSD(t *testing.T, ext string) { var ( fsd = NewFileDiscovery(&conf) - ch = make(chan *config.TargetGroup) + ch = make(chan config.TargetGroup) done = make(chan struct{}) ) go fsd.Run(ch, done) diff --git a/retrieval/discovery/kubernetes/discovery.go b/retrieval/discovery/kubernetes/discovery.go index dce1b9ccd..5438f6f21 100644 --- a/retrieval/discovery/kubernetes/discovery.go +++ b/retrieval/discovery/kubernetes/discovery.go @@ -173,25 +173,35 @@ func (kd *Discovery) Sources() []string { } // Run implements the TargetProvider interface. -func (kd *Discovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { +func (kd *Discovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { defer close(ch) - select { - case ch <- kd.updateMastersTargetGroup(): - case <-done: - return + if tg := kd.updateMastersTargetGroup(); tg != nil { + select { + case ch <- *tg: + case <-done: + return + } } - select { - case ch <- kd.updateNodesTargetGroup(): - case <-done: - return + if tg := kd.updateNodesTargetGroup(); tg != nil { + select { + case ch <- *tg: + case <-done: + return + } } for _, ns := range kd.services { for _, service := range ns { + tg := kd.addService(service) + + if tg == nil { + continue + } + select { - case ch <- kd.addService(service): + case ch <- *tg: case <-done: return } @@ -223,8 +233,12 @@ func (kd *Discovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { } } + if tg == nil { + continue + } + select { - case ch <- tg: + case ch <- *tg: case <-done: return } diff --git a/retrieval/discovery/marathon.go b/retrieval/discovery/marathon.go index ca89b91ea..181ea440f 100644 --- a/retrieval/discovery/marathon.go +++ b/retrieval/discovery/marathon.go @@ -53,7 +53,7 @@ func (md *MarathonDiscovery) Sources() []string { } // Run implements the TargetProvider interface. -func (md *MarathonDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { +func (md *MarathonDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { defer close(ch) for { @@ -69,7 +69,7 @@ func (md *MarathonDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan stru } } -func (md *MarathonDiscovery) updateServices(ch chan<- *config.TargetGroup) error { +func (md *MarathonDiscovery) updateServices(ch chan<- config.TargetGroup) error { targetMap, err := md.fetchTargetGroups() if err != nil { return err @@ -77,7 +77,7 @@ func (md *MarathonDiscovery) updateServices(ch chan<- *config.TargetGroup) error // Update services which are still present for _, tg := range targetMap { - ch <- tg + ch <- *tg } // Remove services which did disappear @@ -85,7 +85,7 @@ func (md *MarathonDiscovery) updateServices(ch chan<- *config.TargetGroup) error _, ok := targetMap[source] if !ok { log.Debugf("Removing group for %s", source) - ch <- &config.TargetGroup{Source: source} + ch <- config.TargetGroup{Source: source} } } diff --git a/retrieval/discovery/marathon_test.go b/retrieval/discovery/marathon_test.go index 6dd731e98..f54d82807 100644 --- a/retrieval/discovery/marathon_test.go +++ b/retrieval/discovery/marathon_test.go @@ -26,8 +26,8 @@ import ( var marathonValidLabel = map[string]string{"prometheus": "yes"} -func newTestDiscovery(client marathon.AppListClient) (chan *config.TargetGroup, *MarathonDiscovery) { - ch := make(chan *config.TargetGroup) +func newTestDiscovery(client marathon.AppListClient) (chan config.TargetGroup, *MarathonDiscovery) { + ch := make(chan config.TargetGroup) md := NewMarathonDiscovery(&config.MarathonSDConfig{ Servers: []string{"http://localhost:8080"}, }) diff --git a/retrieval/discovery/serverset.go b/retrieval/discovery/serverset.go index 0e1265616..21bdb6b04 100644 --- a/retrieval/discovery/serverset.go +++ b/retrieval/discovery/serverset.go @@ -67,7 +67,7 @@ type ServersetDiscovery struct { conn *zk.Conn mu sync.RWMutex sources map[string]*config.TargetGroup - sdUpdates *chan<- *config.TargetGroup + sdUpdates *chan<- config.TargetGroup updates chan zookeeperTreeCacheEvent treeCaches []*zookeeperTreeCache } @@ -124,7 +124,7 @@ func (sd *ServersetDiscovery) processUpdates() { } sd.mu.Unlock() if sd.sdUpdates != nil { - *sd.sdUpdates <- tg + *sd.sdUpdates <- *tg } } @@ -134,11 +134,11 @@ func (sd *ServersetDiscovery) processUpdates() { } // Run implements the TargetProvider interface. -func (sd *ServersetDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { +func (sd *ServersetDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { // Send on everything we have seen so far. sd.mu.Lock() for _, targetGroup := range sd.sources { - ch <- targetGroup + ch <- *targetGroup } // Tell processUpdates to send future updates. sd.sdUpdates = &ch diff --git a/retrieval/helpers_test.go b/retrieval/helpers_test.go index 1e715d774..880e6230c 100644 --- a/retrieval/helpers_test.go +++ b/retrieval/helpers_test.go @@ -52,12 +52,12 @@ type fakeTargetProvider struct { update chan *config.TargetGroup } -func (tp *fakeTargetProvider) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { +func (tp *fakeTargetProvider) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { defer close(ch) for { select { case tg := <-tp.update: - ch <- tg + ch <- *tg case <-done: return } diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index 3f3a4377b..f301b9a1b 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -43,7 +43,7 @@ type TargetProvider interface { // updated target groups. The channel must be closed by the target provider // if no more updates will be sent. // On receiving from done Run must return. - Run(up chan<- *config.TargetGroup, done <-chan struct{}) + Run(up chan<- config.TargetGroup, done <-chan struct{}) } // TargetManager maintains a set of targets, starts and stops their scraping and @@ -105,7 +105,7 @@ func merge(done <-chan struct{}, cs ...<-chan targetGroupUpdate) <-chan targetGr // targetGroupUpdate is a potentially changed/new target group // for the given scrape configuration. type targetGroupUpdate struct { - tg *config.TargetGroup + tg config.TargetGroup scfg *config.ScrapeConfig } @@ -126,9 +126,9 @@ func (tm *TargetManager) Run() { sources[src] = struct{}{} } - tgc := make(chan *config.TargetGroup) + tgc := make(chan config.TargetGroup) // Run the target provider after cleanup of the stale targets is done. - defer func(prov TargetProvider, tgc chan<- *config.TargetGroup, done <-chan struct{}) { + defer func(prov TargetProvider, tgc chan<- config.TargetGroup, done <-chan struct{}) { go prov.Run(tgc, done) }(prov, tgc, tm.done) @@ -140,9 +140,6 @@ func (tm *TargetManager) Run() { for { select { case tg := <-tgc: - if tg == nil { - break - } tgupc <- targetGroupUpdate{tg: tg, scfg: scfg} case <-done: return @@ -179,12 +176,9 @@ func (tm *TargetManager) handleUpdates(ch <-chan targetGroupUpdate, done <-chan if !ok { return } - if update.tg == nil { - break - } log.Debugf("Received potential update for target group %q", update.tg.Source) - if err := tm.updateTargetGroup(update.tg, update.scfg); err != nil { + if err := tm.updateTargetGroup(&update.tg, update.scfg); err != nil { log.Errorf("Error updating targets: %s", err) } case <-done: @@ -382,10 +376,10 @@ func (tp *prefixedTargetProvider) Sources() []string { return srcs } -func (tp *prefixedTargetProvider) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { +func (tp *prefixedTargetProvider) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { defer close(ch) - ch2 := make(chan *config.TargetGroup) + ch2 := make(chan config.TargetGroup) go tp.TargetProvider.Run(ch2, done) for { @@ -393,9 +387,6 @@ func (tp *prefixedTargetProvider) Run(ch chan<- *config.TargetGroup, done <-chan case <-done: return case tg := <-ch2: - if tg == nil { - break - } tg.Source = tp.prefix(tg.Source) ch <- tg } @@ -537,14 +528,14 @@ func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider { } // Run implements the TargetProvider interface. -func (sd *StaticProvider) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { +func (sd *StaticProvider) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { defer close(ch) for _, tg := range sd.TargetGroups { select { case <-done: return - case ch <- tg: + case ch <- *tg: } } <-done diff --git a/retrieval/targetmanager_test.go b/retrieval/targetmanager_test.go index aa0cb188a..3f605f769 100644 --- a/retrieval/targetmanager_test.go +++ b/retrieval/targetmanager_test.go @@ -52,7 +52,7 @@ func TestPrefixedTargetProvider(t *testing.T) { t.Fatalf("expected sources %v, got %v", expSources, tp.Sources()) } - ch := make(chan *config.TargetGroup) + ch := make(chan config.TargetGroup) done := make(chan struct{}) defer close(done) @@ -64,10 +64,10 @@ func TestPrefixedTargetProvider(t *testing.T) { expGroup2.Source = "job-x:static:123:1" // The static target provider sends on the channel once per target group. - if tg := <-ch; !reflect.DeepEqual(tg, &expGroup1) { + if tg := <-ch; !reflect.DeepEqual(tg, expGroup1) { t.Fatalf("expected target group %v, got %v", expGroup1, tg) } - if tg := <-ch; !reflect.DeepEqual(tg, &expGroup2) { + if tg := <-ch; !reflect.DeepEqual(tg, expGroup2) { t.Fatalf("expected target group %v, got %v", expGroup2, tg) } }