Protect against memory exhaustion when scraping.

Now that we're not losing the scrape cache across failed
scrape, a scrape that continually failed but had varying
series or metadata (e.g. timestamps in metric names,
plus hitting smaple_limit) would grow the cache indefinitely.

Add some code to catch that, and flush the cache anyway.

Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
pull/5711/head
Brian Brazil 6 years ago
parent dd3073616c
commit f7184978f4

@ -125,6 +125,12 @@ var (
Help: "Total number of samples rejected due to timestamp falling outside of the time bounds",
},
)
targetScrapeCacheFlushForced = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "prometheus_target_scrapes_cache_flush_forced_total",
Help: "How many times a scrape cache was flushed due to getting big while scrapes are failing.",
},
)
)
func init() {
@ -140,6 +146,7 @@ func init() {
prometheus.MustRegister(targetScrapeSampleDuplicate)
prometheus.MustRegister(targetScrapeSampleOutOfOrder)
prometheus.MustRegister(targetScrapeSampleOutOfBounds)
prometheus.MustRegister(targetScrapeCacheFlushForced)
}
// scrapePool manages scrapes for sets of targets.
@ -606,6 +613,9 @@ type scrapeLoop struct {
type scrapeCache struct {
iter uint64 // Current scrape iteration.
// How many series and metadata entries there were at the last success.
successfulCount int
// Parsed string to an entry with information about the actual label set
// and its storage reference.
series map[string]*cacheEntry
@ -643,8 +653,24 @@ func newScrapeCache() *scrapeCache {
}
}
func (c *scrapeCache) iterDone(cleanCache bool) {
if cleanCache {
func (c *scrapeCache) iterDone(flushCache bool) {
c.metaMtx.Lock()
count := len(c.series) + len(c.droppedSeries) + len(c.metadata)
c.metaMtx.Unlock()
if flushCache {
c.successfulCount = count
} else if count > c.successfulCount*2+1000 {
// If a target had varying labels in scrapes that ultimately failed,
// the caches would grow indefinitely. Force a flush when this happens.
// We use the heuristic that this is a doubling of the cache size
// since the last scrape, and allow an additional 1000 in case
// initial scrapes all fail.
flushCache = true
targetScrapeCacheFlushForced.Inc()
}
if flushCache {
// All caches may grow over time through series churn
// or multiple string representations of the same metric. Clean up entries
// that haven't appeared in the last scrape.
@ -1185,6 +1211,8 @@ loop:
return total, added, err
}
// Only perform cache cleaning if the scrape was not empty.
// An empty scrape (usually) is used to indicate a failed scrape.
sl.cache.iterDone(len(b) > 0)
return total, added, nil

@ -859,6 +859,66 @@ func TestScrapeLoopCache(t *testing.T) {
}
}
func TestScrapeLoopCacheMemoryExhaustionProtection(t *testing.T) {
s := testutil.NewStorage(t)
defer s.Close()
sapp, err := s.Appender()
if err != nil {
t.Error(err)
}
appender := &collectResultAppender{next: sapp}
var (
signal = make(chan struct{})
scraper = &testScraper{}
app = func() storage.Appender { return appender }
)
defer close(signal)
ctx, cancel := context.WithCancel(context.Background())
sl := newScrapeLoop(ctx,
scraper,
nil, nil,
nopMutator,
nopMutator,
app,
nil,
0,
true,
)
numScrapes := 0
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
numScrapes++
if numScrapes < 5 {
s := ""
for i := 0; i < 500; i++ {
s = fmt.Sprintf("%smetric_%d_%d 42\n", s, i, numScrapes)
}
w.Write([]byte(fmt.Sprintf(s + "&")))
} else {
cancel()
}
return nil
}
go func() {
sl.run(10*time.Millisecond, time.Hour, nil)
signal <- struct{}{}
}()
select {
case <-signal:
case <-time.After(5 * time.Second):
t.Fatalf("Scrape wasn't stopped.")
}
if len(sl.cache.series) > 2000 {
t.Fatalf("More than 2000 series cached. Got: %d", len(sl.cache.series))
}
}
func TestScrapeLoopAppend(t *testing.T) {
tests := []struct {

Loading…
Cancel
Save