diff --git a/config/config.go b/config/config.go
index 510de3136..47fa6ee4c 100644
--- a/config/config.go
+++ b/config/config.go
@@ -497,6 +497,8 @@ type ScrapeConfig struct {
 	MetricsPath string `yaml:"metrics_path,omitempty"`
 	// The URL scheme with which to fetch metrics from targets.
 	Scheme string `yaml:"scheme,omitempty"`
+	// More than this many samples post metric-relabelling will cause the scrape to fail.
+	SampleLimit uint `yaml:"sample_limit,omitempty"`

 	// We cannot do proper Go type embedding below as the parser will then parse
 	// values arbitrarily into the overflow maps of further-down types.
diff --git a/config/config_test.go b/config/config_test.go
index 31d678622..9d686dbfb 100644
--- a/config/config_test.go
+++ b/config/config_test.go
@@ -133,6 +133,7 @@ var expectedConf = &Config{

 		ScrapeInterval: model.Duration(50 * time.Second),
 		ScrapeTimeout:  model.Duration(5 * time.Second),
+		SampleLimit:    1000,

 		HTTPClientConfig: HTTPClientConfig{
 			BasicAuth: &BasicAuth{
diff --git a/config/testdata/conf.good.yml b/config/testdata/conf.good.yml
index 9f62cb4ca..31c237a0f 100644
--- a/config/testdata/conf.good.yml
+++ b/config/testdata/conf.good.yml
@@ -70,6 +70,8 @@ scrape_configs:
   scrape_interval: 50s
   scrape_timeout:  5s

+  sample_limit: 1000
+
   metrics_path: /my_path
   scheme: https

diff --git a/retrieval/scrape.go b/retrieval/scrape.go
index 0b4e4a8e9..cf35acb28 100644
--- a/retrieval/scrape.go
+++ b/retrieval/scrape.go
@@ -33,9 +33,10 @@ import (
 )

 const (
-	scrapeHealthMetricName   = "up"
-	scrapeDurationMetricName = "scrape_duration_seconds"
-	scrapeSamplesMetricName  = "scrape_samples_scraped"
+	scrapeHealthMetricName       = "up"
+	scrapeDurationMetricName     = "scrape_duration_seconds"
+	scrapeSamplesMetricName      = "scrape_samples_scraped"
+	samplesPostRelabelMetricName = "scrape_samples_post_metric_relabeling"
 )

 var (
@@ -76,6 +77,12 @@ var (
 		},
 		[]string{"scrape_job"},
 	)
+	targetScrapeSampleLimit = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Name: "prometheus_target_scrapes_exceeded_sample_limit_total",
+			Help: "Total number of scrapes that hit the sample limit and were rejected.",
+		},
+	)
 )

 func init() {
@@ -84,6 +91,7 @@ func init() {
 	prometheus.MustRegister(targetReloadIntervalLength)
 	prometheus.MustRegister(targetSyncIntervalLength)
 	prometheus.MustRegister(targetScrapePoolSyncsCounter)
+	prometheus.MustRegister(targetScrapeSampleLimit)
 }

 // scrapePool manages scrapes for sets of targets.
@@ -101,7 +109,7 @@ type scrapePool struct {
 	loops   map[uint64]loop

 	// Constructor for new scrape loops. This is settable for testing convenience.
-	newLoop func(context.Context, scraper, storage.SampleAppender, storage.SampleAppender) loop
+	newLoop func(context.Context, scraper, storage.SampleAppender, model.LabelSet, *config.ScrapeConfig) loop
 }

 func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app storage.SampleAppender) *scrapePool {
@@ -170,7 +178,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
 		var (
 			t       = sp.targets[fp]
 			s       = &targetScraper{Target: t, client: sp.client}
-			newLoop = sp.newLoop(sp.ctx, s, sp.sampleAppender(t), sp.reportAppender(t))
+			newLoop = sp.newLoop(sp.ctx, s, sp.appender, t.Labels(), sp.config)
 		)
 		wg.Add(1)

@@ -231,7 +239,7 @@ func (sp *scrapePool) sync(targets []*Target) {
 		if _, ok := sp.targets[hash]; !ok {
 			s := &targetScraper{Target: t, client: sp.client}
-			l := sp.newLoop(sp.ctx, s, sp.sampleAppender(t), sp.reportAppender(t))
+			l := sp.newLoop(sp.ctx, s, sp.appender, t.Labels(), sp.config)

 			sp.targets[hash] = t
 			sp.loops[hash] = l

@@ -263,40 +271,6 @@ func (sp *scrapePool) sync(targets []*Target) {
 	wg.Wait()
 }

-// sampleAppender returns an appender for ingested samples from the target.
-func (sp *scrapePool) sampleAppender(target *Target) storage.SampleAppender {
-	app := sp.appender
-	// The relabelAppender has to be inside the label-modifying appenders
-	// so the relabeling rules are applied to the correct label set.
-	if mrc := sp.config.MetricRelabelConfigs; len(mrc) > 0 {
-		app = relabelAppender{
-			SampleAppender: app,
-			relabelings:    mrc,
-		}
-	}
-
-	if sp.config.HonorLabels {
-		app = honorLabelsAppender{
-			SampleAppender: app,
-			labels:         target.Labels(),
-		}
-	} else {
-		app = ruleLabelsAppender{
-			SampleAppender: app,
-			labels:         target.Labels(),
-		}
-	}
-	return app
-}
-
-// reportAppender returns an appender for reporting samples for the target.
-func (sp *scrapePool) reportAppender(target *Target) storage.SampleAppender {
-	return ruleLabelsAppender{
-		SampleAppender: sp.appender,
-		labels:         target.Labels(),
-	}
-}
-
 // A scraper retrieves samples and accepts a status report at the end.
 type scraper interface {
 	scrape(ctx context.Context, ts time.Time) (model.Samples, error)
@@ -364,20 +338,34 @@ type loop interface {

 type scrapeLoop struct {
 	scraper scraper
-	appender       storage.SampleAppender
-	reportAppender storage.SampleAppender
+	// Where samples are ultimately sent.
+	appender storage.SampleAppender
+
+	targetLabels         model.LabelSet
+	metricRelabelConfigs []*config.RelabelConfig
+	honorLabels          bool
+	sampleLimit          uint

 	done   chan struct{}
 	ctx    context.Context
 	cancel func()
 }

-func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp storage.SampleAppender) loop {
+func newScrapeLoop(
+	ctx context.Context,
+	sc scraper,
+	appender storage.SampleAppender,
+	targetLabels model.LabelSet,
+	config *config.ScrapeConfig,
+) loop {
 	sl := &scrapeLoop{
-		scraper:        sc,
-		appender:       app,
-		reportAppender: reportApp,
-		done:           make(chan struct{}),
+		scraper:              sc,
+		appender:             appender,
+		targetLabels:         targetLabels,
+		metricRelabelConfigs: config.MetricRelabelConfigs,
+		honorLabels:          config.HonorLabels,
+		sampleLimit:          config.SampleLimit,
+		done:                 make(chan struct{}),
 	}
 	sl.ctx, sl.cancel = context.WithCancel(ctx)

@@ -408,8 +396,9 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {

 		if !sl.appender.NeedsThrottling() {
 			var (
-				start        = time.Now()
-				scrapeCtx, _ = context.WithTimeout(sl.ctx, timeout)
+				start                 = time.Now()
+				scrapeCtx, _          = context.WithTimeout(sl.ctx, timeout)
+				numPostRelabelSamples = 0
 			)

 			// Only record after the first scrape.
@@ -421,12 +410,12 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {

 			samples, err := sl.scraper.scrape(scrapeCtx, start)
 			if err == nil {
-				sl.append(samples)
-			} else if errc != nil {
+				numPostRelabelSamples, err = sl.append(samples)
+			}
+			if err != nil && errc != nil {
 				errc <- err
 			}
-
-			sl.report(start, time.Since(start), len(samples), err)
+			sl.report(start, time.Since(start), len(samples), numPostRelabelSamples, err)
 			last = start
 		} else {
 			targetSkippedScrapes.Inc()
@@ -445,14 +434,73 @@ func (sl *scrapeLoop) stop() {
 	<-sl.done
 }

-func (sl *scrapeLoop) append(samples model.Samples) {
+// wrapAppender wraps a SampleAppender for relabeling. It returns the wrapped
+// appender and an innermost countingAppender that counts the samples actually
+// appended in the end.
+func (sl *scrapeLoop) wrapAppender(app storage.SampleAppender) (storage.SampleAppender, *countingAppender) {
+	// Innermost appender is a countingAppender to count how many samples
+	// are left in the end.
+	countingAppender := &countingAppender{
+		SampleAppender: app,
+	}
+	app = countingAppender
+
+	// The relabelAppender has to be inside the label-modifying appenders so
+	// the relabeling rules are applied to the correct label set.
+	if len(sl.metricRelabelConfigs) > 0 {
+		app = relabelAppender{
+			SampleAppender: app,
+			relabelings:    sl.metricRelabelConfigs,
+		}
+	}
+
+	if sl.honorLabels {
+		app = honorLabelsAppender{
+			SampleAppender: app,
+			labels:         sl.targetLabels,
+		}
+	} else {
+		app = ruleLabelsAppender{
+			SampleAppender: app,
+			labels:         sl.targetLabels,
+		}
+	}
+	return app, countingAppender
+}
+
+func (sl *scrapeLoop) append(samples model.Samples) (int, error) {
 	var (
 		numOutOfOrder = 0
 		numDuplicates = 0
+		app           = sl.appender
+		countingApp   *countingAppender
 	)

+	if sl.sampleLimit > 0 {
+		// We need to check for the sample limit, so append everything
+		// to a wrapped bufferAppender first. Then point samples to the
+		// result.
+		bufApp := &bufferAppender{buffer: make(model.Samples, 0, len(samples))}
+		var wrappedBufApp storage.SampleAppender
+		wrappedBufApp, countingApp = sl.wrapAppender(bufApp)
+		for _, s := range samples {
+			// Ignore errors as bufferAppender always succeeds.
+			wrappedBufApp.Append(s)
+		}
+		samples = bufApp.buffer
+		if uint(countingApp.count) > sl.sampleLimit {
+			targetScrapeSampleLimit.Inc()
+			return countingApp.count, fmt.Errorf(
+				"%d samples exceeded limit of %d", countingApp.count, sl.sampleLimit,
+			)
+		}
+	} else {
+		// No need to check for sample limit. Wrap sl.appender directly.
+		app, countingApp = sl.wrapAppender(sl.appender)
+	}
+
 	for _, s := range samples {
-		if err := sl.appender.Append(s); err != nil {
+		if err := app.Append(s); err != nil {
 			switch err {
 			case local.ErrOutOfOrderSample:
 				numOutOfOrder++
@@ -471,9 +519,10 @@ func (sl *scrapeLoop) append(samples model.Samples) {
 	if numDuplicates > 0 {
 		log.With("numDropped", numDuplicates).Warn("Error on ingesting samples with different value but same timestamp")
 	}
+	return countingApp.count, nil
 }

-func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scrapedSamples int, err error) {
+func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scrapedSamples, postRelabelSamples int, err error) {
 	sl.scraper.report(start, duration, err)

 	ts := model.TimeFromUnixNano(start.UnixNano())
@@ -504,14 +553,29 @@ func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scrapedSam
 		Timestamp: ts,
 		Value:     model.SampleValue(scrapedSamples),
 	}
+	postRelabelSample := &model.Sample{
+		Metric: model.Metric{
+			model.MetricNameLabel: samplesPostRelabelMetricName,
+		},
+		Timestamp: ts,
+		Value:     model.SampleValue(postRelabelSamples),
+	}

-	if err := sl.reportAppender.Append(healthSample); err != nil {
+	reportAppender := ruleLabelsAppender{
+		SampleAppender: sl.appender,
+		labels:         sl.targetLabels,
+	}
+
+	if err := reportAppender.Append(healthSample); err != nil {
 		log.With("sample", healthSample).With("error", err).Warn("Scrape health sample discarded")
 	}
-	if err := sl.reportAppender.Append(durationSample); err != nil {
+	if err := reportAppender.Append(durationSample); err != nil {
 		log.With("sample", durationSample).With("error", err).Warn("Scrape duration sample discarded")
 	}
-	if err := sl.reportAppender.Append(countSample); err != nil {
+	if err := reportAppender.Append(countSample); err != nil {
 		log.With("sample", durationSample).With("error", err).Warn("Scrape sample count sample discarded")
 	}
+	if err := reportAppender.Append(postRelabelSample); err != nil {
+		log.With("sample", postRelabelSample).With("error", err).Warn("Scrape sample count post-relabeling sample discarded")
+	}
 }
diff --git a/retrieval/scrape_test.go b/retrieval/scrape_test.go
index aaa19132f..0aec04539 100644
--- a/retrieval/scrape_test.go
+++ b/retrieval/scrape_test.go
@@ -139,7 +139,7 @@ func TestScrapePoolReload(t *testing.T) {
 	}
 	// On starting to run, new loops created on reload check whether their preceding
 	// equivalents have been stopped.
-	newLoop := func(ctx context.Context, s scraper, app, reportApp storage.SampleAppender) loop {
+	newLoop := func(ctx context.Context, s scraper, app storage.SampleAppender, tl model.LabelSet, cfg *config.ScrapeConfig) loop {
 		l := &testLoop{}
 		l.startFunc = func(interval, timeout time.Duration, errc chan<- error) {
 			if interval != 3*time.Second {
@@ -222,44 +222,19 @@ func TestScrapePoolReload(t *testing.T) {
 	}
 }

-func TestScrapePoolReportAppender(t *testing.T) {
+func TestScrapeLoopWrapSampleAppender(t *testing.T) {
 	cfg := &config.ScrapeConfig{
 		MetricRelabelConfigs: []*config.RelabelConfig{
-			{}, {}, {},
-		},
-	}
-
-	target := newTestTarget("example.com:80", 10*time.Millisecond, nil)
-	app := &nopAppender{}
-
-	sp := newScrapePool(context.Background(), cfg, app)
-
-	cfg.HonorLabels = false
-	wrapped := sp.reportAppender(target)
-
-	rl, ok := wrapped.(ruleLabelsAppender)
-	if !ok {
-		t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
-	}
-	if rl.SampleAppender != app {
-		t.Fatalf("Expected base appender but got %T", rl.SampleAppender)
-	}
-
-	cfg.HonorLabels = true
-	wrapped = sp.reportAppender(target)
-
-	hl, ok := wrapped.(ruleLabelsAppender)
-	if !ok {
-		t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
-	}
-	if hl.SampleAppender != app {
-		t.Fatalf("Expected base appender but got %T", hl.SampleAppender)
-	}
-}
-
-func TestScrapePoolSampleAppender(t *testing.T) {
-	cfg := &config.ScrapeConfig{
-		MetricRelabelConfigs: []*config.RelabelConfig{
-			{}, {}, {},
+			{
+				Action:       config.RelabelDrop,
+				SourceLabels: model.LabelNames{"__name__"},
+				Regex:        config.MustNewRegexp("does_not_match_.*"),
+			},
+			{
+				Action:       config.RelabelDrop,
+				SourceLabels: model.LabelNames{"__name__"},
+				Regex:        config.MustNewRegexp("does_not_match_either_*"),
+			},
 		},
 	}

@@ -269,7 +244,15 @@ func TestScrapePoolSampleAppender(t *testing.T) {
 	sp := newScrapePool(context.Background(), cfg, app)

 	cfg.HonorLabels = false
-	wrapped := sp.sampleAppender(target)
+
+	sl := sp.newLoop(
+		sp.ctx,
+		&targetScraper{Target: target, client: sp.client},
+		sp.appender,
+		target.Labels(),
+		sp.config,
+	).(*scrapeLoop)
+	wrapped, _ := sl.wrapAppender(sl.appender)

 	rl, ok := wrapped.(ruleLabelsAppender)
 	if !ok {
@@ -279,12 +262,23 @@ func TestScrapePoolSampleAppender(t *testing.T) {
 	if !ok {
 		t.Fatalf("Expected relabelAppender but got %T", rl.SampleAppender)
 	}
-	if re.SampleAppender != app {
-		t.Fatalf("Expected base appender but got %T", re.SampleAppender)
+	co, ok := re.SampleAppender.(*countingAppender)
+	if !ok {
+		t.Fatalf("Expected *countingAppender but got %T", re.SampleAppender)
+	}
+	if co.SampleAppender != app {
+		t.Fatalf("Expected base appender but got %T", co.SampleAppender)
 	}

 	cfg.HonorLabels = true
-	wrapped = sp.sampleAppender(target)
+	sl = sp.newLoop(
+		sp.ctx,
+		&targetScraper{Target: target, client: sp.client},
+		sp.appender,
+		target.Labels(),
+		sp.config,
+	).(*scrapeLoop)
+	wrapped, _ = sl.wrapAppender(sl.appender)

 	hl, ok := wrapped.(honorLabelsAppender)
 	if !ok {
@@ -294,17 +288,176 @@ func TestScrapePoolSampleAppender(t *testing.T) {
 	if !ok {
 		t.Fatalf("Expected relabelAppender but got %T", hl.SampleAppender)
 	}
-	if re.SampleAppender != app {
-		t.Fatalf("Expected base appender but got %T", re.SampleAppender)
+	co, ok = re.SampleAppender.(*countingAppender)
+	if !ok {
+		t.Fatalf("Expected *countingAppender but got %T", re.SampleAppender)
 	}
+	if co.SampleAppender != app {
+		t.Fatalf("Expected base appender but got %T", co.SampleAppender)
+	}
+}
+
+func TestScrapeLoopSampleProcessing(t *testing.T) {
+	readSamples := model.Samples{
+		{
+			Metric: model.Metric{"__name__": "a_metric"},
+		},
+		{
+			Metric: model.Metric{"__name__": "b_metric"},
+		},
+	}
+
+	testCases := []struct {
+		scrapedSamples                  model.Samples
+		scrapeConfig                    *config.ScrapeConfig
+		expectedReportedSamples         model.Samples
+		expectedPostRelabelSamplesCount int
+	}{
+		{ // 0
+			scrapedSamples: readSamples,
+			scrapeConfig:   &config.ScrapeConfig{},
+			expectedReportedSamples: model.Samples{
+				{
+					Metric: model.Metric{"__name__": "up"},
+					Value:  1,
+				},
+				{
+					Metric: model.Metric{"__name__": "scrape_duration_seconds"},
+					Value:  42,
+				},
+				{
+					Metric: model.Metric{"__name__": "scrape_samples_scraped"},
+					Value:  2,
+				},
+				{
+					Metric: model.Metric{"__name__": "scrape_samples_post_metric_relabeling"},
+					Value:  2,
+				},
+			},
+			expectedPostRelabelSamplesCount: 2,
+		},
+		{ // 1
+			scrapedSamples: readSamples,
+			scrapeConfig: &config.ScrapeConfig{
+				MetricRelabelConfigs: []*config.RelabelConfig{
+					{
+						Action:       config.RelabelDrop,
+						SourceLabels: model.LabelNames{"__name__"},
+						Regex:        config.MustNewRegexp("a.*"),
+					},
+				},
+			},
+			expectedReportedSamples: model.Samples{
+				{
+					Metric: model.Metric{"__name__": "up"},
+					Value:  1,
+				},
+				{
+					Metric: model.Metric{"__name__": "scrape_duration_seconds"},
+					Value:  42,
+				},
+				{
+					Metric: model.Metric{"__name__": "scrape_samples_scraped"},
+					Value:  2,
+				},
+				{
+					Metric: model.Metric{"__name__": "scrape_samples_post_metric_relabeling"},
+					Value:  1,
+				},
+			},
+			expectedPostRelabelSamplesCount: 1,
+		},
+		{ // 2
+			scrapedSamples: readSamples,
+			scrapeConfig: &config.ScrapeConfig{
+				SampleLimit: 1,
+				MetricRelabelConfigs: []*config.RelabelConfig{
+					{
+						Action:       config.RelabelDrop,
+						SourceLabels: model.LabelNames{"__name__"},
+						Regex:        config.MustNewRegexp("a.*"),
+					},
+				},
+			},
+			expectedReportedSamples: model.Samples{
+				{
+					Metric: model.Metric{"__name__": "up"},
+					Value:  1,
+				},
+				{
+					Metric: model.Metric{"__name__": "scrape_duration_seconds"},
+					Value:  42,
+				},
+				{
+					Metric: model.Metric{"__name__": "scrape_samples_scraped"},
+					Value:  2,
+				},
+				{
+					Metric: model.Metric{"__name__": "scrape_samples_post_metric_relabeling"},
+					Value:  1,
+				},
+			},
+			expectedPostRelabelSamplesCount: 1,
+		},
+		{ // 3
+			scrapedSamples: readSamples,
+			scrapeConfig: &config.ScrapeConfig{
+				SampleLimit: 1,
+			},
+			expectedReportedSamples: model.Samples{
+				{
+					Metric: model.Metric{"__name__": "up"},
+					Value:  0,
+				},
+				{
+					Metric: model.Metric{"__name__": "scrape_duration_seconds"},
+					Value:  42,
+				},
+				{
+					Metric: model.Metric{"__name__": "scrape_samples_scraped"},
+					Value:  2,
+				},
+				{
+					Metric: model.Metric{"__name__": "scrape_samples_post_metric_relabeling"},
+					Value:  2,
+				},
+			},
+			expectedPostRelabelSamplesCount: 2,
+		},
+	}
+
+	for i, test := range testCases {
+		ingestedSamples := &bufferAppender{buffer: model.Samples{}}
+
+		target := newTestTarget("example.com:80", 10*time.Millisecond, nil)
+
+		scraper := &testScraper{}
+		sl := newScrapeLoop(context.Background(), scraper, ingestedSamples, target.Labels(), test.scrapeConfig).(*scrapeLoop)
+		num, err := sl.append(test.scrapedSamples)
+		sl.report(time.Unix(0, 0), 42*time.Second, len(test.scrapedSamples), num, err)
+		reportedSamples := ingestedSamples.buffer
+		if err == nil {
+			reportedSamples = reportedSamples[num:]
+		}
+
+		if !reflect.DeepEqual(reportedSamples, test.expectedReportedSamples) {
+			t.Errorf("Reported samples did not match expected metrics for case %d", i)
+			t.Errorf("Expected: %v", test.expectedReportedSamples)
+			t.Fatalf("Got: %v", reportedSamples)
+		}
+		if test.expectedPostRelabelSamplesCount != num {
+			t.Fatalf("Case %d: Ingested samples %d did not match expected value %d", i, num, test.expectedPostRelabelSamplesCount)
+		}
+	}
 }

 func TestScrapeLoopStop(t *testing.T) {
 	scraper := &testScraper{}
-	sl := newScrapeLoop(context.Background(), scraper, nil, nil)
+	sl := newScrapeLoop(context.Background(), scraper, nil, nil, &config.ScrapeConfig{})

 	// The scrape pool synchronizes on stopping scrape loops. However, new scrape
-	// loops are syarted asynchronously. Thus it's possible, that a loop is stopped
+	// loops are started asynchronously. Thus it's possible, that a loop is stopped
 	// again before having started properly.
 	// Stopping not-yet-started loops must block until the run method was called and exited.
 	// The run method must exit immediately.
@@ -351,14 +504,13 @@ func TestScrapeLoopRun(t *testing.T) {
 		signal = make(chan struct{})
 		errc   = make(chan error)

-		scraper   = &testScraper{}
-		app       = &nopAppender{}
-		reportApp = &nopAppender{}
+		scraper = &testScraper{}
+		app     = &nopAppender{}
 	)
 	defer close(signal)

 	ctx, cancel := context.WithCancel(context.Background())
-	sl := newScrapeLoop(ctx, scraper, app, reportApp)
+	sl := newScrapeLoop(ctx, scraper, app, nil, &config.ScrapeConfig{})

 	// The loop must terminate during the initial offset if the context
 	// is canceled.
@@ -396,7 +548,7 @@ func TestScrapeLoopRun(t *testing.T) {
 	}

 	ctx, cancel = context.WithCancel(context.Background())
-	sl = newScrapeLoop(ctx, scraper, app, reportApp)
+	sl = newScrapeLoop(ctx, scraper, app, nil, &config.ScrapeConfig{})

 	go func() {
 		sl.run(time.Second, 100*time.Millisecond, errc)
diff --git a/retrieval/target.go b/retrieval/target.go
index a9a793f9a..ca5cc0db7 100644
--- a/retrieval/target.go
+++ b/retrieval/target.go
@@ -278,6 +278,29 @@ func (app relabelAppender) Append(s *model.Sample) error {
 	return app.SampleAppender.Append(s)
 }

+// bufferAppender appends samples to the given buffer.
+type bufferAppender struct {
+	buffer model.Samples
+}
+
+func (app *bufferAppender) Append(s *model.Sample) error {
+	app.buffer = append(app.buffer, s)
+	return nil
+}
+
+func (app *bufferAppender) NeedsThrottling() bool { return false }
+
+// countingAppender counts the samples appended to the underlying appender.
+type countingAppender struct {
+	storage.SampleAppender
+	count int
+}
+
+func (app *countingAppender) Append(s *model.Sample) error {
+	app.count++
+	return app.SampleAppender.Append(s)
+}
+
 // populateLabels builds a label set from the given label set and scrape configuration.
 // It returns a label set before relabeling was applied as the second return value.
 // Returns a nil label set if the target is dropped during relabeling.
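The sample-limit check in the patch boils down to a small decorator chain: an innermost `countingAppender` counts what survives metric relabeling, and the scrape is rejected as a whole once that count exceeds `SampleLimit`. The standalone Go sketch below illustrates that flow only; `sample`, `sampleAppender`, `sliceAppender`, and `appendWithLimit` are simplified stand-ins invented for this example, not the real `model.Sample`, `storage.SampleAppender`, `bufferAppender`, or `scrapeLoop.append`, and the relabeling/label-honoring wrappers are omitted.

```go
package main

import "fmt"

// sample and sampleAppender are simplified stand-ins for the Prometheus types.
type sample struct {
	name  string
	value float64
}

type sampleAppender interface {
	append(s sample) error
}

// sliceAppender plays the role of bufferAppender: it just collects samples.
type sliceAppender struct {
	buf []sample
}

func (a *sliceAppender) append(s sample) error {
	a.buf = append(a.buf, s)
	return nil
}

// countingAppender counts how many samples reach the wrapped appender,
// mirroring the countingAppender added in retrieval/target.go.
type countingAppender struct {
	next  sampleAppender
	count int
}

func (a *countingAppender) append(s sample) error {
	a.count++
	return a.next.append(s)
}

// appendWithLimit mimics the sample_limit branch of scrapeLoop.append: buffer
// everything through the counting wrapper first, then reject the whole scrape
// if the surviving sample count exceeds the limit.
func appendWithLimit(samples []sample, limit int) ([]sample, error) {
	buf := &sliceAppender{}
	counting := &countingAppender{next: buf}
	for _, s := range samples {
		// Errors are ignored: sliceAppender always succeeds.
		counting.append(s)
	}
	if limit > 0 && counting.count > limit {
		return nil, fmt.Errorf("%d samples exceeded limit of %d", counting.count, limit)
	}
	return buf.buf, nil
}

func main() {
	scraped := []sample{{"a_metric", 1}, {"b_metric", 2}}

	if kept, err := appendWithLimit(scraped, 5); err == nil {
		fmt.Println("kept", len(kept), "samples") // kept 2 samples
	}
	if _, err := appendWithLimit(scraped, 1); err != nil {
		fmt.Println("scrape rejected:", err) // scrape rejected: 2 samples exceeded limit of 1
	}
}
```

Note the design choice visible in `scrapeLoop.append`: the buffering detour only happens when a limit is configured, so scrapes without `sample_limit` keep appending straight to the storage appender. On the configuration side, the `conf.good.yml` change above shows the knob in use as `sample_limit: 1000` inside a scrape config.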