Merge pull request #2774 from prometheus/stalemem

Fix staleness memory leak
pull/2777/head
Fabian Reinartz 8 years ago committed by GitHub
commit c6eed97c77

@ -99,7 +99,8 @@ func (p *Parser) Err() error {
}
// Metric writes the labels of the current sample into the passed labels.
func (p *Parser) Metric(l *labels.Labels) {
// It returns the string from which the metric was parsed.
func (p *Parser) Metric(l *labels.Labels) string {
// Allocate the full immutable string immediately, so we just
// have to create references on it below.
s := string(p.l.b[p.l.mstart:p.l.mend])
@ -118,6 +119,8 @@ func (p *Parser) Metric(l *labels.Labels) {
}
sort.Sort((*l)[1:])
return s
}
func yoloString(b []byte) string {

@ -120,6 +120,16 @@ func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable
// Any errors that could occur here should be caught during config validation.
log.Errorf("Error creating HTTP client for job %q: %s", cfg.JobName, err)
}
newLoop := func(
ctx context.Context,
s scraper,
app, reportApp func() storage.Appender,
l log.Logger,
) loop {
return newScrapeLoop(ctx, s, app, reportApp, l)
}
return &scrapePool{
appendable: app,
config: cfg,
@ -127,7 +137,7 @@ func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable
client: client,
targets: map[uint64]*Target{},
loops: map[uint64]loop{},
newLoop: newScrapeLoop,
newLoop: newLoop,
}
}
@ -416,44 +426,121 @@ type loop interface {
type lsetCacheEntry struct {
lset labels.Labels
str string
hash uint64
}
type refEntry struct {
ref string
lastIter uint64
}
type scrapeLoop struct {
scraper scraper
l log.Logger
cache *scrapeCache
appender func() storage.Appender
reportAppender func() storage.Appender
// TODO: Keep only the values from the last scrape to avoid a memory leak.
refCache map[string]string // Parsed string to ref.
lsetCache map[string]lsetCacheEntry // Ref to labelset and string
ctx context.Context
scrapeCtx context.Context
cancel func()
stopped chan struct{}
}
// scrapeCache tracks mappings of exposed metric strings to label sets and
// storage references. Additionally, it tracks staleness of series between
// scrapes.
type scrapeCache struct {
iter uint64 // Current scrape iteration.
refs map[string]*refEntry // Parsed string to ref.
lsets map[string]*lsetCacheEntry // Ref to labelset and string.
// seriesCur and seriesPrev store the labels of series that were seen
// in the current and previous scrape.
// We hold two maps and swap them out to save allocations.
seriesCur map[string]labels.Labels
seriesPrev map[string]labels.Labels
seriesCur map[uint64]labels.Labels
seriesPrev map[uint64]labels.Labels
}
ctx context.Context
scrapeCtx context.Context
cancel func()
stopped chan struct{}
func newScrapeCache() *scrapeCache {
return &scrapeCache{
refs: map[string]*refEntry{},
lsets: map[string]*lsetCacheEntry{},
seriesCur: map[uint64]labels.Labels{},
seriesPrev: map[uint64]labels.Labels{},
}
}
func (c *scrapeCache) iterDone() {
// refCache and lsetCache may grow over time through series churn
// or multiple string representations of the same metric. Clean up entries
// that haven't appeared in the last scrape.
for s, e := range c.refs {
if e.lastIter < c.iter {
delete(c.refs, s)
delete(c.lsets, e.ref)
}
}
// Swap current and previous series.
c.seriesPrev, c.seriesCur = c.seriesCur, c.seriesPrev
// We have to delete every single key in the map.
for k := range c.seriesCur {
delete(c.seriesCur, k)
}
c.iter++
}
func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp func() storage.Appender, l log.Logger) loop {
func (c *scrapeCache) getRef(met string) (string, bool) {
e, ok := c.refs[met]
if !ok {
return "", false
}
e.lastIter = c.iter
return e.ref, true
}
func (c *scrapeCache) addRef(met, ref string, lset labels.Labels) {
c.refs[met] = &refEntry{ref: ref, lastIter: c.iter}
// met is the raw string the metric was ingested as. The label set is not ordered
// and thus it's not suitable to uniquely identify cache entries.
// We store a hash over the label set instead.
c.lsets[ref] = &lsetCacheEntry{lset: lset, hash: lset.Hash()}
}
func (c *scrapeCache) trackStaleness(ref string) {
e := c.lsets[ref]
c.seriesCur[e.hash] = e.lset
}
func (c *scrapeCache) forEachStale(f func(labels.Labels) bool) {
for h, lset := range c.seriesPrev {
if _, ok := c.seriesCur[h]; !ok {
if !f(lset) {
break
}
}
}
}
func newScrapeLoop(
ctx context.Context,
sc scraper,
app, reportApp func() storage.Appender,
l log.Logger,
) *scrapeLoop {
if l == nil {
l = log.Base()
}
sl := &scrapeLoop{
scraper: sc,
appender: app,
cache: newScrapeCache(),
reportAppender: reportApp,
refCache: map[string]string{},
lsetCache: map[string]lsetCacheEntry{},
seriesCur: map[string]labels.Labels{},
seriesPrev: map[string]labels.Labels{},
stopped: make(chan struct{}),
ctx: ctx,
l: l,
@ -636,14 +723,12 @@ loop:
t = *tp
}
mets := yoloString(met)
ref, ok := sl.refCache[mets]
ref, ok := sl.cache.getRef(yoloString(met))
if ok {
switch err = app.AddFast(ref, t, v); err {
case nil:
if tp == nil {
// Bypass staleness logic if there is an explicit timestamp.
sl.seriesCur[sl.lsetCache[ref].str] = sl.lsetCache[ref].lset
sl.cache.trackStaleness(ref)
}
case storage.ErrNotFound:
ok = false
@ -652,10 +737,10 @@ loop:
continue
case storage.ErrOutOfOrderSample:
sl.l.With("timeseries", string(met)).Debug("Out of order sample")
numOutOfOrder += 1
numOutOfOrder++
continue
case storage.ErrDuplicateSampleForTimestamp:
numDuplicates += 1
numDuplicates++
sl.l.With("timeseries", string(met)).Debug("Duplicate sample for timestamp")
continue
default:
@ -664,8 +749,9 @@ loop:
}
if !ok {
var lset labels.Labels
p.Metric(&lset)
mets := p.Metric(&lset)
var ref string
ref, err = app.Add(lset, t, v)
// TODO(fabxc): also add a dropped-cache?
switch err {
@ -676,24 +762,22 @@ loop:
case storage.ErrOutOfOrderSample:
err = nil
sl.l.With("timeseries", string(met)).Debug("Out of order sample")
numOutOfOrder += 1
numOutOfOrder++
continue
case storage.ErrDuplicateSampleForTimestamp:
err = nil
numDuplicates += 1
numDuplicates++
sl.l.With("timeseries", string(met)).Debug("Duplicate sample for timestamp")
continue
default:
break loop
}
// Allocate a real string.
mets = string(met)
sl.refCache[mets] = ref
str := lset.String()
sl.lsetCache[ref] = lsetCacheEntry{lset: lset, str: str}
sl.cache.addRef(mets, ref, lset)
if tp == nil {
// Bypass staleness logic if there is an explicit timestamp.
sl.seriesCur[str] = lset
sl.cache.trackStaleness(ref)
}
}
added++
@ -708,25 +792,19 @@ loop:
sl.l.With("numDropped", numDuplicates).Warn("Error on ingesting samples with different value but same timestamp")
}
if err == nil {
for metric, lset := range sl.seriesPrev {
if _, ok := sl.seriesCur[metric]; !ok {
sl.cache.forEachStale(func(lset labels.Labels) bool {
// Series no longer exposed, mark it stale.
_, err = app.Add(lset, defTime, math.Float64frombits(value.StaleNaN))
switch err {
case nil:
case errSeriesDropped:
err = nil
continue
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
// Do not count these in logging, as this is expected if a target
// goes away and comes back again with a new scrape loop.
err = nil
continue
default:
break
}
}
}
return err == nil
})
}
if err != nil {
app.Rollback()
@ -736,13 +814,7 @@ loop:
return total, 0, err
}
// Swap current and previous series.
sl.seriesPrev, sl.seriesCur = sl.seriesCur, sl.seriesPrev
// We have to delete every single key in the map.
for k := range sl.seriesCur {
delete(sl.seriesCur, k)
}
sl.cache.iterDone()
return total, added, nil
}
@ -807,8 +879,11 @@ func (sl *scrapeLoop) reportStale(start time.Time) error {
}
func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v float64) error {
ref, ok := sl.refCache[s]
// Suffix s with the invalid \xff unicode rune to avoid collisions
// with scraped metrics.
s2 := s + "\xff"
ref, ok := sl.cache.getRef(s2)
if ok {
err := app.AddFast(ref, t, v)
switch err {
@ -830,7 +905,7 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v
ref, err := app.Add(met, t, v)
switch err {
case nil:
sl.refCache[s] = ref
sl.cache.addRef(s2, ref, met)
return nil
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
return nil

@ -373,11 +373,9 @@ func TestScrapeLoopStop(t *testing.T) {
// Succeed once, several failures, then stop.
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
numScrapes += 1
numScrapes++
if numScrapes == 2 {
go func() {
sl.stop()
}()
go sl.stop()
}
w.Write([]byte("metric_a 42\n"))
return nil
@ -398,7 +396,7 @@ func TestScrapeLoopStop(t *testing.T) {
t.Fatalf("Appended samples not as expected. Wanted: at least %d samples Got: %d", 2, len(appender.result))
}
if !value.IsStaleNaN(appender.result[len(appender.result)-1].v) {
t.Fatalf("Appended last sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[len(appender.result)].v))
t.Fatalf("Appended last sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[len(appender.result)-1].v))
}
if len(reportAppender.result) < 8 {
@ -515,7 +513,8 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) {
// Succeed once, several failures, then stop.
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
numScrapes += 1
numScrapes++
if numScrapes == 1 {
w.Write([]byte("metric_a 42\n"))
return nil
@ -564,7 +563,8 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
// Succeed once, several failures, then stop.
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
numScrapes += 1
numScrapes++
if numScrapes == 1 {
w.Write([]byte("metric_a 42\n"))
return nil
@ -601,15 +601,12 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
func TestScrapeLoopAppend(t *testing.T) {
app := &collectResultAppender{}
sl := &scrapeLoop{
appender: func() storage.Appender { return app },
reportAppender: func() storage.Appender { return nopAppender{} },
refCache: map[string]string{},
lsetCache: map[string]lsetCacheEntry{},
seriesCur: map[string]labels.Labels{},
seriesPrev: map[string]labels.Labels{},
}
sl := newScrapeLoop(context.Background(), nil,
func() storage.Appender { return app },
func() storage.Appender { return nopAppender{} },
nil,
)
now := time.Now()
_, _, err := sl.append([]byte("metric_a 1\nmetric_b NaN\n"), now)
if err != nil {
@ -642,14 +639,11 @@ func TestScrapeLoopAppend(t *testing.T) {
func TestScrapeLoopAppendStaleness(t *testing.T) {
app := &collectResultAppender{}
sl := &scrapeLoop{
appender: func() storage.Appender { return app },
reportAppender: func() storage.Appender { return nopAppender{} },
refCache: map[string]string{},
lsetCache: map[string]lsetCacheEntry{},
seriesCur: map[string]labels.Labels{},
seriesPrev: map[string]labels.Labels{},
}
sl := newScrapeLoop(context.Background(), nil,
func() storage.Appender { return app },
func() storage.Appender { return nopAppender{} },
nil,
)
now := time.Now()
_, _, err := sl.append([]byte("metric_a 1\n"), now)
@ -688,12 +682,11 @@ func TestScrapeLoopAppendStaleness(t *testing.T) {
func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) {
app := &collectResultAppender{}
sl := &scrapeLoop{
appender: func() storage.Appender { return app },
reportAppender: func() storage.Appender { return nopAppender{} },
refCache: map[string]string{},
lsetCache: map[string]lsetCacheEntry{},
}
sl := newScrapeLoop(context.Background(), nil,
func() storage.Appender { return app },
func() storage.Appender { return nopAppender{} },
nil,
)
now := time.Now()
_, _, err := sl.append([]byte("metric_a 1 1000\n"), now)
@ -737,15 +730,11 @@ func (app *errorAppender) AddFast(ref string, t int64, v float64) error {
func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrder(t *testing.T) {
app := &errorAppender{}
sl := &scrapeLoop{
appender: func() storage.Appender { return app },
reportAppender: func() storage.Appender { return nopAppender{} },
refCache: map[string]string{},
lsetCache: map[string]lsetCacheEntry{},
seriesCur: map[string]labels.Labels{},
seriesPrev: map[string]labels.Labels{},
l: log.Base(),
}
sl := newScrapeLoop(context.Background(), nil,
func() storage.Appender { return app },
func() storage.Appender { return nopAppender{} },
nil,
)
now := time.Unix(1, 0)
_, _, err := sl.append([]byte("out_of_order 1\namend 1\nnormal 1\n"), now)

Loading…
Cancel
Save