Merge pull request #15357 from prometheus/alexg/fix-fallback-scrape-release-3.0

scrape: stop erroring on empty scrapes
pull/15373/head
Alex Greenbank 2024-11-08 18:43:13 +00:00 committed by GitHub
commit d40b671cfe
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 110 additions and 59 deletions

View File

@ -2,6 +2,8 @@
## unreleased ## unreleased
* [BUGFIX] Scraping: Don't log errors on empty scrapes. #15357
## 3.0.0-rc.0 / 2024-10-31 ## 3.0.0-rc.0 / 2024-10-31
* [CHANGE] Scraping: Remove implicit fallback to the Prometheus text format in case of invalid/missing Content-Type and fail the scrape instead. Add ability to specify a `fallback_scrape_protocol` in the scrape config. #15136 * [CHANGE] Scraping: Remove implicit fallback to the Prometheus text format in case of invalid/missing Content-Type and fail the scrape instead. Add ability to specify a `fallback_scrape_protocol` in the scrape config. #15136

View File

@ -1552,7 +1552,34 @@ type appendErrors struct {
numExemplarOutOfOrder int numExemplarOutOfOrder int
} }
// Update the stale markers.
func (sl *scrapeLoop) updateStaleMarkers(app storage.Appender, defTime int64) (err error) {
sl.cache.forEachStale(func(lset labels.Labels) bool {
// Series no longer exposed, mark it stale.
app.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true})
_, err = app.Append(0, lset, defTime, math.Float64frombits(value.StaleNaN))
app.SetOptions(nil)
switch {
case errors.Is(err, storage.ErrOutOfOrderSample), errors.Is(err, storage.ErrDuplicateSampleForTimestamp):
// Do not count these in logging, as this is expected if a target
// goes away and comes back again with a new scrape loop.
err = nil
}
return err == nil
})
return
}
func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) { func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) {
defTime := timestamp.FromTime(ts)
if len(b) == 0 {
// Empty scrape. Just update the stale makers and swap the cache (but don't flush it).
err = sl.updateStaleMarkers(app, defTime)
sl.cache.iterDone(false)
return
}
p, err := textparse.New(b, contentType, sl.fallbackScrapeProtocol, sl.alwaysScrapeClassicHist, sl.enableCTZeroIngestion, sl.symbolTable) p, err := textparse.New(b, contentType, sl.fallbackScrapeProtocol, sl.alwaysScrapeClassicHist, sl.enableCTZeroIngestion, sl.symbolTable)
if p == nil { if p == nil {
sl.l.Error( sl.l.Error(
@ -1574,9 +1601,7 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string,
"err", err, "err", err,
) )
} }
var ( var (
defTime = timestamp.FromTime(ts)
appErrs = appendErrors{} appErrs = appendErrors{}
sampleLimitErr error sampleLimitErr error
bucketLimitErr error bucketLimitErr error
@ -1617,9 +1642,8 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string,
if err != nil { if err != nil {
return return
} }
// Only perform cache cleaning if the scrape was not empty. // Flush and swap the cache as the scrape was non-empty.
// An empty scrape (usually) is used to indicate a failed scrape. sl.cache.iterDone(true)
sl.cache.iterDone(len(b) > 0)
}() }()
loop: loop:
@ -1862,19 +1886,7 @@ loop:
sl.l.Warn("Error on ingesting out-of-order exemplars", "num_dropped", appErrs.numExemplarOutOfOrder) sl.l.Warn("Error on ingesting out-of-order exemplars", "num_dropped", appErrs.numExemplarOutOfOrder)
} }
if err == nil { if err == nil {
sl.cache.forEachStale(func(lset labels.Labels) bool { err = sl.updateStaleMarkers(app, defTime)
// Series no longer exposed, mark it stale.
app.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true})
_, err = app.Append(0, lset, defTime, math.Float64frombits(value.StaleNaN))
app.SetOptions(nil)
switch {
case errors.Is(err, storage.ErrOutOfOrderSample), errors.Is(err, storage.ErrDuplicateSampleForTimestamp):
// Do not count these in logging, as this is expected if a target
// goes away and comes back again with a new scrape loop.
err = nil
}
return err == nil
})
} }
return return
} }

View File

