From c586c15ae6a0fdb4d02abb9b850bcfedb44644e9 Mon Sep 17 00:00:00 2001
From: machine424
Date: Wed, 15 Nov 2023 11:41:12 +0100
Subject: [PATCH] fix(discovery): make discovery manager notify consumers of
 dropped targets for still defined jobs

scrape/manager_test.go: add a test to check that the manager gets notified
for targets that got dropped by discovery.

To reproduce: https://github.com/prometheus/prometheus/issues/12858#issuecomment-1732318102

Signed-off-by: machine424
---
 discovery/manager.go      |  41 ++--
 discovery/manager_test.go |   5 +-
 scrape/manager.go         |   2 +-
 scrape/manager_test.go    | 413 ++++++++++++++++++++++++++++++++++++++
 4 files changed, 446 insertions(+), 15 deletions(-)

diff --git a/discovery/manager.go b/discovery/manager.go
index 48bea85bb..b7ebdb7e0 100644
--- a/discovery/manager.go
+++ b/discovery/manager.go
@@ -212,9 +212,7 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
 	m.metrics.FailedConfigs.Set(float64(failedCount))
 
 	var (
-		wg sync.WaitGroup
-		// keep shows if we keep any providers after reload.
-		keep         bool
+		wg           sync.WaitGroup
 		newProviders []*Provider
 	)
 	for _, prov := range m.providers {
@@ -228,13 +226,12 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
 			continue
 		}
 		newProviders = append(newProviders, prov)
-		// refTargets keeps reference targets used to populate new subs' targets
+		// refTargets keeps reference targets used to populate new subs' targets as they should be the same.
 		var refTargets map[string]*targetgroup.Group
 		prov.mu.Lock()
 		m.targetsMtx.Lock()
 		for s := range prov.subs {
-			keep = true
 			refTargets = m.targets[poolKey{s, prov.name}]
 			// Remove obsolete subs' targets.
 			if _, ok := prov.newSubs[s]; !ok {
@@ -267,7 +264,9 @@
 	// While startProvider does pull the trigger, it may take some time to do so, therefore
 	// we pull the trigger as soon as possible so that downstream managers can populate their state.
 	// See https://github.com/prometheus/prometheus/pull/8639 for details.
-	if keep {
+	// This also helps the downstream managers drop stale targets as soon as possible.
+	// See https://github.com/prometheus/prometheus/pull/13147 for details.
+	if len(m.providers) > 0 {
 		select {
 		case m.triggerSend <- struct{}{}:
 		default:
@@ -288,7 +287,9 @@
 			name: {},
 		},
 	}
+	m.mtx.Lock()
 	m.providers = append(m.providers, p)
+	m.mtx.Unlock()
 	m.startProvider(ctx, p)
 }
 
@@ -403,19 +404,33 @@ func (m *Manager) allGroups() map[string][]*targetgroup.Group {
 	tSets := map[string][]*targetgroup.Group{}
 	n := map[string]int{}
 
+	m.mtx.RLock()
 	m.targetsMtx.Lock()
-	defer m.targetsMtx.Unlock()
-	for pkey, tsets := range m.targets {
-		for _, tg := range tsets {
-			// Even if the target group 'tg' is empty we still need to send it to the 'Scrape manager'
-			// to signal that it needs to stop all scrape loops for this target set.
-			tSets[pkey.setName] = append(tSets[pkey.setName], tg)
-			n[pkey.setName] += len(tg.Targets)
+	for _, p := range m.providers {
+		p.mu.RLock()
+		for s := range p.subs {
+			// Send empty lists for subs without any targets to make sure old stale targets are dropped by consumers.
+			// See: https://github.com/prometheus/prometheus/issues/12858 for details.
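+			// For example: a job that is still configured while its provider no longer
+			// returns any targets yields an empty, non-nil group list for its sub here;
+			// downstream, the scrape manager syncs that empty list and stops the scrape
+			// loops of the targets that disappeared.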
+			if _, ok := tSets[s]; !ok {
+				tSets[s] = []*targetgroup.Group{}
+				n[s] = 0
+			}
+			if tsets, ok := m.targets[poolKey{s, p.name}]; ok {
+				for _, tg := range tsets {
+					tSets[s] = append(tSets[s], tg)
+					n[s] += len(tg.Targets)
+				}
+			}
 		}
+		p.mu.RUnlock()
 	}
+	m.targetsMtx.Unlock()
+	m.mtx.RUnlock()
+
 	for setName, v := range n {
 		m.metrics.DiscoveredTargets.WithLabelValues(setName).Set(float64(v))
 	}
+
 	return tSets
 }

diff --git a/discovery/manager_test.go b/discovery/manager_test.go
index be07edbdb..707c3931d 100644
--- a/discovery/manager_test.go
+++ b/discovery/manager_test.go
@@ -939,11 +939,13 @@ func TestTargetSetTargetGroupsPresentOnConfigChange(t *testing.T) {
 	discoveryManager.ApplyConfig(c)
 
 	// Original targets should be present as soon as possible.
+	// An empty list should be sent for prometheus2 to drop any stale targets.
 	syncedTargets = <-discoveryManager.SyncCh()
 	mu.Unlock()
-	require.Len(t, syncedTargets, 1)
+	require.Len(t, syncedTargets, 2)
 	verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
 	require.Len(t, syncedTargets["prometheus"], 1)
+	require.Empty(t, syncedTargets["prometheus2"])
 
 	// prometheus2 configs should be ready on second sync.
 	syncedTargets = <-discoveryManager.SyncCh()
@@ -1275,6 +1277,7 @@ func TestCoordinationWithReceiver(t *testing.T) {
 						Targets: []model.LabelSet{{"__instance__": "1"}},
 					},
 				},
+				"mock1": {},
 			},
 		},
 		{
diff --git a/scrape/manager.go b/scrape/manager.go
index 6d4e8707b..e3dba5f0e 100644
--- a/scrape/manager.go
+++ b/scrape/manager.go
@@ -142,7 +142,7 @@ func (m *Manager) UnregisterMetrics() {
 
 func (m *Manager) reloader() {
 	reloadIntervalDuration := m.opts.DiscoveryReloadInterval
-	if reloadIntervalDuration < model.Duration(5*time.Second) {
+	if reloadIntervalDuration == model.Duration(0) {
 		reloadIntervalDuration = model.Duration(5 * time.Second)
 	}
 
diff --git a/scrape/manager_test.go b/scrape/manager_test.go
index c8d9bd698..a2a3bba6f 100644
--- a/scrape/manager_test.go
+++ b/scrape/manager_test.go
@@ -20,6 +20,7 @@ import (
 	"net/http/httptest"
 	"net/url"
 	"os"
+	"sort"
 	"strconv"
 	"sync"
 	"testing"
@@ -36,6 +37,7 @@ import (
 
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/discovery"
+	_ "github.com/prometheus/prometheus/discovery/file"
 	"github.com/prometheus/prometheus/discovery/targetgroup"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/model/relabel"
@@ -869,3 +871,414 @@ func TestUnregisterMetrics(t *testing.T) {
 		manager.UnregisterMetrics()
 	}
 }
+
+func applyConfig(
+	t *testing.T,
+	config string,
+	scrapeManager *Manager,
+	discoveryManager *discovery.Manager,
+) {
+	t.Helper()
+
+	cfg := loadConfiguration(t, config)
+	require.NoError(t, scrapeManager.ApplyConfig(cfg))
+
+	c := make(map[string]discovery.Configs)
+	scfgs, err := cfg.GetScrapeConfigs()
+	require.NoError(t, err)
+	for _, v := range scfgs {
+		c[v.JobName] = v.ServiceDiscoveryConfigs
+	}
+	require.NoError(t, discoveryManager.ApplyConfig(c))
+}
+
+func runManagers(t *testing.T, ctx context.Context) (*discovery.Manager, *Manager) {
+	t.Helper()
+
+	reg := prometheus.NewRegistry()
+	sdMetrics, err := discovery.RegisterSDMetrics(reg, discovery.NewRefreshMetrics(reg))
+	require.NoError(t, err)
+	discoveryManager := discovery.NewManager(
+		ctx,
+		log.NewNopLogger(),
+		reg,
+		sdMetrics,
+		discovery.Updatert(100*time.Millisecond),
+	)
+	scrapeManager, err := NewManager(
+		&Options{DiscoveryReloadInterval: model.Duration(100 * time.Millisecond)},
+		nil,
+		nopAppendable{},
+		prometheus.NewRegistry(),
+	)
+	require.NoError(t, err)
+	go discoveryManager.Run()
+	go scrapeManager.Run(discoveryManager.SyncCh())
+	return discoveryManager, scrapeManager
+}
+
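+// writeIntoFile writes content into a fresh temporary file created from
+// filePattern and returns it; the tests below use it to provide file_sd target files.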
+func writeIntoFile(t *testing.T, content, filePattern string) *os.File {
+	t.Helper()
+
+	file, err := os.CreateTemp("", filePattern)
+	require.NoError(t, err)
+	_, err = file.WriteString(content)
+	require.NoError(t, err)
+	return file
+}
+
+func requireTargets(
+	t *testing.T,
+	scrapeManager *Manager,
+	jobName string,
+	waitToAppear bool,
+	expectedTargets []string,
+) {
+	t.Helper()
+
+	require.Eventually(t, func() bool {
+		targets, ok := scrapeManager.TargetsActive()[jobName]
+		if !ok {
+			if waitToAppear {
+				return false
+			}
+			t.Fatalf("job %s shouldn't be dropped", jobName)
+		}
+		if expectedTargets == nil {
+			return targets == nil
+		}
+		if len(targets) != len(expectedTargets) {
+			return false
+		}
+		sTargets := []string{}
+		for _, t := range targets {
+			sTargets = append(sTargets, t.String())
+		}
+		sort.Strings(expectedTargets)
+		sort.Strings(sTargets)
+		for i, t := range sTargets {
+			if t != expectedTargets[i] {
+				return false
+			}
+		}
+		return true
+	}, 1*time.Second, 100*time.Millisecond)
+}
+
+// TestTargetDisappearsAfterProviderRemoved makes sure that when a provider is dropped, (only) its targets are dropped.
+func TestTargetDisappearsAfterProviderRemoved(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	myJob := "my-job"
+	myJobSDTargetURL := "my:9876"
+	myJobStaticTargetURL := "my:5432"
+
+	sdFileContent := fmt.Sprintf(`[{"targets": ["%s"]}]`, myJobSDTargetURL)
+	sDFile := writeIntoFile(t, sdFileContent, "*targets.json")
+
+	baseConfig := `
+scrape_configs:
+- job_name: %s
+  static_configs:
+  - targets: ['%s']
+  file_sd_configs:
+  - files: ['%s']
+`
+
+	discoveryManager, scrapeManager := runManagers(t, ctx)
+	defer scrapeManager.Stop()
+
+	applyConfig(
+		t,
+		fmt.Sprintf(
+			baseConfig,
+			myJob,
+			myJobStaticTargetURL,
+			sDFile.Name(),
+		),
+		scrapeManager,
+		discoveryManager,
+	)
+	// Make sure the job's targets are taken into account.
+	requireTargets(
+		t,
+		scrapeManager,
+		myJob,
+		true,
+		[]string{
+			fmt.Sprintf("http://%s/metrics", myJobSDTargetURL),
+			fmt.Sprintf("http://%s/metrics", myJobStaticTargetURL),
+		},
+	)
+
+	// Apply a new config where a provider is removed.
+	baseConfig = `
+scrape_configs:
+- job_name: %s
+  static_configs:
+  - targets: ['%s']
+`
+	applyConfig(
+		t,
+		fmt.Sprintf(
+			baseConfig,
+			myJob,
+			myJobStaticTargetURL,
+		),
+		scrapeManager,
+		discoveryManager,
+	)
+	// Make sure the removed provider's target was dropped.
+	requireTargets(
+		t,
+		scrapeManager,
+		myJob,
+		false,
+		[]string{
+			fmt.Sprintf("http://%s/metrics", myJobStaticTargetURL),
+		},
+	)
+
+	// Apply a new config with no providers.
+	baseConfig = `
+scrape_configs:
+- job_name: %s
+`
+	applyConfig(
+		t,
+		fmt.Sprintf(
+			baseConfig,
+			myJob,
+		),
+		scrapeManager,
+		discoveryManager,
+	)
+	// Make sure all the job's targets were dropped.
+	requireTargets(
+		t,
+		scrapeManager,
+		myJob,
+		false,
+		nil,
+	)
+}
+
+// TestOnlyProviderStaleTargetsAreDropped makes sure that when a job has only one provider with multiple targets
+// and the provider can no longer discover some of those targets, only those stale targets are dropped.
+func TestOnlyProviderStaleTargetsAreDropped(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	jobName := "my-job"
+	jobTarget1URL := "foo:9876"
+	jobTarget2URL := "foo:5432"
+
+	sdFile1Content := fmt.Sprintf(`[{"targets": ["%s"]}]`, jobTarget1URL)
+	sdFile2Content := fmt.Sprintf(`[{"targets": ["%s"]}]`, jobTarget2URL)
+	sDFile1 := writeIntoFile(t, sdFile1Content, "*targets.json")
+	sDFile2 := writeIntoFile(t, sdFile2Content, "*targets.json")
+
+	baseConfig := `
+scrape_configs:
+- job_name: %s
+  file_sd_configs:
+  - files: ['%s', '%s']
+`
+	discoveryManager, scrapeManager := runManagers(t, ctx)
+	defer scrapeManager.Stop()
+
+	applyConfig(
+		t,
+		fmt.Sprintf(baseConfig, jobName, sDFile1.Name(), sDFile2.Name()),
+		scrapeManager,
+		discoveryManager,
+	)
+
+	// Make sure the job's targets are taken into account.
+	requireTargets(
+		t,
+		scrapeManager,
+		jobName,
+		true,
+		[]string{
+			fmt.Sprintf("http://%s/metrics", jobTarget1URL),
+			fmt.Sprintf("http://%s/metrics", jobTarget2URL),
+		},
+	)
+
+	// Apply the same config for the same job, but with a nonexistent file to make the provider
+	// unable to discover some of the targets.
+	applyConfig(
+		t,
+		fmt.Sprintf(baseConfig, jobName, sDFile1.Name(), "/idontexistdoi.json"),
+		scrapeManager,
+		discoveryManager,
+	)
+
+	// The old target should get dropped.
+	requireTargets(
+		t,
+		scrapeManager,
+		jobName,
+		false,
+		[]string{fmt.Sprintf("http://%s/metrics", jobTarget1URL)},
+	)
+}
+
+// TestProviderStaleTargetsAreDropped makes sure that when a job has only one provider and that provider
+// should no longer discover targets, all the provider's targets are dropped.
+// See: https://github.com/prometheus/prometheus/issues/12858
+func TestProviderStaleTargetsAreDropped(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	jobName := "my-job"
+	jobTargetURL := "foo:9876"
+
+	sdFileContent := fmt.Sprintf(`[{"targets": ["%s"]}]`, jobTargetURL)
+	sDFile := writeIntoFile(t, sdFileContent, "*targets.json")
+
+	baseConfig := `
+scrape_configs:
+- job_name: %s
+  file_sd_configs:
+  - files: ['%s']
+`
+	discoveryManager, scrapeManager := runManagers(t, ctx)
+	defer scrapeManager.Stop()
+
+	applyConfig(
+		t,
+		fmt.Sprintf(baseConfig, jobName, sDFile.Name()),
+		scrapeManager,
+		discoveryManager,
+	)
+
+	// Make sure the job's targets are taken into account.
+	requireTargets(
+		t,
+		scrapeManager,
+		jobName,
+		true,
+		[]string{
+			fmt.Sprintf("http://%s/metrics", jobTargetURL),
+		},
+	)
+
+	// Apply the same config for the same job, but with a nonexistent file to make the provider
+	// unable to discover any targets.
+	applyConfig(
+		t,
+		fmt.Sprintf(baseConfig, jobName, "/idontexistdoi.json"),
+		scrapeManager,
+		discoveryManager,
+	)
+
+	// The old target should get dropped.
+	requireTargets(
+		t,
+		scrapeManager,
+		jobName,
+		false,
+		nil,
+	)
+}
+
+// TestOnlyStaleTargetsAreDropped makes sure that when a job has multiple providers and one of them should
+// no longer discover targets, only the stale targets of that provider are dropped.
+func TestOnlyStaleTargetsAreDropped(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	myJob := "my-job"
+	myJobSDTargetURL := "my:9876"
+	myJobStaticTargetURL := "my:5432"
+	otherJob := "other-job"
+	otherJobTargetURL := "other:1234"
+
+	sdFileContent := fmt.Sprintf(`[{"targets": ["%s"]}]`, myJobSDTargetURL)
+	sDFile := writeIntoFile(t, sdFileContent, "*targets.json")
+
+	baseConfig := `
+scrape_configs:
+- job_name: %s
+  static_configs:
+  - targets: ['%s']
+  file_sd_configs:
+  - files: ['%s']
+- job_name: %s
+  static_configs:
+  - targets: ['%s']
+`
+
+	discoveryManager, scrapeManager := runManagers(t, ctx)
+	defer scrapeManager.Stop()
+
+	// Apply the initial config with an existing file.
+	applyConfig(
+		t,
+		fmt.Sprintf(
+			baseConfig,
+			myJob,
+			myJobStaticTargetURL,
+			sDFile.Name(),
+			otherJob,
+			otherJobTargetURL,
+		),
+		scrapeManager,
+		discoveryManager,
+	)
+	// Make sure the jobs' targets are taken into account.
+	requireTargets(
+		t,
+		scrapeManager,
+		myJob,
+		true,
+		[]string{
+			fmt.Sprintf("http://%s/metrics", myJobSDTargetURL),
+			fmt.Sprintf("http://%s/metrics", myJobStaticTargetURL),
+		},
+	)
+	requireTargets(
+		t,
+		scrapeManager,
+		otherJob,
+		true,
+		[]string{fmt.Sprintf("http://%s/metrics", otherJobTargetURL)},
+	)
+
+	// Apply the same config, but with a nonexistent file for myJob.
+	applyConfig(
+		t,
+		fmt.Sprintf(
+			baseConfig,
+			myJob,
+			myJobStaticTargetURL,
+			"/idontexistdoi.json",
+			otherJob,
+			otherJobTargetURL,
+		),
+		scrapeManager,
+		discoveryManager,
+	)
+
+	// Only the SD target should get dropped for myJob.
+	requireTargets(
+		t,
+		scrapeManager,
+		myJob,
+		false,
+		[]string{
+			fmt.Sprintf("http://%s/metrics", myJobStaticTargetURL),
+		},
+	)
+	// The otherJob should keep its target.
+	requireTargets(
+		t,
+		scrapeManager,
+		otherJob,
+		false,
+		[]string{fmt.Sprintf("http://%s/metrics", otherJobTargetURL)},
+	)
+}