fix(discovery): make discovery manager notify consumers of dropped targets for still defined jobs

scrape/manager_test.go: add a test to check that the manager gets notified
for targets that got dropped by discovery to reproduce: https://github.com/prometheus/prometheus/issues/12858#issuecomment-1732318102

Signed-off-by: machine424 <ayoubmrini424@gmail.com>
pull/14756/head
machine424 1 year ago committed by Ayoub Mrini
parent 849215d90c
commit c586c15ae6

@ -212,9 +212,7 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
m.metrics.FailedConfigs.Set(float64(failedCount))
var (
wg sync.WaitGroup
// keep shows if we keep any providers after reload.
keep bool
wg sync.WaitGroup
newProviders []*Provider
)
for _, prov := range m.providers {
@ -228,13 +226,12 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
continue
}
newProviders = append(newProviders, prov)
// refTargets keeps reference targets used to populate new subs' targets
// refTargets keeps reference targets used to populate new subs' targets as they should be the same.
var refTargets map[string]*targetgroup.Group
prov.mu.Lock()
m.targetsMtx.Lock()
for s := range prov.subs {
keep = true
refTargets = m.targets[poolKey{s, prov.name}]
// Remove obsolete subs' targets.
if _, ok := prov.newSubs[s]; !ok {
@ -267,7 +264,9 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
// While startProvider does pull the trigger, it may take some time to do so, therefore
// we pull the trigger as soon as possible so that downstream managers can populate their state.
// See https://github.com/prometheus/prometheus/pull/8639 for details.
if keep {
// This also helps making the downstream managers drop stale targets as soon as possible.
// See https://github.com/prometheus/prometheus/pull/13147 for details.
if len(m.providers) > 0 {
select {
case m.triggerSend <- struct{}{}:
default:
@ -288,7 +287,9 @@ func (m *Manager) StartCustomProvider(ctx context.Context, name string, worker D
name: {},
},
}
m.mtx.Lock()
m.providers = append(m.providers, p)
m.mtx.Unlock()
m.startProvider(ctx, p)
}
@ -403,19 +404,33 @@ func (m *Manager) allGroups() map[string][]*targetgroup.Group {
tSets := map[string][]*targetgroup.Group{}
n := map[string]int{}
m.mtx.RLock()
m.targetsMtx.Lock()
defer m.targetsMtx.Unlock()
for pkey, tsets := range m.targets {
for _, tg := range tsets {
// Even if the target group 'tg' is empty we still need to send it to the 'Scrape manager'
// to signal that it needs to stop all scrape loops for this target set.
tSets[pkey.setName] = append(tSets[pkey.setName], tg)
n[pkey.setName] += len(tg.Targets)
for _, p := range m.providers {
p.mu.RLock()
for s := range p.subs {
// Send empty lists for subs without any targets to make sure old stale targets are dropped by consumers.
// See: https://github.com/prometheus/prometheus/issues/12858 for details.
if _, ok := tSets[s]; !ok {
tSets[s] = []*targetgroup.Group{}
n[s] = 0
}
if tsets, ok := m.targets[poolKey{s, p.name}]; ok {
for _, tg := range tsets {
tSets[s] = append(tSets[s], tg)
n[s] += len(tg.Targets)
}
}
}
p.mu.RUnlock()
}
m.targetsMtx.Unlock()
m.mtx.RUnlock()
for setName, v := range n {
m.metrics.DiscoveredTargets.WithLabelValues(setName).Set(float64(v))
}
return tSets
}

@ -939,11 +939,13 @@ func TestTargetSetTargetGroupsPresentOnConfigChange(t *testing.T) {
discoveryManager.ApplyConfig(c)
// Original targets should be present as soon as possible.
// An empty list should be sent for prometheus2 to drop any stale targets
syncedTargets = <-discoveryManager.SyncCh()
mu.Unlock()
require.Len(t, syncedTargets, 1)
require.Len(t, syncedTargets, 2)
verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
require.Len(t, syncedTargets["prometheus"], 1)
require.Empty(t, syncedTargets["prometheus2"])
// prometheus2 configs should be ready on second sync.
syncedTargets = <-discoveryManager.SyncCh()
@ -1275,6 +1277,7 @@ func TestCoordinationWithReceiver(t *testing.T) {
Targets: []model.LabelSet{{"__instance__": "1"}},
},
},
"mock1": {},
},
},
{

@ -142,7 +142,7 @@ func (m *Manager) UnregisterMetrics() {
func (m *Manager) reloader() {
reloadIntervalDuration := m.opts.DiscoveryReloadInterval
if reloadIntervalDuration < model.Duration(5*time.Second) {
if reloadIntervalDuration == model.Duration(0) {
reloadIntervalDuration = model.Duration(5 * time.Second)
}

@ -20,6 +20,7 @@ import (
"net/http/httptest"
"net/url"
"os"
"sort"
"strconv"
"sync"
"testing"
@ -36,6 +37,7 @@ import (
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
_ "github.com/prometheus/prometheus/discovery/file"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
@ -869,3 +871,414 @@ func TestUnregisterMetrics(t *testing.T) {
manager.UnregisterMetrics()
}
}
func applyConfig(
t *testing.T,
config string,
scrapeManager *Manager,
discoveryManager *discovery.Manager,
) {
t.Helper()
cfg := loadConfiguration(t, config)
require.NoError(t, scrapeManager.ApplyConfig(cfg))
c := make(map[string]discovery.Configs)
scfgs, err := cfg.GetScrapeConfigs()
require.NoError(t, err)
for _, v := range scfgs {
c[v.JobName] = v.ServiceDiscoveryConfigs
}
require.NoError(t, discoveryManager.ApplyConfig(c))
}
func runManagers(t *testing.T, ctx context.Context) (*discovery.Manager, *Manager) {
t.Helper()
reg := prometheus.NewRegistry()
sdMetrics, err := discovery.RegisterSDMetrics(reg, discovery.NewRefreshMetrics(reg))
require.NoError(t, err)
discoveryManager := discovery.NewManager(
ctx,
log.NewNopLogger(),
reg,
sdMetrics,
discovery.Updatert(100*time.Millisecond),
)
scrapeManager, err := NewManager(
&Options{DiscoveryReloadInterval: model.Duration(100 * time.Millisecond)},
nil,
nopAppendable{},
prometheus.NewRegistry(),
)
require.NoError(t, err)
go discoveryManager.Run()
go scrapeManager.Run(discoveryManager.SyncCh())
return discoveryManager, scrapeManager
}
func writeIntoFile(t *testing.T, content, filePattern string) *os.File {
t.Helper()
file, err := os.CreateTemp("", filePattern)
require.NoError(t, err)
_, err = file.WriteString(content)
require.NoError(t, err)
return file
}
func requireTargets(
t *testing.T,
scrapeManager *Manager,
jobName string,
waitToAppear bool,
expectedTargets []string,
) {
t.Helper()
require.Eventually(t, func() bool {
targets, ok := scrapeManager.TargetsActive()[jobName]
if !ok {
if waitToAppear {
return false
}
t.Fatalf("job %s shouldn't be dropped", jobName)
}
if expectedTargets == nil {
return targets == nil
}
if len(targets) != len(expectedTargets) {
return false
}
sTargets := []string{}
for _, t := range targets {
sTargets = append(sTargets, t.String())
}
sort.Strings(expectedTargets)
sort.Strings(sTargets)
for i, t := range sTargets {
if t != expectedTargets[i] {
return false
}
}
return true
}, 1*time.Second, 100*time.Millisecond)
}
// TestTargetDisappearsAfterProviderRemoved makes sure that when a provider is dropped, (only) its targets are dropped.
func TestTargetDisappearsAfterProviderRemoved(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
myJob := "my-job"
myJobSDTargetURL := "my:9876"
myJobStaticTargetURL := "my:5432"
sdFileContent := fmt.Sprintf(`[{"targets": ["%s"]}]`, myJobSDTargetURL)
sDFile := writeIntoFile(t, sdFileContent, "*targets.json")
baseConfig := `
scrape_configs:
- job_name: %s
static_configs:
- targets: ['%s']
file_sd_configs:
- files: ['%s']
`
discoveryManager, scrapeManager := runManagers(t, ctx)
defer scrapeManager.Stop()
applyConfig(
t,
fmt.Sprintf(
baseConfig,
myJob,
myJobStaticTargetURL,
sDFile.Name(),
),
scrapeManager,
discoveryManager,
)
// Make sure the jobs targets are taken into account
requireTargets(
t,
scrapeManager,
myJob,
true,
[]string{
fmt.Sprintf("http://%s/metrics", myJobSDTargetURL),
fmt.Sprintf("http://%s/metrics", myJobStaticTargetURL),
},
)
// Apply a new config where a provider is removed
baseConfig = `
scrape_configs:
- job_name: %s
static_configs:
- targets: ['%s']
`
applyConfig(
t,
fmt.Sprintf(
baseConfig,
myJob,
myJobStaticTargetURL,
),
scrapeManager,
discoveryManager,
)
// Make sure the corresponding target was dropped
requireTargets(
t,
scrapeManager,
myJob,
false,
[]string{
fmt.Sprintf("http://%s/metrics", myJobStaticTargetURL),
},
)
// Apply a new config with no providers
baseConfig = `
scrape_configs:
- job_name: %s
`
applyConfig(
t,
fmt.Sprintf(
baseConfig,
myJob,
),
scrapeManager,
discoveryManager,
)
// Make sure the corresponding target was dropped
requireTargets(
t,
scrapeManager,
myJob,
false,
nil,
)
}
// TestOnlyProviderStaleTargetsAreDropped makes sure that when a job has only one provider with multiple targets
// and when the provider can no longer discover some of those targets, only those stale targets are dropped.
func TestOnlyProviderStaleTargetsAreDropped(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
jobName := "my-job"
jobTarget1URL := "foo:9876"
jobTarget2URL := "foo:5432"
sdFile1Content := fmt.Sprintf(`[{"targets": ["%s"]}]`, jobTarget1URL)
sdFile2Content := fmt.Sprintf(`[{"targets": ["%s"]}]`, jobTarget2URL)
sDFile1 := writeIntoFile(t, sdFile1Content, "*targets.json")
sDFile2 := writeIntoFile(t, sdFile2Content, "*targets.json")
baseConfig := `
scrape_configs:
- job_name: %s
file_sd_configs:
- files: ['%s', '%s']
`
discoveryManager, scrapeManager := runManagers(t, ctx)
defer scrapeManager.Stop()
applyConfig(
t,
fmt.Sprintf(baseConfig, jobName, sDFile1.Name(), sDFile2.Name()),
scrapeManager,
discoveryManager,
)
// Make sure the job's targets are taken into account
requireTargets(
t,
scrapeManager,
jobName,
true,
[]string{
fmt.Sprintf("http://%s/metrics", jobTarget1URL),
fmt.Sprintf("http://%s/metrics", jobTarget2URL),
},
)
// Apply the same config for the same job but with a non existing file to make the provider
// unable to discover some targets
applyConfig(
t,
fmt.Sprintf(baseConfig, jobName, sDFile1.Name(), "/idontexistdoi.json"),
scrapeManager,
discoveryManager,
)
// The old target should get dropped
requireTargets(
t,
scrapeManager,
jobName,
false,
[]string{fmt.Sprintf("http://%s/metrics", jobTarget1URL)},
)
}
// TestProviderStaleTargetsAreDropped makes sure that when a job has only one provider and when that provider
// should no longer discover targets, the targets of that provider are dropped.
// See: https://github.com/prometheus/prometheus/issues/12858
func TestProviderStaleTargetsAreDropped(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
jobName := "my-job"
jobTargetURL := "foo:9876"
sdFileContent := fmt.Sprintf(`[{"targets": ["%s"]}]`, jobTargetURL)
sDFile := writeIntoFile(t, sdFileContent, "*targets.json")
baseConfig := `
scrape_configs:
- job_name: %s
file_sd_configs:
- files: ['%s']
`
discoveryManager, scrapeManager := runManagers(t, ctx)
defer scrapeManager.Stop()
applyConfig(
t,
fmt.Sprintf(baseConfig, jobName, sDFile.Name()),
scrapeManager,
discoveryManager,
)
// Make sure the job's targets are taken into account
requireTargets(
t,
scrapeManager,
jobName,
true,
[]string{
fmt.Sprintf("http://%s/metrics", jobTargetURL),
},
)
// Apply the same config for the same job but with a non existing file to make the provider
// unable to discover some targets
applyConfig(
t,
fmt.Sprintf(baseConfig, jobName, "/idontexistdoi.json"),
scrapeManager,
discoveryManager,
)
// The old target should get dropped
requireTargets(
t,
scrapeManager,
jobName,
false,
nil,
)
}
// TestOnlyStaleTargetsAreDropped makes sure that when a job has multiple providers, when aone of them should no,
// longer discover targets, only the stale targets of that provier are dropped.
func TestOnlyStaleTargetsAreDropped(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
myJob := "my-job"
myJobSDTargetURL := "my:9876"
myJobStaticTargetURL := "my:5432"
otherJob := "other-job"
otherJobTargetURL := "other:1234"
sdFileContent := fmt.Sprintf(`[{"targets": ["%s"]}]`, myJobSDTargetURL)
sDFile := writeIntoFile(t, sdFileContent, "*targets.json")
baseConfig := `
scrape_configs:
- job_name: %s
static_configs:
- targets: ['%s']
file_sd_configs:
- files: ['%s']
- job_name: %s
static_configs:
- targets: ['%s']
`
discoveryManager, scrapeManager := runManagers(t, ctx)
defer scrapeManager.Stop()
// Apply the initial config with an existing file
applyConfig(
t,
fmt.Sprintf(
baseConfig,
myJob,
myJobStaticTargetURL,
sDFile.Name(),
otherJob,
otherJobTargetURL,
),
scrapeManager,
discoveryManager,
)
// Make sure the jobs targets are taken into account
requireTargets(
t,
scrapeManager,
myJob,
true,
[]string{
fmt.Sprintf("http://%s/metrics", myJobSDTargetURL),
fmt.Sprintf("http://%s/metrics", myJobStaticTargetURL),
},
)
requireTargets(
t,
scrapeManager,
otherJob,
true,
[]string{fmt.Sprintf("http://%s/metrics", otherJobTargetURL)},
)
// Apply the same config with a non existing file for myJob
applyConfig(
t,
fmt.Sprintf(
baseConfig,
myJob,
myJobStaticTargetURL,
"/idontexistdoi.json",
otherJob,
otherJobTargetURL,
),
scrapeManager,
discoveryManager,
)
// Only the SD target should get dropped for myJob
requireTargets(
t,
scrapeManager,
myJob,
false,
[]string{
fmt.Sprintf("http://%s/metrics", myJobStaticTargetURL),
},
)
// The otherJob should keep its target
requireTargets(
t,
scrapeManager,
otherJob,
false,
[]string{fmt.Sprintf("http://%s/metrics", otherJobTargetURL)},
)
}

Loading…
Cancel
Save