mirror of https://github.com/prometheus/prometheus
Merge pull request #2765 from prometheus/memmap
retrieval: Don't allocate map on every scrapepull/2771/head
commit
8fef036078
|
@ -427,9 +427,14 @@ type scrapeLoop struct {
|
||||||
reportAppender func() storage.Appender
|
reportAppender func() storage.Appender
|
||||||
|
|
||||||
// TODO: Keep only the values from the last scrape to avoid a memory leak.
|
// TODO: Keep only the values from the last scrape to avoid a memory leak.
|
||||||
refCache map[string]string // Parsed string to ref.
|
refCache map[string]string // Parsed string to ref.
|
||||||
lsetCache map[string]lsetCacheEntry // Ref to labelset and string
|
lsetCache map[string]lsetCacheEntry // Ref to labelset and string
|
||||||
seriesInPreviousScrape map[string]labels.Labels
|
|
||||||
|
// seriesCur and seriesPrev store the labels of series that were seen
|
||||||
|
// in the current and previous scrape.
|
||||||
|
// We hold two maps and swap them out to save allocations.
|
||||||
|
seriesCur map[string]labels.Labels
|
||||||
|
seriesPrev map[string]labels.Labels
|
||||||
|
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
scrapeCtx context.Context
|
scrapeCtx context.Context
|
||||||
|
@ -447,6 +452,8 @@ func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp func() storag
|
||||||
reportAppender: reportApp,
|
reportAppender: reportApp,
|
||||||
refCache: map[string]string{},
|
refCache: map[string]string{},
|
||||||
lsetCache: map[string]lsetCacheEntry{},
|
lsetCache: map[string]lsetCacheEntry{},
|
||||||
|
seriesCur: map[string]labels.Labels{},
|
||||||
|
seriesPrev: map[string]labels.Labels{},
|
||||||
stopped: make(chan struct{}),
|
stopped: make(chan struct{}),
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
l: l,
|
l: l,
|
||||||
|
@ -615,7 +622,6 @@ func (sl *scrapeLoop) append(b []byte, ts time.Time) (total, added int, err erro
|
||||||
app = sl.appender()
|
app = sl.appender()
|
||||||
p = textparse.New(b)
|
p = textparse.New(b)
|
||||||
defTime = timestamp.FromTime(ts)
|
defTime = timestamp.FromTime(ts)
|
||||||
seriesScraped = make(map[string]labels.Labels, len(sl.seriesInPreviousScrape))
|
|
||||||
numOutOfOrder = 0
|
numOutOfOrder = 0
|
||||||
numDuplicates = 0
|
numDuplicates = 0
|
||||||
)
|
)
|
||||||
|
@ -635,7 +641,10 @@ loop:
|
||||||
if ok {
|
if ok {
|
||||||
switch err = app.AddFast(ref, t, v); err {
|
switch err = app.AddFast(ref, t, v); err {
|
||||||
case nil:
|
case nil:
|
||||||
seriesScraped[sl.lsetCache[ref].str] = sl.lsetCache[ref].lset
|
if tp == nil {
|
||||||
|
// Bypass staleness logic if there is an explicit timestamp.
|
||||||
|
sl.seriesCur[sl.lsetCache[ref].str] = sl.lsetCache[ref].lset
|
||||||
|
}
|
||||||
case storage.ErrNotFound:
|
case storage.ErrNotFound:
|
||||||
ok = false
|
ok = false
|
||||||
case errSeriesDropped:
|
case errSeriesDropped:
|
||||||
|
@ -684,7 +693,7 @@ loop:
|
||||||
sl.lsetCache[ref] = lsetCacheEntry{lset: lset, str: str}
|
sl.lsetCache[ref] = lsetCacheEntry{lset: lset, str: str}
|
||||||
if tp == nil {
|
if tp == nil {
|
||||||
// Bypass staleness logic if there is an explicit timestamp.
|
// Bypass staleness logic if there is an explicit timestamp.
|
||||||
seriesScraped[str] = lset
|
sl.seriesCur[str] = lset
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
added++
|
added++
|
||||||
|
@ -699,8 +708,8 @@ loop:
|
||||||
sl.l.With("numDropped", numDuplicates).Warn("Error on ingesting samples with different value but same timestamp")
|
sl.l.With("numDropped", numDuplicates).Warn("Error on ingesting samples with different value but same timestamp")
|
||||||
}
|
}
|
||||||
if err == nil {
|
if err == nil {
|
||||||
for metric, lset := range sl.seriesInPreviousScrape {
|
for metric, lset := range sl.seriesPrev {
|
||||||
if _, ok := seriesScraped[metric]; !ok {
|
if _, ok := sl.seriesCur[metric]; !ok {
|
||||||
// Series no longer exposed, mark it stale.
|
// Series no longer exposed, mark it stale.
|
||||||
_, err = app.Add(lset, defTime, math.Float64frombits(value.StaleNaN))
|
_, err = app.Add(lset, defTime, math.Float64frombits(value.StaleNaN))
|
||||||
switch err {
|
switch err {
|
||||||
|
@ -726,7 +735,15 @@ loop:
|
||||||
if err := app.Commit(); err != nil {
|
if err := app.Commit(); err != nil {
|
||||||
return total, 0, err
|
return total, 0, err
|
||||||
}
|
}
|
||||||
sl.seriesInPreviousScrape = seriesScraped
|
|
||||||
|
// Swap current and previous series.
|
||||||
|
sl.seriesPrev, sl.seriesCur = sl.seriesCur, sl.seriesPrev
|
||||||
|
|
||||||
|
// We have to delete every single key in the map.
|
||||||
|
for k := range sl.seriesCur {
|
||||||
|
delete(sl.seriesCur, k)
|
||||||
|
}
|
||||||
|
|
||||||
return total, added, nil
|
return total, added, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -606,6 +606,8 @@ func TestScrapeLoopAppend(t *testing.T) {
|
||||||
reportAppender: func() storage.Appender { return nopAppender{} },
|
reportAppender: func() storage.Appender { return nopAppender{} },
|
||||||
refCache: map[string]string{},
|
refCache: map[string]string{},
|
||||||
lsetCache: map[string]lsetCacheEntry{},
|
lsetCache: map[string]lsetCacheEntry{},
|
||||||
|
seriesCur: map[string]labels.Labels{},
|
||||||
|
seriesPrev: map[string]labels.Labels{},
|
||||||
}
|
}
|
||||||
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
@ -645,6 +647,8 @@ func TestScrapeLoopAppendStaleness(t *testing.T) {
|
||||||
reportAppender: func() storage.Appender { return nopAppender{} },
|
reportAppender: func() storage.Appender { return nopAppender{} },
|
||||||
refCache: map[string]string{},
|
refCache: map[string]string{},
|
||||||
lsetCache: map[string]lsetCacheEntry{},
|
lsetCache: map[string]lsetCacheEntry{},
|
||||||
|
seriesCur: map[string]labels.Labels{},
|
||||||
|
seriesPrev: map[string]labels.Labels{},
|
||||||
}
|
}
|
||||||
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
@ -738,6 +742,8 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrder(t *testing.T) {
|
||||||
reportAppender: func() storage.Appender { return nopAppender{} },
|
reportAppender: func() storage.Appender { return nopAppender{} },
|
||||||
refCache: map[string]string{},
|
refCache: map[string]string{},
|
||||||
lsetCache: map[string]lsetCacheEntry{},
|
lsetCache: map[string]lsetCacheEntry{},
|
||||||
|
seriesCur: map[string]labels.Labels{},
|
||||||
|
seriesPrev: map[string]labels.Labels{},
|
||||||
l: log.Base(),
|
l: log.Base(),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue