diff --git a/retrieval/scrape.go b/retrieval/scrape.go index 727e3b49f..8fc27a662 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -427,9 +427,13 @@ type scrapeLoop struct { reportAppender func() storage.Appender // TODO: Keep only the values from the last scrape to avoid a memory leak. - refCache map[string]string // Parsed string to ref. - lsetCache map[string]lsetCacheEntry // Ref to labelset and string - seriesInPreviousScrape map[string]labels.Labels + refCache map[string]string // Parsed string to ref. + lsetCache map[string]lsetCacheEntry // Ref to labelset and string + + // seriesCur and seriesPrev store the labels of series that were seen + // in the current and previous scrape. + seriesCur map[string]labels.Labels + seriesPrev map[string]labels.Labels ctx context.Context scrapeCtx context.Context @@ -447,6 +451,8 @@ func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp func() storag reportAppender: reportApp, refCache: map[string]string{}, lsetCache: map[string]lsetCacheEntry{}, + seriesCur: map[string]labels.Labels{}, + seriesPrev: map[string]labels.Labels{}, stopped: make(chan struct{}), ctx: ctx, l: l, @@ -615,7 +621,6 @@ func (sl *scrapeLoop) append(b []byte, ts time.Time) (total, added int, err erro app = sl.appender() p = textparse.New(b) defTime = timestamp.FromTime(ts) - seriesScraped = make(map[string]labels.Labels, len(sl.seriesInPreviousScrape)) numOutOfOrder = 0 numDuplicates = 0 ) @@ -635,7 +640,10 @@ loop: if ok { switch err = app.AddFast(ref, t, v); err { case nil: - seriesScraped[sl.lsetCache[ref].str] = sl.lsetCache[ref].lset + if tp == nil { + // Bypass staleness logic if there is an explicit timestamp. + sl.seriesCur[sl.lsetCache[ref].str] = sl.lsetCache[ref].lset + } case storage.ErrNotFound: ok = false case errSeriesDropped: @@ -684,7 +692,7 @@ loop: sl.lsetCache[ref] = lsetCacheEntry{lset: lset, str: str} if tp == nil { // Bypass staleness logic if there is an explicit timestamp. - seriesScraped[str] = lset + sl.seriesCur[str] = lset } } added++ @@ -699,8 +707,8 @@ loop: sl.l.With("numDropped", numDuplicates).Warn("Error on ingesting samples with different value but same timestamp") } if err == nil { - for metric, lset := range sl.seriesInPreviousScrape { - if _, ok := seriesScraped[metric]; !ok { + for metric, lset := range sl.seriesPrev { + if _, ok := sl.seriesCur[metric]; !ok { // Series no longer exposed, mark it stale. _, err = app.Add(lset, defTime, math.Float64frombits(value.StaleNaN)) switch err { @@ -726,7 +734,15 @@ loop: if err := app.Commit(); err != nil { return total, 0, err } - sl.seriesInPreviousScrape = seriesScraped + + // Swap current and previous series. + sl.seriesPrev, sl.seriesCur = sl.seriesCur, sl.seriesPrev + + // We have to delete every single key in the map. + for k := range sl.seriesCur { + delete(sl.seriesCur, k) + } + return total, added, nil } diff --git a/retrieval/scrape_test.go b/retrieval/scrape_test.go index 1e6cdeea3..c76a70da3 100644 --- a/retrieval/scrape_test.go +++ b/retrieval/scrape_test.go @@ -606,6 +606,8 @@ func TestScrapeLoopAppend(t *testing.T) { reportAppender: func() storage.Appender { return nopAppender{} }, refCache: map[string]string{}, lsetCache: map[string]lsetCacheEntry{}, + seriesCur: map[string]labels.Labels{}, + seriesPrev: map[string]labels.Labels{}, } now := time.Now() @@ -645,6 +647,8 @@ func TestScrapeLoopAppendStaleness(t *testing.T) { reportAppender: func() storage.Appender { return nopAppender{} }, refCache: map[string]string{}, lsetCache: map[string]lsetCacheEntry{}, + seriesCur: map[string]labels.Labels{}, + seriesPrev: map[string]labels.Labels{}, } now := time.Now() @@ -738,6 +742,8 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrder(t *testing.T) { reportAppender: func() storage.Appender { return nopAppender{} }, refCache: map[string]string{}, lsetCache: map[string]lsetCacheEntry{}, + seriesCur: map[string]labels.Labels{}, + seriesPrev: map[string]labels.Labels{}, l: log.Base(), }