
retrieval: Don't allocate map on every scrape

pull/2765/head
Fabian Reinartz, 8 years ago
commit 43ca652217
retrieval/scrape.go      | 34
retrieval/scrape_test.go |  6
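The change removes the map that append() allocated on every scrape and instead keeps two maps on the scrapeLoop, seriesCur and seriesPrev: series seen during the scrape in progress go into seriesCur, staleness markers are written for anything that appears only in seriesPrev, and after Commit the two maps are swapped and the new "current" map is emptied so its backing storage is reused. Below is a self-contained sketch of that swap-and-clear pattern; the tracker type and the Record/EndScrape names are hypothetical and only illustrate the idea (the real code stores labels.Labels values and feeds disappeared series to the storage appender).

package main

import "fmt"

// tracker keeps two maps that are reused across scrapes instead of
// allocating a fresh map each time. Names here are illustrative only.
type tracker struct {
	cur  map[string]struct{} // series seen in the scrape in progress
	prev map[string]struct{} // series seen in the previous scrape
}

func newTracker() *tracker {
	return &tracker{
		cur:  map[string]struct{}{},
		prev: map[string]struct{}{},
	}
}

// Record marks a series as seen in the current scrape.
func (t *tracker) Record(series string) {
	t.cur[series] = struct{}{}
}

// EndScrape reports the series that disappeared since the previous scrape,
// then swaps the maps and clears the swapped-in map for reuse.
func (t *tracker) EndScrape() []string {
	var gone []string
	for s := range t.prev {
		if _, ok := t.cur[s]; !ok {
			gone = append(gone, s) // would receive a staleness marker
		}
	}
	// Swap current and previous, then delete every key from the map that
	// will collect the next scrape; no new map is allocated.
	t.cur, t.prev = t.prev, t.cur
	for k := range t.cur {
		delete(t.cur, k)
	}
	return gone
}

func main() {
	t := newTracker()
	t.Record("up")
	t.Record("http_requests_total")
	t.EndScrape()

	t.Record("up")             // http_requests_total no longer exposed
	fmt.Println(t.EndScrape()) // prints [http_requests_total]
}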

--- retrieval/scrape.go
+++ retrieval/scrape.go

@@ -427,9 +427,13 @@ type scrapeLoop struct {
 	reportAppender func() storage.Appender
 	// TODO: Keep only the values from the last scrape to avoid a memory leak.
 	refCache  map[string]string         // Parsed string to ref.
 	lsetCache map[string]lsetCacheEntry // Ref to labelset and string
-	seriesInPreviousScrape map[string]labels.Labels
+	// seriesCur and seriesPrev store the labels of series that were seen
+	// in the current and previous scrape.
+	seriesCur  map[string]labels.Labels
+	seriesPrev map[string]labels.Labels
 	ctx       context.Context
 	scrapeCtx context.Context
@@ -447,6 +451,8 @@ func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp func() storag
 		reportAppender: reportApp,
 		refCache:       map[string]string{},
 		lsetCache:      map[string]lsetCacheEntry{},
+		seriesCur:      map[string]labels.Labels{},
+		seriesPrev:     map[string]labels.Labels{},
 		stopped:        make(chan struct{}),
 		ctx:            ctx,
 		l:              l,
@@ -615,7 +621,6 @@ func (sl *scrapeLoop) append(b []byte, ts time.Time) (total, added int, err erro
 		app     = sl.appender()
 		p       = textparse.New(b)
 		defTime = timestamp.FromTime(ts)
-		seriesScraped = make(map[string]labels.Labels, len(sl.seriesInPreviousScrape))
 		numOutOfOrder = 0
 		numDuplicates = 0
 	)
@@ -635,7 +640,10 @@ loop:
 		if ok {
 			switch err = app.AddFast(ref, t, v); err {
 			case nil:
-				seriesScraped[sl.lsetCache[ref].str] = sl.lsetCache[ref].lset
+				if tp == nil {
+					// Bypass staleness logic if there is an explicit timestamp.
+					sl.seriesCur[sl.lsetCache[ref].str] = sl.lsetCache[ref].lset
+				}
 			case storage.ErrNotFound:
 				ok = false
 			case errSeriesDropped:
@@ -684,7 +692,7 @@ loop:
 			sl.lsetCache[ref] = lsetCacheEntry{lset: lset, str: str}
 			if tp == nil {
 				// Bypass staleness logic if there is an explicit timestamp.
-				seriesScraped[str] = lset
+				sl.seriesCur[str] = lset
 			}
 		}
 		added++
@@ -699,8 +707,8 @@ loop:
 		sl.l.With("numDropped", numDuplicates).Warn("Error on ingesting samples with different value but same timestamp")
 	}
 	if err == nil {
-		for metric, lset := range sl.seriesInPreviousScrape {
-			if _, ok := seriesScraped[metric]; !ok {
+		for metric, lset := range sl.seriesPrev {
+			if _, ok := sl.seriesCur[metric]; !ok {
 				// Series no longer exposed, mark it stale.
 				_, err = app.Add(lset, defTime, math.Float64frombits(value.StaleNaN))
 				switch err {
@@ -726,7 +734,15 @@ loop:
 	if err := app.Commit(); err != nil {
 		return total, 0, err
 	}
-	sl.seriesInPreviousScrape = seriesScraped
+	// Swap current and previous series.
+	sl.seriesPrev, sl.seriesCur = sl.seriesCur, sl.seriesPrev
+	// We have to delete every single key in the map.
+	for k := range sl.seriesCur {
+		delete(sl.seriesCur, k)
+	}
 	return total, added, nil
 }
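A note on the cleanup after Commit: deleting every key rather than assigning a fresh map is what lets the maps' backing storage be reused on the next scrape. Go releases from 1.11 onward also recognize the "for k := range m { delete(m, k) }" form and compile it down to a single internal map-clear call, though this commit predates that optimization and the loop simply removes keys one by one.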

--- retrieval/scrape_test.go
+++ retrieval/scrape_test.go

@@ -606,6 +606,8 @@ func TestScrapeLoopAppend(t *testing.T) {
 		reportAppender: func() storage.Appender { return nopAppender{} },
 		refCache:       map[string]string{},
 		lsetCache:      map[string]lsetCacheEntry{},
+		seriesCur:      map[string]labels.Labels{},
+		seriesPrev:     map[string]labels.Labels{},
 	}
 	now := time.Now()
@@ -645,6 +647,8 @@ func TestScrapeLoopAppendStaleness(t *testing.T) {
 		reportAppender: func() storage.Appender { return nopAppender{} },
 		refCache:       map[string]string{},
 		lsetCache:      map[string]lsetCacheEntry{},
+		seriesCur:      map[string]labels.Labels{},
+		seriesPrev:     map[string]labels.Labels{},
 	}
 	now := time.Now()
@@ -738,6 +742,8 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrder(t *testing.T) {
 		reportAppender: func() storage.Appender { return nopAppender{} },
 		refCache:       map[string]string{},
 		lsetCache:      map[string]lsetCacheEntry{},
+		seriesCur:      map[string]labels.Labels{},
+		seriesPrev:     map[string]labels.Labels{},
 		l:              log.Base(),
 	}
