diff --git a/retrieval/scrape.go b/retrieval/scrape.go index 2135e0268..f23cea97a 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -34,6 +34,7 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/pool" "github.com/prometheus/prometheus/pkg/textparse" "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/pkg/value" @@ -145,13 +146,15 @@ func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable logger.Errorf("Error creating HTTP client for job %q: %s", cfg.JobName, err) } + buffers := pool.NewBytesPool(163, 100e6, 3) + newLoop := func( ctx context.Context, s scraper, app, reportApp func() storage.Appender, l log.Logger, ) loop { - return newScrapeLoop(ctx, s, app, reportApp, l) + return newScrapeLoop(ctx, s, app, reportApp, buffers, l) } return &scrapePool{ @@ -470,9 +473,11 @@ type refEntry struct { } type scrapeLoop struct { - scraper scraper - l log.Logger - cache *scrapeCache + scraper scraper + l log.Logger + cache *scrapeCache + lastScrapeSize int + buffers *pool.BytesPool appender func() storage.Appender reportAppender func() storage.Appender @@ -573,16 +578,22 @@ func newScrapeLoop( ctx context.Context, sc scraper, app, reportApp func() storage.Appender, + buffers *pool.BytesPool, l log.Logger, ) *scrapeLoop { if l == nil { l = log.Base() } + if buffers == nil { + buffers = pool.NewBytesPool(10e3, 100e6, 3) + } sl := &scrapeLoop{ scraper: sc, appender: app, cache: newScrapeCache(), reportAppender: reportApp, + buffers: buffers, + lastScrapeSize: 16000, stopped: make(chan struct{}), ctx: ctx, l: l, @@ -631,12 +642,20 @@ mainLoop: time.Since(last).Seconds(), ) } + b := sl.buffers.Get(sl.lastScrapeSize) + buf := bytes.NewBuffer(b) scrapeErr := sl.scraper.scrape(scrapeCtx, buf) cancel() - var b []byte + if scrapeErr == nil { b = buf.Bytes() + // NOTE: There were issues with misbehaving clients in the past + // that occasionally returned empty results. We don't want those + // to falsely reset our buffer size. + if len(b) > 0 { + sl.lastScrapeSize = len(b) + } } else { sl.l.With("err", scrapeErr.Error()).Debug("scrape failed") if errc != nil { @@ -656,6 +675,8 @@ mainLoop: } } + sl.buffers.Put(b) + if scrapeErr == nil { scrapeErr = appErr } diff --git a/retrieval/scrape_test.go b/retrieval/scrape_test.go index a49277ff9..376c23cdd 100644 --- a/retrieval/scrape_test.go +++ b/retrieval/scrape_test.go @@ -313,7 +313,7 @@ func TestScrapePoolSampleAppender(t *testing.T) { func TestScrapeLoopStopBeforeRun(t *testing.T) { scraper := &testScraper{} - sl := newScrapeLoop(context.Background(), scraper, nil, nil, nil) + sl := newScrapeLoop(context.Background(), scraper, nil, nil, nil, nil) // The scrape pool synchronizes on stopping scrape loops. However, new scrape // loops are started asynchronously. Thus it's possible, that a loop is stopped @@ -371,7 +371,7 @@ func TestScrapeLoopStop(t *testing.T) { ) defer close(signal) - sl := newScrapeLoop(context.Background(), scraper, app, reportApp, nil) + sl := newScrapeLoop(context.Background(), scraper, app, reportApp, nil, nil) // Succeed once, several failures, then stop. scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { @@ -428,7 +428,7 @@ func TestScrapeLoopRun(t *testing.T) { defer close(signal) ctx, cancel := context.WithCancel(context.Background()) - sl := newScrapeLoop(ctx, scraper, app, reportApp, nil) + sl := newScrapeLoop(ctx, scraper, app, reportApp, nil, nil) // The loop must terminate during the initial offset if the context // is canceled. @@ -466,7 +466,7 @@ func TestScrapeLoopRun(t *testing.T) { } ctx, cancel = context.WithCancel(context.Background()) - sl = newScrapeLoop(ctx, scraper, app, reportApp, nil) + sl = newScrapeLoop(ctx, scraper, app, reportApp, nil, nil) go func() { sl.run(time.Second, 100*time.Millisecond, errc) @@ -511,7 +511,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) { defer close(signal) ctx, cancel := context.WithCancel(context.Background()) - sl := newScrapeLoop(ctx, scraper, app, reportApp, nil) + sl := newScrapeLoop(ctx, scraper, app, reportApp, nil, nil) // Succeed once, several failures, then stop. scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { @@ -561,7 +561,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) { defer close(signal) ctx, cancel := context.WithCancel(context.Background()) - sl := newScrapeLoop(ctx, scraper, app, reportApp, nil) + sl := newScrapeLoop(ctx, scraper, app, reportApp, nil, nil) // Succeed once, several failures, then stop. scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { @@ -608,6 +608,7 @@ func TestScrapeLoopAppend(t *testing.T) { func() storage.Appender { return app }, func() storage.Appender { return nopAppender{} }, nil, + nil, ) now := time.Now() _, _, err := sl.append([]byte("metric_a 1\nmetric_b NaN\n"), now) @@ -645,6 +646,7 @@ func TestScrapeLoopAppendStaleness(t *testing.T) { func() storage.Appender { return app }, func() storage.Appender { return nopAppender{} }, nil, + nil, ) now := time.Now() @@ -688,6 +690,7 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) { func() storage.Appender { return app }, func() storage.Appender { return nopAppender{} }, nil, + nil, ) now := time.Now() @@ -780,7 +783,7 @@ func TestScrapeLoopRunAppliesScrapeLimit(t *testing.T) { defer close(signal) ctx, cancel := context.WithCancel(context.Background()) - sl := newScrapeLoop(ctx, scraper, c.appender, reportApp, nil) + sl := newScrapeLoop(ctx, scraper, c.appender, reportApp, nil, nil) // Setup a series to be stale, then 3 samples, then stop. scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { @@ -831,7 +834,7 @@ func TestScrapeLoopRunReportsTargetDownOnScrapeError(t *testing.T) { ) ctx, cancel := context.WithCancel(context.Background()) - sl := newScrapeLoop(ctx, scraper, func() storage.Appender { return nopAppender{} }, reportApp, nil) + sl := newScrapeLoop(ctx, scraper, func() storage.Appender { return nopAppender{} }, reportApp, nil, nil) scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { cancel() @@ -853,7 +856,7 @@ func TestScrapeLoopRunReportsTargetDownOnInvalidUTF8(t *testing.T) { ) ctx, cancel := context.WithCancel(context.Background()) - sl := newScrapeLoop(ctx, scraper, func() storage.Appender { return nopAppender{} }, reportApp, nil) + sl := newScrapeLoop(ctx, scraper, func() storage.Appender { return nopAppender{} }, reportApp, nil, nil) scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { cancel() @@ -895,6 +898,7 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T func() storage.Appender { return app }, func() storage.Appender { return nopAppender{} }, nil, + nil, ) now := time.Unix(1, 0) @@ -925,6 +929,7 @@ func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) { }, func() storage.Appender { return nopAppender{} }, nil, + nil, ) now := time.Now().Add(20 * time.Minute)