diff --git a/retrieval/scrape.go b/retrieval/scrape.go index b01c8bb69..1812854bf 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -602,10 +602,10 @@ mainLoop: // A failed scrape is the same as an empty scrape, // we still call sl.append to trigger stale markers. if total, added, err = sl.append(b, start); err != nil { - sl.l.With("err", err).Error("append failed") - // The append failed, probably due to a parse error. + sl.l.With("err", err).Warn("append failed") + // The append failed, probably due to a parse error or sample limit. // Call sl.append again with an empty scrape to trigger stale markers. - if _, _, err = sl.append([]byte{}, start); err != nil { + if _, _, err := sl.append([]byte{}, start); err != nil { sl.l.With("err", err).Error("append failed") } } @@ -712,6 +712,7 @@ func (sl *scrapeLoop) append(b []byte, ts time.Time) (total, added int, err erro numOutOfOrder = 0 numDuplicates = 0 ) + var sampleLimitErr error loop: for p.Next() { @@ -743,6 +744,12 @@ loop: numDuplicates++ sl.l.With("timeseries", string(met)).Debug("Duplicate sample for timestamp") continue + case errSampleLimit: + // Keep on parsing output if we hit the limit, so we report the correct + // total number of samples scraped. + sampleLimitErr = err + added++ + continue default: break loop } @@ -769,6 +776,10 @@ loop: numDuplicates++ sl.l.With("timeseries", string(met)).Debug("Duplicate sample for timestamp") continue + case errSampleLimit: + sampleLimitErr = err + added++ + continue default: break loop } @@ -785,6 +796,10 @@ loop: if err == nil { err = p.Err() } + if err == nil && sampleLimitErr != nil { + targetScrapeSampleLimit.Inc() + err = sampleLimitErr + } if numOutOfOrder > 0 { sl.l.With("numDropped", numOutOfOrder).Warn("Error on ingesting out-of-order samples") } @@ -808,10 +823,10 @@ loop: } if err != nil { app.Rollback() - return total, 0, err + return total, added, err } if err := app.Commit(); err != nil { - return total, 0, err + return total, added, err } sl.cache.iterDone() diff --git a/retrieval/scrape_test.go b/retrieval/scrape_test.go index 3f5565530..51c83a304 100644 --- a/retrieval/scrape_test.go +++ b/retrieval/scrape_test.go @@ -711,6 +711,116 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) { } +func TestScrapeLoopRunAppliesScrapeLimit(t *testing.T) { + + cases := []struct { + appender func() storage.Appender + up float64 + scrapeSamplesScraped float64 + scrapeSamplesScrapedPostMetricRelabelling float64 + }{ + { + appender: func() storage.Appender { return nopAppender{} }, + up: 1, + scrapeSamplesScraped: 3, + scrapeSamplesScrapedPostMetricRelabelling: 3, + }, + { + appender: func() storage.Appender { + return &limitAppender{Appender: nopAppender{}, limit: 3} + }, + up: 1, + scrapeSamplesScraped: 3, + scrapeSamplesScrapedPostMetricRelabelling: 3, + }, + { + appender: func() storage.Appender { + return &limitAppender{Appender: nopAppender{}, limit: 2} + }, + up: 0, + scrapeSamplesScraped: 3, + scrapeSamplesScrapedPostMetricRelabelling: 3, + }, + { + appender: func() storage.Appender { + return &relabelAppender{ + Appender: &limitAppender{Appender: nopAppender{}, limit: 2}, + relabelings: []*config.RelabelConfig{ + &config.RelabelConfig{ + SourceLabels: model.LabelNames{"__name__"}, + Regex: config.MustNewRegexp("a"), + Action: config.RelabelDrop, + }, + }, + } + }, + up: 1, + scrapeSamplesScraped: 3, + scrapeSamplesScrapedPostMetricRelabelling: 2, + }, + } + + for i, c := range cases { + reportAppender := &collectResultAppender{} + var ( + signal = make(chan struct{}) + scraper = &testScraper{} + numScrapes = 0 + reportApp = func() storage.Appender { + // Get result of the 2nd scrape. + if numScrapes == 2 { + return reportAppender + } else { + return nopAppender{} + } + } + ) + defer close(signal) + + ctx, cancel := context.WithCancel(context.Background()) + sl := newScrapeLoop(ctx, scraper, c.appender, reportApp, nil) + + // Setup a series to be stale, then 3 samples, then stop. + scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { + numScrapes += 1 + if numScrapes == 1 { + w.Write([]byte("stale 0\n")) + return nil + } else if numScrapes == 2 { + w.Write([]byte("a 0\nb 0\nc 0 \n")) + return nil + } else if numScrapes == 3 { + cancel() + } + return fmt.Errorf("Scrape failed.") + } + + go func() { + sl.run(10*time.Millisecond, time.Hour, nil) + signal <- struct{}{} + }() + + select { + case <-signal: + case <-time.After(5 * time.Second): + t.Fatalf("Scrape wasn't stopped.") + } + + if len(reportAppender.result) != 4 { + t.Fatalf("Case %d appended report samples not as expected. Wanted: %d samples Got: %d", i, 4, len(reportAppender.result)) + } + if reportAppender.result[0].v != c.up { + t.Fatalf("Case %d appended up sample not as expected. Wanted: %f Got: %+v", i, c.up, reportAppender.result[0]) + } + if reportAppender.result[2].v != c.scrapeSamplesScraped { + t.Fatalf("Case %d appended scrape_samples_scraped sample not as expected. Wanted: %f Got: %+v", i, c.scrapeSamplesScraped, reportAppender.result[2]) + } + if reportAppender.result[3].v != c.scrapeSamplesScrapedPostMetricRelabelling { + t.Fatalf("Case %d appended scrape_samples_scraped_post_metric_relabeling sample not as expected. Wanted: %f Got: %+v", i, c.scrapeSamplesScrapedPostMetricRelabelling, reportAppender.result[3]) + } + } +} + type errorAppender struct { collectResultAppender } diff --git a/retrieval/target.go b/retrieval/target.go index 72e0f0b23..8ba6f46d3 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -30,6 +30,7 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/relabel" + "github.com/prometheus/prometheus/pkg/value" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/util/httputil" ) @@ -228,6 +229,8 @@ func (ts Targets) Len() int { return len(ts) } func (ts Targets) Less(i, j int) bool { return ts[i].URL().String() < ts[j].URL().String() } func (ts Targets) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] } +var errSampleLimit = errors.New("sample limit exceeded") + // limitAppender limits the number of total appended samples in a batch. type limitAppender struct { storage.Appender @@ -237,26 +240,29 @@ type limitAppender struct { } func (app *limitAppender) Add(lset labels.Labels, t int64, v float64) (string, error) { - if app.i+1 > app.limit { - return "", fmt.Errorf("sample limit of %d exceeded", app.limit) + if !value.IsStaleNaN(v) { + app.i++ + if app.i > app.limit { + return "", errSampleLimit + } } ref, err := app.Appender.Add(lset, t, v) if err != nil { return "", err } - app.i++ return ref, nil } func (app *limitAppender) AddFast(ref string, t int64, v float64) error { - if app.i+1 > app.limit { - return fmt.Errorf("sample limit of %d exceeded", app.limit) + if !value.IsStaleNaN(v) { + app.i++ + if app.i > app.limit { + return errSampleLimit + } } - if err := app.Appender.AddFast(ref, t, v); err != nil { return err } - app.i++ return nil }