From 30448286c7a2a0d57c1efa9e8df6251bbcbab630 Mon Sep 17 00:00:00 2001
From: Brian Brazil
Date: Fri, 16 Dec 2016 15:08:50 +0000
Subject: [PATCH] Add sample_limit to scrape config.

This imposes a hard limit on the number of samples ingested from the
target. This is counted after metric relabelling, to allow dropping of
problematic metrics.

This is intended as a very blunt tool to prevent overload due to
misbehaving targets that suddenly jump in sample count (e.g. adding a
label containing email addresses).

Add a metric to track how often this happens.

Fixes #2137
---
 config/config.go              |  2 +
 config/config_test.go         |  1 +
 config/testdata/conf.good.yml |  2 +
 retrieval/scrape.go           | 27 +++++++---
 retrieval/scrape_test.go      | 99 +++++++++++++++++++++++++++--------
 5 files changed, 102 insertions(+), 29 deletions(-)

diff --git a/config/config.go b/config/config.go
index 04522b57d..adf729cb7 100644
--- a/config/config.go
+++ b/config/config.go
@@ -497,6 +497,8 @@ type ScrapeConfig struct {
 	MetricsPath string `yaml:"metrics_path,omitempty"`
 	// The URL scheme with which to fetch metrics from targets.
 	Scheme string `yaml:"scheme,omitempty"`
+	// More than this many samples post metric-relabelling will cause the scrape to fail.
+	SampleLimit uint `yaml:"sample_limit,omitempty"`

 	// We cannot do proper Go type embedding below as the parser will then parse
 	// values arbitrarily into the overflow maps of further-down types.
diff --git a/config/config_test.go b/config/config_test.go
index 66837ef58..183891e40 100644
--- a/config/config_test.go
+++ b/config/config_test.go
@@ -133,6 +133,7 @@ var expectedConf = &Config{

 			ScrapeInterval: model.Duration(50 * time.Second),
 			ScrapeTimeout:  model.Duration(5 * time.Second),
+			SampleLimit:    1000,

 			HTTPClientConfig: HTTPClientConfig{
 				BasicAuth: &BasicAuth{
diff --git a/config/testdata/conf.good.yml b/config/testdata/conf.good.yml
index 65c2086d3..3c375bfc4 100644
--- a/config/testdata/conf.good.yml
+++ b/config/testdata/conf.good.yml
@@ -70,6 +70,8 @@ scrape_configs:
   scrape_interval: 50s
   scrape_timeout: 5s

+  sample_limit: 1000
+
   metrics_path: /my_path
   scheme: https

diff --git a/retrieval/scrape.go b/retrieval/scrape.go
index 8ad7502dd..3a7dbee00 100644
--- a/retrieval/scrape.go
+++ b/retrieval/scrape.go
@@ -78,6 +78,12 @@ var (
 		},
 		[]string{"scrape_job"},
 	)
+	targetScrapeSampleLimit = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Name: "prometheus_target_scrapes_exceeded_sample_limit_total",
+			Help: "Total number of scrapes that hit the sample limit and were rejected.",
+		},
+	)
 )

 func init() {
@@ -86,6 +92,7 @@ func init() {
 	prometheus.MustRegister(targetReloadIntervalLength)
 	prometheus.MustRegister(targetSyncIntervalLength)
 	prometheus.MustRegister(targetScrapePoolSyncsCounter)
+	prometheus.MustRegister(targetScrapeSampleLimit)
 }

 // scrapePool manages scrapes for sets of targets.
@@ -103,7 +110,7 @@ type scrapePool struct {
 	loops map[uint64]loop

 	// Constructor for new scrape loops. This is settable for testing convenience.
-	newLoop func(context.Context, scraper, storage.SampleAppender, func(storage.SampleAppender) storage.SampleAppender, storage.SampleAppender) loop
+	newLoop func(context.Context, scraper, storage.SampleAppender, func(storage.SampleAppender) storage.SampleAppender, storage.SampleAppender, uint) loop
 }

 func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app storage.SampleAppender) *scrapePool {
@@ -172,7 +179,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
 		var (
 			t       = sp.targets[fp]
 			s       = &targetScraper{Target: t, client: sp.client}
-			newLoop = sp.newLoop(sp.ctx, s, sp.appender, sp.sampleMutator(t), sp.reportAppender(t))
+			newLoop = sp.newLoop(sp.ctx, s, sp.appender, sp.sampleMutator(t), sp.reportAppender(t), sp.config.SampleLimit)
 		)

 		wg.Add(1)
@@ -233,7 +240,7 @@ func (sp *scrapePool) sync(targets []*Target) {

 		if _, ok := sp.targets[hash]; !ok {
 			s := &targetScraper{Target: t, client: sp.client}
-			l := sp.newLoop(sp.ctx, s, sp.appender, sp.sampleMutator(t), sp.reportAppender(t))
+			l := sp.newLoop(sp.ctx, s, sp.appender, sp.sampleMutator(t), sp.reportAppender(t), sp.config.SampleLimit)

 			sp.targets[hash] = t
 			sp.loops[hash] = l
@@ -373,18 +380,21 @@ type scrapeLoop struct {
 	mutator func(storage.SampleAppender) storage.SampleAppender
 	// For sending up and scrape_*.
 	reportAppender storage.SampleAppender
+	// Limit on number of samples that will be accepted.
+	sampleLimit uint

 	done   chan struct{}
 	ctx    context.Context
 	cancel func()
 }

-func newScrapeLoop(ctx context.Context, sc scraper, app storage.SampleAppender, mut func(storage.SampleAppender) storage.SampleAppender, reportApp storage.SampleAppender) loop {
+func newScrapeLoop(ctx context.Context, sc scraper, app storage.SampleAppender, mut func(storage.SampleAppender) storage.SampleAppender, reportApp storage.SampleAppender, sampleLimit uint) loop {
 	sl := &scrapeLoop{
 		scraper:        sc,
 		appender:       app,
 		mutator:        mut,
 		reportAppender: reportApp,
+		sampleLimit:    sampleLimit,
 		done:           make(chan struct{}),
 	}
 	sl.ctx, sl.cancel = context.WithCancel(ctx)
@@ -460,8 +470,13 @@ func (sl *scrapeLoop) processScrapeResult(samples model.Samples, scrapeErr error
 			app.Append(sample)
 		}

-		// Send samples to storage.
-		sl.append(buf.buffer)
+		if sl.sampleLimit > 0 && uint(len(buf.buffer)) > sl.sampleLimit {
+			scrapeErr = fmt.Errorf("%d samples exceeded limit of %d", len(buf.buffer), sl.sampleLimit)
+			targetScrapeSampleLimit.Inc()
+		} else {
+			// Send samples to storage.
+			sl.append(buf.buffer)
+		}
 	}

 	sl.report(start, time.Since(start), len(samples), len(buf.buffer), scrapeErr)
diff --git a/retrieval/scrape_test.go b/retrieval/scrape_test.go
index 8be99fe4c..55c95c402 100644
--- a/retrieval/scrape_test.go
+++ b/retrieval/scrape_test.go
@@ -139,7 +139,7 @@ func TestScrapePoolReload(t *testing.T) {
 	}
 	// On starting to run, new loops created on reload check whether their preceding
 	// equivalents have been stopped.
-	newLoop := func(ctx context.Context, s scraper, app storage.SampleAppender, mut func(storage.SampleAppender) storage.SampleAppender, reportApp storage.SampleAppender) loop {
+	newLoop := func(ctx context.Context, s scraper, app storage.SampleAppender, mut func(storage.SampleAppender) storage.SampleAppender, reportApp storage.SampleAppender, sampleLimit uint) loop {
 		l := &testLoop{}
 		l.startFunc = func(interval, timeout time.Duration, errc chan<- error) {
 			if interval != 3*time.Second {
@@ -312,14 +312,13 @@ func TestScrapeLoopSampleProcessing(t *testing.T) {
 	testCases := []struct {
 		scrapedSamples               model.Samples
 		scrapeError                  error
-		metricRelabelConfigs         []*config.RelabelConfig
+		scrapeConfig                 config.ScrapeConfig
 		expectedReportedSamples      model.Samples
 		expectedIngestedSamplesCount int
 	}{
 		{
-			scrapedSamples:       readSamples,
-			scrapeError:          nil,
-			metricRelabelConfigs: []*config.RelabelConfig{},
+			scrapedSamples: readSamples,
+			scrapeError:    nil,
 			expectedReportedSamples: model.Samples{
 				{
 					Metric: model.Metric{"__name__": "up"},
 					Value:  1,
 				},
@@ -342,11 +341,45 @@
 		{
 			scrapedSamples: readSamples,
 			scrapeError:    nil,
-			metricRelabelConfigs: []*config.RelabelConfig{
+			scrapeConfig: config.ScrapeConfig{
+				MetricRelabelConfigs: []*config.RelabelConfig{
+					{
+						Action:       config.RelabelDrop,
+						SourceLabels: model.LabelNames{"__name__"},
+						Regex:        config.MustNewRegexp("a.*"),
+					},
+				},
+			},
+			expectedReportedSamples: model.Samples{
 				{
-					Action:       config.RelabelDrop,
-					SourceLabels: model.LabelNames{"__name__"},
-					Regex:        config.MustNewRegexp("a.*"),
+					Metric: model.Metric{"__name__": "up"},
+					Value:  1,
+				},
+				{
+					Metric: model.Metric{"__name__": "scrape_duration_seconds"},
+				},
+				{
+					Metric: model.Metric{"__name__": "scrape_samples_scraped"},
+					Value:  2,
+				},
+				{
+					Metric: model.Metric{"__name__": "scrape_samples_post_metric_relabeling"},
+					Value:  1,
+				},
+			},
+			expectedIngestedSamplesCount: 1,
+		},
+		{
+			scrapedSamples: readSamples,
+			scrapeError:    nil,
+			scrapeConfig: config.ScrapeConfig{
+				SampleLimit: 1,
+				MetricRelabelConfigs: []*config.RelabelConfig{
+					{
+						Action:       config.RelabelDrop,
+						SourceLabels: model.LabelNames{"__name__"},
+						Regex:        config.MustNewRegexp("a.*"),
+					},
 				},
 			},
 			expectedReportedSamples: model.Samples{
 				{
@@ -369,9 +402,33 @@
-			scrapedSamples:       model.Samples{},
-			scrapeError:          fmt.Errorf("error"),
-			metricRelabelConfigs: []*config.RelabelConfig{},
+			scrapedSamples: readSamples,
+			scrapeError:    nil,
+			scrapeConfig: config.ScrapeConfig{
+				SampleLimit: 1,
+			},
+			expectedReportedSamples: model.Samples{
+				{
+					Metric: model.Metric{"__name__": "up"},
+					Value:  0,
+				},
+				{
+					Metric: model.Metric{"__name__": "scrape_duration_seconds"},
+				},
+				{
+					Metric: model.Metric{"__name__": "scrape_samples_scraped"},
+					Value:  2,
+				},
+				{
+					Metric: model.Metric{"__name__": "scrape_samples_post_metric_relabeling"},
+					Value:  2,
+				},
+			},
+			expectedIngestedSamplesCount: 0,
+		},
+		{
+			scrapedSamples: model.Samples{},
+			scrapeError:    fmt.Errorf("error"),
 			expectedReportedSamples: model.Samples{
 				{
 					Metric: model.Metric{"__name__": "up"},
 					Value:  0,
 				},
 				{
 					Metric: model.Metric{"__name__": "scrape_duration_seconds"},
 				},
 				{
 					Metric: model.Metric{"__name__": "scrape_samples_scraped"},
 					Value:  0,
 				},
 				{
 					Metric: model.Metric{"__name__": "scrape_samples_post_metric_relabeling"},
 					Value:  0,
 				},
 			},
 			expectedIngestedSamplesCount: 0,
 		},
 	}

-	for _, test := range testCases {
+	for i, test := range testCases {
 		ingestedSamples := &bufferAppender{buffer: model.Samples{}}
 		reportedSamples := &bufferAppender{buffer: model.Samples{}}

 		target := newTestTarget("example.com:80", 10*time.Millisecond, nil)
-		cfg := &config.ScrapeConfig{
-			MetricRelabelConfigs: test.metricRelabelConfigs,
-		}
-
-		sp := newScrapePool(context.Background(), cfg, ingestedSamples)
+		sp := newScrapePool(context.Background(), &test.scrapeConfig, ingestedSamples)

 		scraper := &testScraper{}
-		sl := newScrapeLoop(context.Background(), scraper, ingestedSamples, sp.sampleMutator(target), reportedSamples).(*scrapeLoop)
+		sl := newScrapeLoop(context.Background(), scraper, ingestedSamples, sp.sampleMutator(target), reportedSamples, test.scrapeConfig.SampleLimit).(*scrapeLoop)

 		sl.processScrapeResult(test.scrapedSamples, test.scrapeError, time.Unix(0, 0))

 		// Ignore value of scrape_duration_seconds, as it's time dependent.
 		reportedSamples.buffer[1].Value = 0

 		if !reflect.DeepEqual(reportedSamples.buffer, test.expectedReportedSamples) {
-			t.Errorf("Reported samples did not match expected metrics")
+			t.Errorf("Reported samples did not match expected metrics for case %d", i)
 			t.Errorf("Expected: %v", test.expectedReportedSamples)
 			t.Fatalf("Got: %v", reportedSamples.buffer)
 		}
@@ -425,7 +478,7 @@ func TestScrapeLoopStop(t *testing.T) {
 	scraper := &testScraper{}

-	sl := newScrapeLoop(context.Background(), scraper, nil, nil, nil)
+	sl := newScrapeLoop(context.Background(), scraper, nil, nil, nil, 0)

 	// The scrape pool synchronizes on stopping scrape loops. However, new scrape
 	// loops are started asynchronously. Thus it's possible that a loop is stopped
@@ -483,7 +536,7 @@ func TestScrapeLoopRun(t *testing.T) {
 	defer close(signal)

 	ctx, cancel := context.WithCancel(context.Background())
-	sl := newScrapeLoop(ctx, scraper, app, mut, reportApp)
+	sl := newScrapeLoop(ctx, scraper, app, mut, reportApp, 0)

 	// The loop must terminate during the initial offset if the context
 	// is canceled.
@@ -521,7 +574,7 @@
 	}

 	ctx, cancel = context.WithCancel(context.Background())
-	sl = newScrapeLoop(ctx, scraper, app, mut, reportApp)
+	sl = newScrapeLoop(ctx, scraper, app, mut, reportApp, 0)

 	go func() {
 		sl.run(time.Second, 100*time.Millisecond, errc)
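
---

Usage sketch (illustrative, not taken from the patch itself): with this change
a hard per-job limit can be set in the scrape configuration. The job name,
target, and limit value below are hypothetical:

    scrape_configs:
      - job_name: 'node'        # hypothetical job
        sample_limit: 5000      # fail the scrape if more than 5000 samples remain after metric relabelling
        static_configs:
          - targets: ['localhost:9100']   # hypothetical target

If a scrape exceeds the limit after metric relabelling, none of its samples are
ingested, the scrape is reported as failed (up is 0 for that target), and the
prometheus_target_scrapes_exceeded_sample_limit_total counter is incremented,
so sustained rejections can be alerted on.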