Don't lose the scrape cache on a failed scrape.

This avoids CPU usage increasing when the target comes back.

Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
pull/5711/head
Brian Brazil 6 years ago
parent 4f47806a7d
commit dd3073616c

@ -643,28 +643,32 @@ func newScrapeCache() *scrapeCache {
} }
} }
func (c *scrapeCache) iterDone() { func (c *scrapeCache) iterDone(cleanCache bool) {
// All caches may grow over time through series churn if cleanCache {
// or multiple string representations of the same metric. Clean up entries // All caches may grow over time through series churn
// that haven't appeared in the last scrape. // or multiple string representations of the same metric. Clean up entries
for s, e := range c.series { // that haven't appeared in the last scrape.
if c.iter-e.lastIter > 2 { for s, e := range c.series {
delete(c.series, s) if c.iter != e.lastIter {
delete(c.series, s)
}
} }
} for s, iter := range c.droppedSeries {
for s, iter := range c.droppedSeries { if c.iter != *iter {
if c.iter-*iter > 2 { delete(c.droppedSeries, s)
delete(c.droppedSeries, s) }
} }
} c.metaMtx.Lock()
c.metaMtx.Lock() for m, e := range c.metadata {
for m, e := range c.metadata { // Keep metadata around for 10 scrapes after its metric disappeared.
// Keep metadata around for 10 scrapes after its metric disappeared. if c.iter-e.lastIter > 10 {
if c.iter-e.lastIter > 10 { delete(c.metadata, m)
delete(c.metadata, m) }
} }
c.metaMtx.Unlock()
c.iter++
} }
c.metaMtx.Unlock()
// Swap current and previous series. // Swap current and previous series.
c.seriesPrev, c.seriesCur = c.seriesCur, c.seriesPrev c.seriesPrev, c.seriesCur = c.seriesCur, c.seriesPrev
@ -673,8 +677,6 @@ func (c *scrapeCache) iterDone() {
for k := range c.seriesCur { for k := range c.seriesCur {
delete(c.seriesCur, k) delete(c.seriesCur, k)
} }
c.iter++
} }
func (c *scrapeCache) get(met string) (*cacheEntry, bool) { func (c *scrapeCache) get(met string) (*cacheEntry, bool) {
@ -1110,7 +1112,6 @@ loop:
var ref uint64 var ref uint64
ref, err = app.Add(lset, t, v) ref, err = app.Add(lset, t, v)
// TODO(fabxc): also add a dropped-cache?
switch err { switch err {
case nil: case nil:
case storage.ErrOutOfOrderSample: case storage.ErrOutOfOrderSample:
@ -1184,7 +1185,7 @@ loop:
return total, added, err return total, added, err
} }
sl.cache.iterDone() sl.cache.iterDone(len(b) > 0)
return total, added, nil return total, added, nil
} }

@ -770,7 +770,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
// 1 successfully scraped sample, 1 stale marker after first fail, 4 report samples for // 1 successfully scraped sample, 1 stale marker after first fail, 4 report samples for
// each scrape successful or not. // each scrape successful or not.
if len(appender.result) != 14 { if len(appender.result) != 14 {
t.Fatalf("Appended samples not as expected. Wanted: %d samples Got: %d", 22, len(appender.result)) t.Fatalf("Appended samples not as expected. Wanted: %d samples Got: %d", 14, len(appender.result))
} }
if appender.result[0].v != 42.0 { if appender.result[0].v != 42.0 {
t.Fatalf("Appended first sample not as expected. Wanted: %f Got: %f", appender.result[0].v, 42.0) t.Fatalf("Appended first sample not as expected. Wanted: %f Got: %f", appender.result[0].v, 42.0)
@ -780,6 +780,85 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
} }
} }
// TestScrapeLoopCache verifies that the scrape cache survives failed scrapes
// and only evicts series that were absent from the most recent successful,
// non-empty scrape. It drives four scrapes: #1 succeeds with metric_a and
// metric_b, #2 fails, #3 succeeds with only metric_a, #4 fails and cancels.
func TestScrapeLoopCache(t *testing.T) {
s := testutil.NewStorage(t)
defer s.Close()
sapp, err := s.Appender()
if err != nil {
t.Error(err)
}
// Wrap the real appender so every appended sample can be counted at the end.
appender := &collectResultAppender{next: sapp}
var (
signal = make(chan struct{})
scraper = &testScraper{}
app = func() storage.Appender { return appender }
)
defer close(signal)
ctx, cancel := context.WithCancel(context.Background())
sl := newScrapeLoop(ctx,
scraper,
nil, nil,
nopMutator,
nopMutator,
app,
nil,
0,
true,
)
numScrapes := 0
// Each invocation first inspects the cache state left behind by the
// PREVIOUS scrape, then produces (or fails) the current scrape.
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
// After scrape 1 (success) and scrape 2 (failure) both series must
// still be cached: a failed scrape must not flush the cache.
if numScrapes == 1 || numScrapes == 2 {
if _, ok := sl.cache.series["metric_a"]; !ok {
t.Errorf("metric_a missing from cache after scrape %d", numScrapes)
}
if _, ok := sl.cache.series["metric_b"]; !ok {
t.Errorf("metric_b missing from cache after scrape %d", numScrapes)
}
// After scrape 3 (success, metric_a only) metric_b must have been
// evicted while metric_a remains cached.
} else if numScrapes == 3 {
if _, ok := sl.cache.series["metric_a"]; !ok {
t.Errorf("metric_a missing from cache after scrape %d", numScrapes)
}
if _, ok := sl.cache.series["metric_b"]; ok {
t.Errorf("metric_b present in cache after scrape %d", numScrapes)
}
}
numScrapes++
if numScrapes == 1 {
w.Write([]byte("metric_a 42\nmetric_b 43\n"))
return nil
} else if numScrapes == 3 {
w.Write([]byte("metric_a 44\n"))
return nil
} else if numScrapes == 4 {
// Stop the scrape loop after the fourth scrape attempt.
cancel()
}
return fmt.Errorf("scrape failed")
}
go func() {
sl.run(10*time.Millisecond, time.Hour, nil)
signal <- struct{}{}
}()
select {
case <-signal:
case <-time.After(5 * time.Second):
t.Fatalf("Scrape wasn't stopped.")
}
// Expected tally: 2 samples from scrape 1, 2 stale markers after scrape 2
// fails, 1 sample from scrape 3, 1 stale marker after scrape 4 fails, plus
// 4 report samples for each of the 4 scrapes — 22 appends in total.
if len(appender.result) != 22 {
t.Fatalf("Appended samples not as expected. Wanted: %d samples Got: %d", 22, len(appender.result))
}
}
func TestScrapeLoopAppend(t *testing.T) { func TestScrapeLoopAppend(t *testing.T) {
tests := []struct { tests := []struct {

Loading…
Cancel
Save