Merge pull request #12254 from zenador/histogram-bucket-limit

Implement bucket limit for native histograms
pull/12349/head
Björn Rabenstein 2023-05-10 17:42:29 +02:00 committed by GitHub
commit bd98fc8c45
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 346 additions and 45 deletions

View File

@ -489,6 +489,9 @@ type ScrapeConfig struct {
// More than this label value length post metric-relabeling will cause the
// scrape to fail.
LabelValueLengthLimit uint `yaml:"label_value_length_limit,omitempty"`
// More than this many buckets in a native histogram will cause the scrape to
// fail.
NativeHistogramBucketLimit uint `yaml:"native_histogram_bucket_limit,omitempty"`
// We cannot do proper Go type embedding below as the parser will then parse
// values arbitrarily into the overflow maps of further-down types.

View File

@ -376,6 +376,11 @@ metric_relabel_configs:
# 0 means no limit. This is an experimental feature, this behaviour could
# change in the future.
[ target_limit: <int> | default = 0 ]
# Limit on total number of positive and negative buckets allowed in a single
# native histogram. If this is exceeded, the entire scrape will be treated as
# failed. 0 means no limit.
[ native_histogram_bucket_limit: <int> | default = 0 ]
```
Where `<job_name>` must be unique across all scrape configurations.

54
scrape/clientprotobuf.go Normal file
View File

@ -0,0 +1,54 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package scrape
import (
"bytes"
"encoding/binary"
"github.com/gogo/protobuf/proto"
// Intentionally using client model to simulate client in tests.
dto "github.com/prometheus/client_model/go"
)
// Write a MetricFamily into a protobuf.
// This function is intended for testing scraping by providing protobuf serialized input.
func MetricFamilyToProtobuf(metricFamily *dto.MetricFamily) ([]byte, error) {
buffer := &bytes.Buffer{}
err := AddMetricFamilyToProtobuf(buffer, metricFamily)
if err != nil {
return nil, err
}
return buffer.Bytes(), nil
}
// Append a MetricFamily protobuf representation to a buffer.
// This function is intended for testing scraping by providing protobuf serialized input.
func AddMetricFamilyToProtobuf(buffer *bytes.Buffer, metricFamily *dto.MetricFamily) error {
protoBuf, err := proto.Marshal(metricFamily)
if err != nil {
return err
}
varintBuf := make([]byte, binary.MaxVarintLen32)
varintLength := binary.PutUvarint(varintBuf, uint64(len(protoBuf)))
_, err = buffer.Write(varintBuf[:varintLength])
if err != nil {
return err
}
_, err = buffer.Write(protoBuf)
return err
}

View File

@ -191,6 +191,12 @@ var (
},
[]string{"scrape_job"},
)
targetScrapeNativeHistogramBucketLimit = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "prometheus_target_scrapes_exceeded_native_histogram_bucket_limit_total",
Help: "Total number of scrapes that hit the native histogram bucket limit and were rejected.",
},
)
)
func init() {
@ -216,6 +222,7 @@ func init() {
targetScrapeExemplarOutOfOrder,
targetScrapePoolExceededLabelLimits,
targetSyncFailed,
targetScrapeNativeHistogramBucketLimit,
)
}
@ -256,6 +263,7 @@ type scrapeLoopOptions struct {
target *Target
scraper scraper
sampleLimit int
bucketLimit int
labelLimits *labelLimits
honorLabels bool
honorTimestamps bool
@ -319,6 +327,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed
jitterSeed,
opts.honorTimestamps,
opts.sampleLimit,
opts.bucketLimit,
opts.labelLimits,
opts.interval,
opts.timeout,
@ -412,6 +421,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
timeout = time.Duration(sp.config.ScrapeTimeout)
bodySizeLimit = int64(sp.config.BodySizeLimit)
sampleLimit = int(sp.config.SampleLimit)
bucketLimit = int(sp.config.NativeHistogramBucketLimit)
labelLimits = &labelLimits{
labelLimit: int(sp.config.LabelLimit),
labelNameLengthLimit: int(sp.config.LabelNameLengthLimit),
@ -446,6 +456,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
target: t,
scraper: s,
sampleLimit: sampleLimit,
bucketLimit: bucketLimit,
labelLimits: labelLimits,
honorLabels: honorLabels,
honorTimestamps: honorTimestamps,
@ -530,6 +541,7 @@ func (sp *scrapePool) sync(targets []*Target) {
timeout = time.Duration(sp.config.ScrapeTimeout)
bodySizeLimit = int64(sp.config.BodySizeLimit)
sampleLimit = int(sp.config.SampleLimit)
bucketLimit = int(sp.config.NativeHistogramBucketLimit)
labelLimits = &labelLimits{
labelLimit: int(sp.config.LabelLimit),
labelNameLengthLimit: int(sp.config.LabelNameLengthLimit),
@ -559,6 +571,7 @@ func (sp *scrapePool) sync(targets []*Target) {
target: t,
scraper: s,
sampleLimit: sampleLimit,
bucketLimit: bucketLimit,
labelLimits: labelLimits,
honorLabels: honorLabels,
honorTimestamps: honorTimestamps,
@ -731,17 +744,24 @@ func mutateReportSampleLabels(lset labels.Labels, target *Target) labels.Labels
}
// appender returns an appender for ingested samples from the target.
func appender(app storage.Appender, limit int) storage.Appender {
func appender(app storage.Appender, sampleLimit, bucketLimit int) storage.Appender {
app = &timeLimitAppender{
Appender: app,
maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)),
}
// The limit is applied after metrics are potentially dropped via relabeling.
if limit > 0 {
// The sampleLimit is applied after metrics are potentially dropped via relabeling.
if sampleLimit > 0 {
app = &limitAppender{
Appender: app,
limit: limit,
limit: sampleLimit,
}
}
if bucketLimit > 0 {
app = &bucketLimitAppender{
Appender: app,
limit: bucketLimit,
}
}
return app
@ -872,6 +892,7 @@ type scrapeLoop struct {
forcedErr error
forcedErrMtx sync.Mutex
sampleLimit int
bucketLimit int
labelLimits *labelLimits
interval time.Duration
timeout time.Duration
@ -1152,6 +1173,7 @@ func newScrapeLoop(ctx context.Context,
jitterSeed uint64,
honorTimestamps bool,
sampleLimit int,
bucketLimit int,
labelLimits *labelLimits,
interval time.Duration,
timeout time.Duration,
@ -1195,6 +1217,7 @@ func newScrapeLoop(ctx context.Context,
appenderCtx: appenderCtx,
honorTimestamps: honorTimestamps,
sampleLimit: sampleLimit,
bucketLimit: bucketLimit,
labelLimits: labelLimits,
interval: interval,
timeout: timeout,
@ -1482,6 +1505,7 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string,
defTime = timestamp.FromTime(ts)
appErrs = appendErrors{}
sampleLimitErr error
bucketLimitErr error
e exemplar.Exemplar // escapes to heap so hoisted out of loop
meta metadata.Metadata
metadataChanged bool
@ -1510,7 +1534,7 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string,
}
// Take an appender with limits.
app = appender(app, sl.sampleLimit)
app = appender(app, sl.sampleLimit, sl.bucketLimit)
defer func() {
if err != nil {
@ -1631,7 +1655,7 @@ loop:
} else {
ref, err = app.Append(ref, lset, t, val)
}
sampleAdded, err = sl.checkAddError(ce, met, parsedTimestamp, err, &sampleLimitErr, &appErrs)
sampleAdded, err = sl.checkAddError(ce, met, parsedTimestamp, err, &sampleLimitErr, &bucketLimitErr, &appErrs)
if err != nil {
if err != storage.ErrNotFound {
level.Debug(sl.l).Log("msg", "Unexpected error", "series", string(met), "err", err)
@ -1645,7 +1669,7 @@ loop:
sl.cache.trackStaleness(hash, lset)
}
sl.cache.addRef(met, ref, lset, hash)
if sampleAdded && sampleLimitErr == nil {
if sampleAdded && sampleLimitErr == nil && bucketLimitErr == nil {
seriesAdded++
}
}
@ -1681,6 +1705,13 @@ loop:
// We only want to increment this once per scrape, so this is Inc'd outside the loop.
targetScrapeSampleLimit.Inc()
}
if bucketLimitErr != nil {
if err == nil {
err = bucketLimitErr // If sample limit is hit, that error takes precedence.
}
// We only want to increment this once per scrape, so this is Inc'd outside the loop.
targetScrapeNativeHistogramBucketLimit.Inc()
}
if appErrs.numOutOfOrder > 0 {
level.Warn(sl.l).Log("msg", "Error on ingesting out-of-order samples", "num_dropped", appErrs.numOutOfOrder)
}
@ -1710,8 +1741,8 @@ loop:
}
// Adds samples to the appender, checking the error, and then returns the # of samples added,
// whether the caller should continue to process more samples, and any sample limit errors.
func (sl *scrapeLoop) checkAddError(ce *cacheEntry, met []byte, tp *int64, err error, sampleLimitErr *error, appErrs *appendErrors) (bool, error) {
// whether the caller should continue to process more samples, and any sample or bucket limit errors.
func (sl *scrapeLoop) checkAddError(ce *cacheEntry, met []byte, tp *int64, err error, sampleLimitErr, bucketLimitErr *error, appErrs *appendErrors) (bool, error) {
switch errors.Cause(err) {
case nil:
if tp == nil && ce != nil {
@ -1740,6 +1771,11 @@ func (sl *scrapeLoop) checkAddError(ce *cacheEntry, met []byte, tp *int64, err e
// total number of samples scraped.
*sampleLimitErr = err
return false, nil
case errBucketLimit:
// Keep on parsing output if we hit the limit, so we report the correct
// total number of samples scraped.
*bucketLimitErr = err
return false, nil
default:
return false, err
}

View File

@ -30,6 +30,7 @@ import (
"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
config_util "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
@ -489,7 +490,7 @@ func TestScrapePoolAppender(t *testing.T) {
appl, ok := loop.(*scrapeLoop)
require.True(t, ok, "Expected scrapeLoop but got %T", loop)
wrapped := appender(appl.appender(context.Background()), 0)
wrapped := appender(appl.appender(context.Background()), 0, 0)
tl, ok := wrapped.(*timeLimitAppender)
require.True(t, ok, "Expected timeLimitAppender but got %T", wrapped)
@ -505,7 +506,7 @@ func TestScrapePoolAppender(t *testing.T) {
appl, ok = loop.(*scrapeLoop)
require.True(t, ok, "Expected scrapeLoop but got %T", loop)
wrapped = appender(appl.appender(context.Background()), sampleLimit)
wrapped = appender(appl.appender(context.Background()), sampleLimit, 0)
sl, ok := wrapped.(*limitAppender)
require.True(t, ok, "Expected limitAppender but got %T", wrapped)
@ -515,6 +516,20 @@ func TestScrapePoolAppender(t *testing.T) {
_, ok = tl.Appender.(nopAppender)
require.True(t, ok, "Expected base appender but got %T", tl.Appender)
wrapped = appender(appl.appender(context.Background()), sampleLimit, 100)
bl, ok := wrapped.(*bucketLimitAppender)
require.True(t, ok, "Expected bucketLimitAppender but got %T", wrapped)
sl, ok = bl.Appender.(*limitAppender)
require.True(t, ok, "Expected limitAppender but got %T", bl)
tl, ok = sl.Appender.(*timeLimitAppender)
require.True(t, ok, "Expected timeLimitAppender but got %T", sl.Appender)
_, ok = tl.Appender.(nopAppender)
require.True(t, ok, "Expected base appender but got %T", tl.Appender)
}
func TestScrapePoolRaces(t *testing.T) {
@ -612,7 +627,7 @@ func TestScrapeLoopStopBeforeRun(t *testing.T) {
nopMutator,
nil, nil, 0,
true,
0,
0, 0,
nil,
1,
0,
@ -684,7 +699,7 @@ func TestScrapeLoopStop(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
10*time.Millisecond,
time.Hour,
@ -760,7 +775,7 @@ func TestScrapeLoopRun(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
time.Second,
time.Hour,
@ -815,7 +830,7 @@ func TestScrapeLoopRun(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
time.Second,
100*time.Millisecond,
@ -874,7 +889,7 @@ func TestScrapeLoopForcedErr(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
time.Second,
time.Hour,
@ -932,7 +947,7 @@ func TestScrapeLoopMetadata(t *testing.T) {
cache,
0,
true,
0,
0, 0,
nil,
0,
0,
@ -989,7 +1004,7 @@ func simpleTestScrapeLoop(t testing.TB) (context.Context, *scrapeLoop) {
nil,
0,
true,
0,
0, 0,
nil,
0,
0,
@ -1049,7 +1064,7 @@ func TestScrapeLoopFailWithInvalidLabelsAfterRelabel(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
0,
0,
@ -1127,7 +1142,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
10*time.Millisecond,
time.Hour,
@ -1190,7 +1205,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
10*time.Millisecond,
time.Hour,
@ -1256,7 +1271,7 @@ func TestScrapeLoopCache(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
10*time.Millisecond,
time.Hour,
@ -1339,7 +1354,7 @@ func TestScrapeLoopCacheMemoryExhaustionProtection(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
10*time.Millisecond,
time.Hour,
@ -1453,7 +1468,7 @@ func TestScrapeLoopAppend(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
0,
0,
@ -1548,7 +1563,7 @@ func TestScrapeLoopAppendForConflictingPrefixedLabels(t *testing.T) {
return mutateSampleLabels(l, &Target{labels: labels.FromStrings(tc.targetLabels...)}, false, nil)
},
nil,
func(ctx context.Context) storage.Appender { return app }, nil, 0, true, 0, nil, 0, 0, false, false, nil, false,
func(ctx context.Context) storage.Appender { return app }, nil, 0, true, 0, 0, nil, 0, 0, false, false, nil, false,
)
slApp := sl.appender(context.Background())
_, _, _, err := sl.append(slApp, []byte(tc.exposedLabels), "", time.Date(2000, 1, 1, 1, 0, 0, 0, time.UTC))
@ -1579,7 +1594,7 @@ func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
0,
0,
@ -1637,7 +1652,7 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) {
nil,
0,
true,
app.limit,
app.limit, 0,
nil,
0,
0,
@ -1697,6 +1712,104 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) {
require.Equal(t, 0, seriesAdded)
}
func TestScrapeLoop_HistogramBucketLimit(t *testing.T) {
resApp := &collectResultAppender{}
app := &bucketLimitAppender{Appender: resApp, limit: 2}
sl := newScrapeLoop(context.Background(),
nil, nil, nil,
func(l labels.Labels) labels.Labels {
if l.Has("deleteme") {
return labels.EmptyLabels()
}
return l
},
nopMutator,
func(ctx context.Context) storage.Appender { return app },
nil,
0,
true,
app.limit, 0,
nil,
0,
0,
false,
false,
nil,
false,
)
metric := dto.Metric{}
err := targetScrapeNativeHistogramBucketLimit.Write(&metric)
require.NoError(t, err)
beforeMetricValue := metric.GetCounter().GetValue()
nativeHistogram := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "testing",
Name: "example_native_histogram",
Help: "This is used for testing",
ConstLabels: map[string]string{"some": "value"},
NativeHistogramBucketFactor: 1.1, // 10% increase from bucket to bucket
NativeHistogramMaxBucketNumber: 100, // intentionally higher than the limit we'll use in the scraper
},
[]string{"size"},
)
registry := prometheus.NewRegistry()
registry.Register(nativeHistogram)
nativeHistogram.WithLabelValues("S").Observe(1.0)
nativeHistogram.WithLabelValues("M").Observe(1.0)
nativeHistogram.WithLabelValues("L").Observe(1.0)
nativeHistogram.WithLabelValues("M").Observe(10.0)
nativeHistogram.WithLabelValues("L").Observe(10.0) // in different bucket since > 1*1.1
gathered, err := registry.Gather()
require.NoError(t, err)
require.NotEmpty(t, gathered)
histogramMetricFamily := gathered[0]
msg, err := MetricFamilyToProtobuf(histogramMetricFamily)
require.NoError(t, err)
now := time.Now()
total, added, seriesAdded, err := sl.append(app, msg, "application/vnd.google.protobuf", now)
require.NoError(t, err)
require.Equal(t, 3, total)
require.Equal(t, 3, added)
require.Equal(t, 3, seriesAdded)
err = targetScrapeNativeHistogramBucketLimit.Write(&metric)
require.NoError(t, err)
metricValue := metric.GetCounter().GetValue()
require.Equal(t, beforeMetricValue, metricValue)
beforeMetricValue = metricValue
nativeHistogram.WithLabelValues("L").Observe(100.0) // in different bucket since > 10*1.1
gathered, err = registry.Gather()
require.NoError(t, err)
require.NotEmpty(t, gathered)
histogramMetricFamily = gathered[0]
msg, err = MetricFamilyToProtobuf(histogramMetricFamily)
require.NoError(t, err)
now = time.Now()
total, added, seriesAdded, err = sl.append(app, msg, "application/vnd.google.protobuf", now)
if err != errBucketLimit {
t.Fatalf("Did not see expected histogram bucket limit error: %s", err)
}
require.NoError(t, app.Rollback())
require.Equal(t, 3, total)
require.Equal(t, 3, added)
require.Equal(t, 0, seriesAdded)
err = targetScrapeNativeHistogramBucketLimit.Write(&metric)
require.NoError(t, err)
metricValue = metric.GetCounter().GetValue()
require.Equal(t, beforeMetricValue+1, metricValue)
}
func TestScrapeLoop_ChangingMetricString(t *testing.T) {
// This is a regression test for the scrape loop cache not properly maintaining
// IDs when the string representation of a metric changes across a scrape. Thus
@ -1714,7 +1827,7 @@ func TestScrapeLoop_ChangingMetricString(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
0,
0,
@ -1762,7 +1875,7 @@ func TestScrapeLoopAppendStaleness(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
0,
0,
@ -1813,7 +1926,7 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
0,
0,
@ -1924,7 +2037,7 @@ metric_total{n="2"} 2 # {t="2"} 2.0 20000
nil,
0,
true,
0,
0, 0,
nil,
0,
0,
@ -1989,7 +2102,7 @@ func TestScrapeLoopAppendExemplarSeries(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
0,
0,
@ -2041,7 +2154,7 @@ func TestScrapeLoopRunReportsTargetDownOnScrapeError(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
10*time.Millisecond,
time.Hour,
@ -2077,7 +2190,7 @@ func TestScrapeLoopRunReportsTargetDownOnInvalidUTF8(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
10*time.Millisecond,
time.Hour,
@ -2126,7 +2239,7 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T
nil,
0,
true,
0,
0, 0,
nil,
0,
0,
@ -2171,7 +2284,7 @@ func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
0,
0,
@ -2443,7 +2556,7 @@ func TestScrapeLoop_RespectTimestamps(t *testing.T) {
func(ctx context.Context) storage.Appender { return capp },
nil, 0,
true,
0,
0, 0,
nil,
0,
0,
@ -2484,7 +2597,7 @@ func TestScrapeLoop_DiscardTimestamps(t *testing.T) {
func(ctx context.Context) storage.Appender { return capp },
nil, 0,
false,
0,
0, 0,
nil,
0,
0,
@ -2524,7 +2637,7 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
0,
0,
@ -2582,7 +2695,7 @@ func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
0,
0,
@ -2845,7 +2958,7 @@ func TestScrapeAddFast(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
0,
0,
@ -2908,7 +3021,7 @@ func TestReuseCacheRace(*testing.T) {
func TestCheckAddError(t *testing.T) {
var appErrs appendErrors
sl := scrapeLoop{l: log.NewNopLogger()}
sl.checkAddError(nil, nil, nil, storage.ErrOutOfOrderSample, nil, &appErrs)
sl.checkAddError(nil, nil, nil, storage.ErrOutOfOrderSample, nil, nil, &appErrs)
require.Equal(t, 1, appErrs.numOutOfOrder)
}
@ -2931,7 +3044,7 @@ func TestScrapeReportSingleAppender(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
10*time.Millisecond,
time.Hour,
@ -3133,7 +3246,7 @@ func TestScrapeLoopLabelLimit(t *testing.T) {
nil,
0,
true,
0,
0, 0,
&test.labelLimits,
0,
0,

View File

@ -27,6 +27,7 @@ import (
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/model/textparse"
@ -313,7 +314,10 @@ func (ts Targets) Len() int { return len(ts) }
func (ts Targets) Less(i, j int) bool { return ts[i].URL().String() < ts[j].URL().String() }
func (ts Targets) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] }
var errSampleLimit = errors.New("sample limit exceeded")
var (
errSampleLimit = errors.New("sample limit exceeded")
errBucketLimit = errors.New("histogram bucket limit exceeded")
)
// limitAppender limits the number of total appended samples in a batch.
type limitAppender struct {
@ -355,6 +359,31 @@ func (app *timeLimitAppender) Append(ref storage.SeriesRef, lset labels.Labels,
return ref, nil
}
// bucketLimitAppender limits the number of total appended samples in a batch.
type bucketLimitAppender struct {
storage.Appender
limit int
}
func (app *bucketLimitAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
if h != nil {
if len(h.PositiveBuckets)+len(h.NegativeBuckets) > app.limit {
return 0, errBucketLimit
}
}
if fh != nil {
if len(fh.PositiveBuckets)+len(fh.NegativeBuckets) > app.limit {
return 0, errBucketLimit
}
}
ref, err := app.Appender.AppendHistogram(ref, lset, t, h, fh)
if err != nil {
return 0, err
}
return ref, nil
}
// PopulateLabels builds a label set from the given label set and scrape configuration.
// It returns a label set before relabeling was applied as the second return value.
// Returns the original discovered label set found before relabelling was applied if the target is dropped during relabeling.

View File

@ -31,6 +31,7 @@ import (
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
)
@ -488,3 +489,63 @@ scrape_configs:
})
}
}
func TestBucketLimitAppender(t *testing.T) {
example := histogram.Histogram{
Schema: 0,
Count: 21,
Sum: 33,
ZeroThreshold: 0.001,
ZeroCount: 3,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 3},
},
PositiveBuckets: []int64{3, 0, 0},
NegativeSpans: []histogram.Span{
{Offset: 0, Length: 3},
},
NegativeBuckets: []int64{3, 0, 0},
}
cases := []struct {
h histogram.Histogram
limit int
expectError bool
}{
{
h: example,
limit: 3,
expectError: true,
},
{
h: example,
limit: 10,
expectError: false,
},
}
resApp := &collectResultAppender{}
for _, c := range cases {
for _, floatHisto := range []bool{true, false} {
t.Run(fmt.Sprintf("floatHistogram=%t", floatHisto), func(t *testing.T) {
app := &bucketLimitAppender{Appender: resApp, limit: c.limit}
ts := int64(10 * time.Minute / time.Millisecond)
h := c.h
lbls := labels.FromStrings("__name__", "sparse_histogram_series")
var err error
if floatHisto {
_, err = app.AppendHistogram(0, lbls, ts, nil, h.Copy().ToFloat())
} else {
_, err = app.AppendHistogram(0, lbls, ts, h.Copy(), nil)
}
if c.expectError {
require.Error(t, err)
} else {
require.NoError(t, err)
}
require.NoError(t, app.Commit())
})
}
}
}