@ -120,13 +120,13 @@ func runScrapeLoopTest(t *testing.T, s *teststorage.TestStorage, expectOutOfOrde
timestampInorder2 := now.Add(5 * time.Minute) timestampInorder2 := now.Add(5 * time.Minute)
slApp := sl.appender(context.Background()) slApp := sl.appender(context.Background())
_, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1`), "", timestampInorder1) _, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1`), "text/plain", timestampInorder1)
require.NoError(t, err) require.NoError(t, err)
_, _, _, err = sl.append(slApp, []byte(`metric_a{a="1",b="1"} 2`), "", timestampOutOfOrder) _, _, _, err = sl.append(slApp, []byte(`metric_a{a="1",b="1"} 2`), "text/plain", timestampOutOfOrder)
require.NoError(t, err) require.NoError(t, err)
_, _, _, err = sl.append(slApp, []byte(`metric_a{a="1",b="1"} 3`), "", timestampInorder2) _, _, _, err = sl.append(slApp, []byte(`metric_a{a="1",b="1"} 3`), "text/plain", timestampInorder2)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, slApp.Commit()) require.NoError(t, slApp.Commit())
@ -756,6 +756,10 @@ func TestScrapePoolScrapeLoopsStarted(t *testing.T) {
} }
func newBasicScrapeLoop(t testing.TB, ctx context.Context, scraper scraper, app func(ctx context.Context) storage.Appender, interval time.Duration) *scrapeLoop { func newBasicScrapeLoop(t testing.TB, ctx context.Context, scraper scraper, app func(ctx context.Context) storage.Appender, interval time.Duration) *scrapeLoop {
return newBasicScrapeLoopWithFallback(t, ctx, scraper, app, interval, "")
}
func newBasicScrapeLoopWithFallback(t testing.TB, ctx context.Context, scraper scraper, app func(ctx context.Context) storage.Appender, interval time.Duration, fallback string) *scrapeLoop {
return newScrapeLoop(ctx, return newScrapeLoop(ctx,
scraper, scraper,
nil, nil, nil, nil,
@ -783,7 +787,7 @@ func newBasicScrapeLoop(t testing.TB, ctx context.Context, scraper scraper, app
newTestScrapeMetrics(t), newTestScrapeMetrics(t),
false, false,
model.LegacyValidation, model.LegacyValidation,
"text/plain", fallback,
) )
} }
@ -844,7 +848,8 @@ func TestScrapeLoopStop(t *testing.T) {
app = func(ctx context.Context) storage.Appender { return appender } app = func(ctx context.Context) storage.Appender { return appender }
) )
sl := newBasicScrapeLoop(t, context.Background(), scraper, app, 10*time.Millisecond) // Since we're writing samples directly below we need to provide a protocol fallback.
sl := newBasicScrapeLoopWithFallback(t, context.Background(), scraper, app, 10*time.Millisecond, "text/plain")
// Terminate loop after 2 scrapes. // Terminate loop after 2 scrapes.
numScrapes := 0 numScrapes := 0
@ -928,7 +933,7 @@ func TestScrapeLoopRun(t *testing.T) {
scrapeMetrics, scrapeMetrics,
false, false,
model.LegacyValidation, model.LegacyValidation,
"text/plain", "",
) )
// The loop must terminate during the initial offset if the context // The loop must terminate during the initial offset if the context
@ -1075,7 +1080,7 @@ func TestScrapeLoopMetadata(t *testing.T) {
scrapeMetrics, scrapeMetrics,
false, false,
model.LegacyValidation, model.LegacyValidation,
"text/plain", "",
) )
defer cancel() defer cancel()
@ -1126,7 +1131,7 @@ func TestScrapeLoopSeriesAdded(t *testing.T) {
ctx, sl := simpleTestScrapeLoop(t) ctx, sl := simpleTestScrapeLoop(t)
slApp := sl.appender(ctx) slApp := sl.appender(ctx)
total, added, seriesAdded, err := sl.append(slApp, []byte("test_metric 1\n"), "", time.Time{}) total, added, seriesAdded, err := sl.append(slApp, []byte("test_metric 1\n"), "text/plain", time.Time{})
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, slApp.Commit()) require.NoError(t, slApp.Commit())
require.Equal(t, 1, total) require.Equal(t, 1, total)
@ -1134,7 +1139,7 @@ func TestScrapeLoopSeriesAdded(t *testing.T) {
require.Equal(t, 1, seriesAdded) require.Equal(t, 1, seriesAdded)
slApp = sl.appender(ctx) slApp = sl.appender(ctx)
total, added, seriesAdded, err = sl.append(slApp, []byte("test_metric 1\n"), "", time.Time{}) total, added, seriesAdded, err = sl.append(slApp, []byte("test_metric 1\n"), "text/plain", time.Time{})
require.NoError(t, slApp.Commit()) require.NoError(t, slApp.Commit())
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 1, total) require.Equal(t, 1, total)
@ -1164,7 +1169,7 @@ func TestScrapeLoopFailWithInvalidLabelsAfterRelabel(t *testing.T) {
} }
slApp := sl.appender(ctx) slApp := sl.appender(ctx)
total, added, seriesAdded, err := sl.append(slApp, []byte("test_metric 1\n"), "", time.Time{}) total, added, seriesAdded, err := sl.append(slApp, []byte("test_metric 1\n"), "text/plain", time.Time{})
require.ErrorContains(t, err, "invalid metric name or label names") require.ErrorContains(t, err, "invalid metric name or label names")
require.NoError(t, slApp.Rollback()) require.NoError(t, slApp.Rollback())
require.Equal(t, 1, total) require.Equal(t, 1, total)
@ -1188,7 +1193,7 @@ func TestScrapeLoopFailLegacyUnderUTF8(t *testing.T) {
sl.validationScheme = model.LegacyValidation sl.validationScheme = model.LegacyValidation
slApp := sl.appender(ctx) slApp := sl.appender(ctx)
total, added, seriesAdded, err := sl.append(slApp, []byte("{\"test.metric\"} 1\n"), "", time.Time{}) total, added, seriesAdded, err := sl.append(slApp, []byte("{\"test.metric\"} 1\n"), "text/plain", time.Time{})
require.ErrorContains(t, err, "invalid metric name or label names") require.ErrorContains(t, err, "invalid metric name or label names")
require.NoError(t, slApp.Rollback()) require.NoError(t, slApp.Rollback())
require.Equal(t, 1, total) require.Equal(t, 1, total)
@ -1199,7 +1204,7 @@ func TestScrapeLoopFailLegacyUnderUTF8(t *testing.T) {
sl.validationScheme = model.UTF8Validation sl.validationScheme = model.UTF8Validation
slApp = sl.appender(ctx) slApp = sl.appender(ctx)
total, added, seriesAdded, err = sl.append(slApp, []byte("{\"test.metric\"} 1\n"), "", time.Time{}) total, added, seriesAdded, err = sl.append(slApp, []byte("{\"test.metric\"} 1\n"), "text/plain", time.Time{})
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 1, total) require.Equal(t, 1, total)
require.Equal(t, 1, added) require.Equal(t, 1, added)
@ -1229,7 +1234,7 @@ func BenchmarkScrapeLoopAppend(b *testing.B) {
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
ts = ts.Add(time.Second) ts = ts.Add(time.Second)
_, _, _, _ = sl.append(slApp, metrics, "", ts) _, _, _, _ = sl.append(slApp, metrics, "text/plain", ts)
} }
} }
@ -1338,7 +1343,8 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) {
) )
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
sl := newBasicScrapeLoop(t, ctx, scraper, app, 10*time.Millisecond) // Since we're writing samples directly below we need to provide a protocol fallback.
sl := newBasicScrapeLoopWithFallback(t, ctx, scraper, app, 10*time.Millisecond, "text/plain")
// Succeed once, several failures, then stop. // Succeed once, several failures, then stop.
numScrapes := 0 numScrapes := 0
@ -1384,7 +1390,8 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
) )
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
sl := newBasicScrapeLoop(t, ctx, scraper, app, 10*time.Millisecond) // Since we're writing samples directly below we need to provide a protocol fallback.
sl := newBasicScrapeLoopWithFallback(t, ctx, scraper, app, 10*time.Millisecond, "text/plain")
// Succeed once, several failures, then stop. // Succeed once, several failures, then stop.
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
@ -1435,7 +1442,8 @@ func TestScrapeLoopCache(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
// Decreasing the scrape interval could make the test fail, as multiple scrapes might be initiated at identical millisecond timestamps. // Decreasing the scrape interval could make the test fail, as multiple scrapes might be initiated at identical millisecond timestamps.
// See https://github.com/prometheus/prometheus/issues/12727. // See https://github.com/prometheus/prometheus/issues/12727.
sl := newBasicScrapeLoop(t, ctx, scraper, app, 100*time.Millisecond) // Since we're writing samples directly below we need to provide a protocol fallback.
sl := newBasicScrapeLoopWithFallback(t, ctx, scraper, app, 100*time.Millisecond, "text/plain")
numScrapes := 0 numScrapes := 0
@ -1600,7 +1608,7 @@ func TestScrapeLoopAppend(t *testing.T) {
now := time.Now() now := time.Now()
slApp := sl.appender(context.Background()) slApp := sl.appender(context.Background())
_, _, _, err := sl.append(slApp, []byte(test.scrapeLabels), "", now) _, _, _, err := sl.append(slApp, []byte(test.scrapeLabels), "text/plain", now)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, slApp.Commit()) require.NoError(t, slApp.Commit())
@ -1681,7 +1689,7 @@ func TestScrapeLoopAppendForConflictingPrefixedLabels(t *testing.T) {
return mutateSampleLabels(l, &Target{labels: labels.FromStrings(tc.targetLabels...)}, false, nil) return mutateSampleLabels(l, &Target{labels: labels.FromStrings(tc.targetLabels...)}, false, nil)
} }
slApp := sl.appender(context.Background()) slApp := sl.appender(context.Background())
_, _, _, err := sl.append(slApp, []byte(tc.exposedLabels), "", time.Date(2000, 1, 1, 1, 0, 0, 0, time.UTC)) _, _, _, err := sl.append(slApp, []byte(tc.exposedLabels), "text/plain", time.Date(2000, 1, 1, 1, 0, 0, 0, time.UTC))
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, slApp.Commit()) require.NoError(t, slApp.Commit())
@ -1719,7 +1727,7 @@ func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) {
now := time.Now() now := time.Now()
slApp := sl.appender(context.Background()) slApp := sl.appender(context.Background())
_, _, _, err := sl.append(slApp, metric, "", now) _, _, _, err := sl.append(slApp, metric, "text/plain", now)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, slApp.Commit()) require.NoError(t, slApp.Commit())
@ -1756,7 +1764,7 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) {
now := time.Now() now := time.Now()
slApp := sl.appender(context.Background()) slApp := sl.appender(context.Background())
total, added, seriesAdded, err := sl.append(app, []byte("metric_a 1\nmetric_b 1\nmetric_c 1\n"), "", now) total, added, seriesAdded, err := sl.append(app, []byte("metric_a 1\nmetric_b 1\nmetric_c 1\n"), "text/plain", now)
require.ErrorIs(t, err, errSampleLimit) require.ErrorIs(t, err, errSampleLimit)
require.NoError(t, slApp.Rollback()) require.NoError(t, slApp.Rollback())
require.Equal(t, 3, total) require.Equal(t, 3, total)
@ -1785,7 +1793,7 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) {
now = time.Now() now = time.Now()
slApp = sl.appender(context.Background()) slApp = sl.appender(context.Background())
total, added, seriesAdded, err = sl.append(slApp, []byte("metric_a 1\nmetric_b 1\nmetric_c{deleteme=\"yes\"} 1\nmetric_d 1\nmetric_e 1\nmetric_f 1\nmetric_g 1\nmetric_h{deleteme=\"yes\"} 1\nmetric_i{deleteme=\"yes\"} 1\n"), "", now) total, added, seriesAdded, err = sl.append(slApp, []byte("metric_a 1\nmetric_b 1\nmetric_c{deleteme=\"yes\"} 1\nmetric_d 1\nmetric_e 1\nmetric_f 1\nmetric_g 1\nmetric_h{deleteme=\"yes\"} 1\nmetric_i{deleteme=\"yes\"} 1\n"), "text/plain", now)
require.ErrorIs(t, err, errSampleLimit) require.ErrorIs(t, err, errSampleLimit)
require.NoError(t, slApp.Rollback()) require.NoError(t, slApp.Rollback())
require.Equal(t, 9, total) require.Equal(t, 9, total)
@ -1913,12 +1921,12 @@ func TestScrapeLoop_ChangingMetricString(t *testing.T) {
now := time.Now() now := time.Now()
slApp := sl.appender(context.Background()) slApp := sl.appender(context.Background())
_, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1`), "", now) _, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1`), "text/plain", now)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, slApp.Commit()) require.NoError(t, slApp.Commit())
slApp = sl.appender(context.Background()) slApp = sl.appender(context.Background())
_, _, _, err = sl.append(slApp, []byte(`metric_a{b="1",a="1"} 2`), "", now.Add(time.Minute)) _, _, _, err = sl.append(slApp, []byte(`metric_a{b="1",a="1"} 2`), "text/plain", now.Add(time.Minute))
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, slApp.Commit()) require.NoError(t, slApp.Commit())
@ -1937,6 +1945,33 @@ func TestScrapeLoop_ChangingMetricString(t *testing.T) {
require.Equal(t, want, capp.resultFloats, "Appended samples not as expected:\n%s", appender) require.Equal(t, want, capp.resultFloats, "Appended samples not as expected:\n%s", appender)
} }
func TestScrapeLoopAppendFailsWithNoContentType(t *testing.T) {
app := &collectResultAppender{}
// Explicitly setting the lack of fallback protocol here to make it obvious.
sl := newBasicScrapeLoopWithFallback(t, context.Background(), nil, func(ctx context.Context) storage.Appender { return app }, 0, "")
now := time.Now()
slApp := sl.appender(context.Background())
_, _, _, err := sl.append(slApp, []byte("metric_a 1\n"), "", now)
// We expect the appropriate error.
require.ErrorContains(t, err, "non-compliant scrape target sending blank Content-Type and no fallback_scrape_protocol specified for target", "Expected \"non-compliant scrape\" error but got: %s", err)
}
func TestScrapeLoopAppendEmptyWithNoContentType(t *testing.T) {
// This test ensures we there are no errors when we get a blank scrape or just want to append a stale marker.
app := &collectResultAppender{}
// Explicitly setting the lack of fallback protocol here to make it obvious.
sl := newBasicScrapeLoopWithFallback(t, context.Background(), nil, func(ctx context.Context) storage.Appender { return app }, 0, "")
now := time.Now()
slApp := sl.appender(context.Background())
_, _, _, err := sl.append(slApp, []byte(""), "", now)
require.NoError(t, err)
require.NoError(t, slApp.Commit())
}
func TestScrapeLoopAppendStaleness(t *testing.T) { func TestScrapeLoopAppendStaleness(t *testing.T) {
app := &collectResultAppender{} app := &collectResultAppender{}
@ -1944,7 +1979,7 @@ func TestScrapeLoopAppendStaleness(t *testing.T) {
now := time.Now() now := time.Now()
slApp := sl.appender(context.Background()) slApp := sl.appender(context.Background())
_, _, _, err := sl.append(slApp, []byte("metric_a 1\n"), "", now) _, _, _, err := sl.append(slApp, []byte("metric_a 1\n"), "text/plain", now)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, slApp.Commit()) require.NoError(t, slApp.Commit())
@ -1973,7 +2008,7 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) {
sl := newBasicScrapeLoop(t, context.Background(), nil, func(ctx context.Context) storage.Appender { return app }, 0) sl := newBasicScrapeLoop(t, context.Background(), nil, func(ctx context.Context) storage.Appender { return app }, 0)
now := time.Now() now := time.Now()
slApp := sl.appender(context.Background()) slApp := sl.appender(context.Background())
_, _, _, err := sl.append(slApp, []byte("metric_a 1 1000\n"), "", now) _, _, _, err := sl.append(slApp, []byte("metric_a 1 1000\n"), "text/plain", now)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, slApp.Commit()) require.NoError(t, slApp.Commit())
@ -1999,7 +2034,7 @@ func TestScrapeLoopAppendStalenessIfTrackTimestampStaleness(t *testing.T) {
now := time.Now() now := time.Now()
slApp := sl.appender(context.Background()) slApp := sl.appender(context.Background())
_, _, _, err := sl.append(slApp, []byte("metric_a 1 1000\n"), "", now) _, _, _, err := sl.append(slApp, []byte("metric_a 1 1000\n"), "text/plain", now)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, slApp.Commit()) require.NoError(t, slApp.Commit())
@ -2519,7 +2554,7 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T
now := time.Unix(1, 0) now := time.Unix(1, 0)
slApp := sl.appender(context.Background()) slApp := sl.appender(context.Background())
total, added, seriesAdded, err := sl.append(slApp, []byte("out_of_order 1\namend 1\nnormal 1\nout_of_bounds 1\n"), "", now) total, added, seriesAdded, err := sl.append(slApp, []byte("out_of_order 1\namend 1\nnormal 1\nout_of_bounds 1\n"), "text/plain", now)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, slApp.Commit()) require.NoError(t, slApp.Commit())
@ -2550,7 +2585,7 @@ func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) {
now := time.Now().Add(20 * time.Minute) now := time.Now().Add(20 * time.Minute)
slApp := sl.appender(context.Background()) slApp := sl.appender(context.Background())
total, added, seriesAdded, err := sl.append(slApp, []byte("normal 1\n"), "", now) total, added, seriesAdded, err := sl.append(slApp, []byte("normal 1\n"), "text/plain", now)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, slApp.Commit()) require.NoError(t, slApp.Commit())
require.Equal(t, 1, total) require.Equal(t, 1, total)
@ -2850,7 +2885,7 @@ func TestScrapeLoop_RespectTimestamps(t *testing.T) {
now := time.Now() now := time.Now()
slApp := sl.appender(context.Background()) slApp := sl.appender(context.Background())
_, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1 0`), "", now) _, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1 0`), "text/plain", now)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, slApp.Commit()) require.NoError(t, slApp.Commit())
@ -2877,7 +2912,7 @@ func TestScrapeLoop_DiscardTimestamps(t *testing.T) {
now := time.Now() now := time.Now()
slApp := sl.appender(context.Background()) slApp := sl.appender(context.Background())
_, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1 0`), "", now) _, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1 0`), "text/plain", now)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, slApp.Commit()) require.NoError(t, slApp.Commit())
@ -2901,7 +2936,7 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
// We add a good and a bad metric to check that both are discarded. // We add a good and a bad metric to check that both are discarded.
slApp := sl.appender(ctx) slApp := sl.appender(ctx)
_, _, _, err := sl.append(slApp, []byte("test_metric{le=\"500\"} 1\ntest_metric{le=\"600\",le=\"700\"} 1\n"), "", time.Time{}) _, _, _, err := sl.append(slApp, []byte("test_metric{le=\"500\"} 1\ntest_metric{le=\"600\",le=\"700\"} 1\n"), "text/plain", time.Time{})
require.Error(t, err) require.Error(t, err)
require.NoError(t, slApp.Rollback()) require.NoError(t, slApp.Rollback())
// We need to cycle staleness cache maps after a manual rollback. Otherwise they will have old entries in them, // We need to cycle staleness cache maps after a manual rollback. Otherwise they will have old entries in them,
@ -2916,7 +2951,7 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
// We add a good metric to check that it is recorded. // We add a good metric to check that it is recorded.
slApp = sl.appender(ctx) slApp = sl.appender(ctx)
_, _, _, err = sl.append(slApp, []byte("test_metric{le=\"500\"} 1\n"), "", time.Time{}) _, _, _, err = sl.append(slApp, []byte("test_metric{le=\"500\"} 1\n"), "text/plain", time.Time{})
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, slApp.Commit()) require.NoError(t, slApp.Commit())
@ -2945,7 +2980,7 @@ func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) {
defer cancel() defer cancel()
slApp := sl.appender(context.Background()) slApp := sl.appender(context.Background())
_, _, _, err := sl.append(slApp, []byte("nok 1\nnok2{drop=\"drop\"} 1\n"), "", time.Time{}) _, _, _, err := sl.append(slApp, []byte("nok 1\nnok2{drop=\"drop\"} 1\n"), "text/plain", time.Time{})
require.Error(t, err) require.Error(t, err)
require.NoError(t, slApp.Rollback()) require.NoError(t, slApp.Rollback())
require.Equal(t, errNameLabelMandatory, err) require.Equal(t, errNameLabelMandatory, err)
@ -3191,7 +3226,7 @@ func TestScrapeAddFast(t *testing.T) {
defer cancel() defer cancel()
slApp := sl.appender(ctx) slApp := sl.appender(ctx)
_, _, _, err := sl.append(slApp, []byte("up 1\n"), "", time.Time{}) _, _, _, err := sl.append(slApp, []byte("up 1\n"), "text/plain", time.Time{})
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, slApp.Commit()) require.NoError(t, slApp.Commit())
@ -3202,7 +3237,7 @@ func TestScrapeAddFast(t *testing.T) {
} }
slApp = sl.appender(ctx) slApp = sl.appender(ctx)
_, _, _, err = sl.append(slApp, []byte("up 1\n"), "", time.Time{}.Add(time.Second)) _, _, _, err = sl.append(slApp, []byte("up 1\n"), "text/plain", time.Time{}.Add(time.Second))
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, slApp.Commit()) require.NoError(t, slApp.Commit())
} }
@ -3257,7 +3292,8 @@ func TestScrapeReportSingleAppender(t *testing.T) {
) )
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
sl := newBasicScrapeLoop(t, ctx, scraper, s.Appender, 10*time.Millisecond) // Since we're writing samples directly below we need to provide a protocol fallback.
sl := newBasicScrapeLoopWithFallback(t, ctx, scraper, s.Appender, 10*time.Millisecond, "text/plain")
numScrapes := 0 numScrapes := 0
@ -3484,7 +3520,7 @@ func TestScrapeLoopLabelLimit(t *testing.T) {
sl.labelLimits = &test.labelLimits sl.labelLimits = &test.labelLimits
slApp := sl.appender(context.Background()) slApp := sl.appender(context.Background())
_, _, _, err := sl.append(slApp, []byte(test.scrapeLabels), "", time.Now()) _, _, _, err := sl.append(slApp, []byte(test.scrapeLabels), "text/plain", time.Now())
t.Logf("Test:%s", test.title) t.Logf("Test:%s", test.title)
if test.expectErr { if test.expectErr {
@ -4176,7 +4212,8 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrapeForTimestampedMetrics(t *
) )
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
sl := newBasicScrapeLoop(t, ctx, scraper, app, 10*time.Millisecond) // Since we're writing samples directly below we need to provide a protocol fallback.
sl := newBasicScrapeLoopWithFallback(t, ctx, scraper, app, 10*time.Millisecond, "text/plain")
sl.trackTimestampsStaleness = true sl.trackTimestampsStaleness = true
// Succeed once, several failures, then stop. // Succeed once, several failures, then stop.
numScrapes := 0 numScrapes := 0
@ -4421,7 +4458,7 @@ func TestScrapeLoopSeriesAddedDuplicates(t *testing.T) {
ctx, sl := simpleTestScrapeLoop(t) ctx, sl := simpleTestScrapeLoop(t)
slApp := sl.appender(ctx) slApp := sl.appender(ctx)
total, added, seriesAdded, err := sl.append(slApp, []byte("test_metric 1\ntest_metric 2\ntest_metric 3\n"), "", time.Time{}) total, added, seriesAdded, err := sl.append(slApp, []byte("test_metric 1\ntest_metric 2\ntest_metric 3\n"), "text/plain", time.Time{})
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, slApp.Commit()) require.NoError(t, slApp.Commit())
require.Equal(t, 3, total) require.Equal(t, 3, total)
@ -4430,7 +4467,7 @@ func TestScrapeLoopSeriesAddedDuplicates(t *testing.T) {
require.Equal(t, 2.0, prom_testutil.ToFloat64(sl.metrics.targetScrapeSampleDuplicate)) require.Equal(t, 2.0, prom_testutil.ToFloat64(sl.metrics.targetScrapeSampleDuplicate))
slApp = sl.appender(ctx) slApp = sl.appender(ctx)
total, added, seriesAdded, err = sl.append(slApp, []byte("test_metric 1\ntest_metric 1\ntest_metric 1\n"), "", time.Time{}) total, added, seriesAdded, err = sl.append(slApp, []byte("test_metric 1\ntest_metric 1\ntest_metric 1\n"), "text/plain", time.Time{})
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, slApp.Commit()) require.NoError(t, slApp.Commit())
require.Equal(t, 3, total) require.Equal(t, 3, total)
@ -4440,7 +4477,7 @@ func TestScrapeLoopSeriesAddedDuplicates(t *testing.T) {
// When different timestamps are supplied, multiple samples are accepted. // When different timestamps are supplied, multiple samples are accepted.
slApp = sl.appender(ctx) slApp = sl.appender(ctx)
total, added, seriesAdded, err = sl.append(slApp, []byte("test_metric 1 1001\ntest_metric 1 1002\ntest_metric 1 1003\n"), "", time.Time{}) total, added, seriesAdded, err = sl.append(slApp, []byte("test_metric 1 1001\ntest_metric 1 1002\ntest_metric 1 1003\n"), "text/plain", time.Time{})
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, slApp.Commit()) require.NoError(t, slApp.Commit())
require.Equal(t, 3, total) require.Equal(t, 3, total)