
Native Histograms: support `native_histogram_min_bucket_factor` in scrape_config (#13222)

Native Histograms: support native_histogram_min_bucket_factor in scrape_config

---------

Signed-off-by: Ziqi Zhao <zhaoziqi9146@gmail.com>
Signed-off-by: Björn Rabenstein <github@rabenste.in>
Co-authored-by: George Krajcsovits <krajorama@users.noreply.github.com>
Co-authored-by: Björn Rabenstein <github@rabenste.in>
Branch: pull/13169/head
Author: Ziqi Zhao, committed by GitHub (10 months ago)
Commit: df2a0ecf3b
Changed files (number of changed lines):
1. config/config.go (7)
2. docs/configuration/configuration.md (40)
3. scrape/scrape.go (34)
4. scrape/scrape_test.go (99)
5. scrape/target.go (29)
6. scrape/target_test.go (61)

config/config.go (7 changed lines)

@@ -610,9 +610,12 @@ type ScrapeConfig struct {
// More than this label value length post metric-relabeling will cause the
// scrape to fail. 0 means no limit.
LabelValueLengthLimit uint `yaml:"label_value_length_limit,omitempty"`
// More than this many buckets in a native histogram will cause the scrape to
// fail.
// If there are more than this many buckets in a native histogram,
// buckets will be merged to stay within the limit.
NativeHistogramBucketLimit uint `yaml:"native_histogram_bucket_limit,omitempty"`
// If the growth factor of one bucket to the next is smaller than this,
// buckets will be merged to increase the factor sufficiently.
NativeHistogramMinBucketFactor float64 `yaml:"native_histogram_min_bucket_factor,omitempty"`
// Keep no more than this many dropped targets per job.
// 0 means no limit.
KeepDroppedTargets uint `yaml:"keep_dropped_targets,omitempty"`
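For illustration only (not part of the diff): a minimal sketch of how the two native-histogram limits could be set programmatically on a `config.ScrapeConfig`. The job name and values here are made up; in `prometheus.yml` they correspond to the yaml keys in the struct tags above.

```go
package main

import "github.com/prometheus/prometheus/config"

func main() {
	// Hypothetical values, chosen only to show the two knobs side by side.
	sc := config.ScrapeConfig{
		JobName: "example",
		// Merge buckets if a scraped native histogram has more than 160 of them.
		NativeHistogramBucketLimit: 160,
		// Merge buckets until the growth factor from one bucket to the next is
		// at least 1.1, i.e. cap the schema at 2 (see the table added to the
		// docs below).
		NativeHistogramMinBucketFactor: 1.1,
	}
	_ = sc
}
```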

docs/configuration/configuration.md (40 changed lines)

@@ -451,6 +451,46 @@ metric_relabel_configs:
# native histogram. If this is exceeded, the entire scrape will be treated as
# failed. 0 means no limit.
[ native_histogram_bucket_limit: <int> | default = 0 ]
# Lower limit for the growth factor of one bucket to the next in each native
# histogram. The resolution of a histogram with a lower growth factor will be
# reduced until it is within the limit.
# To set an upper limit for the schema (equivalent to "scale" in OTel's
# exponential histograms), use the following factor limits:
#
# +----------------------------+----------------------------+
# | growth factor              | resulting schema AKA scale |
# +----------------------------+----------------------------+
# | 65536                      | -4                         |
# +----------------------------+----------------------------+
# | 256                        | -3                         |
# +----------------------------+----------------------------+
# | 16                         | -2                         |
# +----------------------------+----------------------------+
# | 4                          | -1                         |
# +----------------------------+----------------------------+
# | 2                          | 0                          |
# +----------------------------+----------------------------+
# | 1.4                        | 1                          |
# +----------------------------+----------------------------+
# | 1.1                        | 2                          |
# +----------------------------+----------------------------+
# | 1.09                       | 3                          |
# +----------------------------+----------------------------+
# | 1.04                       | 4                          |
# +----------------------------+----------------------------+
# | 1.02                       | 5                          |
# +----------------------------+----------------------------+
# | 1.01                       | 6                          |
# +----------------------------+----------------------------+
# | 1.005                      | 7                          |
# +----------------------------+----------------------------+
# | 1.002                      | 8                          |
# +----------------------------+----------------------------+
#
# 0 results in the smallest supported factor (which is currently ~1.0027 or
# schema 8, but might change in the future).
[ native_histogram_min_bucket_factor: <float> | default = 0 ]
```
Where `<job_name>` must be unique across all scrape configurations.
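As background for the table above (not part of the diff): native histogram schema `n` corresponds to a bucket growth factor of 2^(2^-n), so the largest schema whose factor still meets a configured minimum is floor(-log2(log2(factor))), clamped to the supported schema range [-4, 8]. This is the same mapping implemented by the new `pickSchema` helper in `scrape/scrape.go` below; the sketch here just reproduces a few rows of the table.

```go
package main

import (
	"fmt"
	"math"
)

// schemaFor returns the largest schema whose growth factor 2^(2^-schema) is
// still >= the given minimum factor, clamped to the supported range [-4, 8].
// Sketch only; Prometheus' own implementation is pickSchema in scrape/scrape.go.
func schemaFor(minFactor float64) int32 {
	if minFactor <= 1 {
		minFactor = 1.00271 // roughly the factor of schema 8, used for the default 0
	}
	s := math.Floor(-math.Log2(math.Log2(minFactor)))
	switch {
	case s >= 8:
		return 8
	case s <= -4:
		return -4
	default:
		return int32(s)
	}
}

func main() {
	for _, f := range []float64{65536, 4, 1.1, 1.002, 0} {
		fmt.Printf("min factor %g -> schema %d\n", f, schemaFor(f))
	}
}
```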

scrape/scrape.go (34 changed lines)

@@ -99,6 +99,7 @@ type scrapeLoopOptions struct {
scraper scraper
sampleLimit int
bucketLimit int
maxSchema int32
labelLimits *labelLimits
honorLabels bool
honorTimestamps bool
@@ -165,6 +166,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed
opts.enableCompression,
opts.sampleLimit,
opts.bucketLimit,
opts.maxSchema,
opts.labelLimits,
opts.interval,
opts.timeout,
@@ -270,6 +272,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
bodySizeLimit = int64(sp.config.BodySizeLimit)
sampleLimit = int(sp.config.SampleLimit)
bucketLimit = int(sp.config.NativeHistogramBucketLimit)
maxSchema = pickSchema(sp.config.NativeHistogramMinBucketFactor)
labelLimits = &labelLimits{
labelLimit: int(sp.config.LabelLimit),
labelNameLengthLimit: int(sp.config.LabelNameLengthLimit),
@@ -310,6 +313,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
scraper: s,
sampleLimit: sampleLimit,
bucketLimit: bucketLimit,
maxSchema: maxSchema,
labelLimits: labelLimits,
honorLabels: honorLabels,
honorTimestamps: honorTimestamps,
@@ -613,7 +617,7 @@ func mutateReportSampleLabels(lset labels.Labels, target *Target) labels.Labels
}
// appender returns an appender for ingested samples from the target.
func appender(app storage.Appender, sampleLimit, bucketLimit int) storage.Appender {
func appender(app storage.Appender, sampleLimit, bucketLimit int, maxSchema int32) storage.Appender {
app = &timeLimitAppender{
Appender: app,
maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)),
@@ -633,6 +637,14 @@ func appender(app storage.Appender, sampleLimit, bucketLimit int) storage.Append
limit: bucketLimit,
}
}
if maxSchema < nativeHistogramMaxSchema {
app = &maxSchemaAppender{
Appender: app,
maxSchema: maxSchema,
}
}
return app
}
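A note on ordering (not part of the diff): appender() applies the maxSchema wrapper last, so it becomes the outermost appender and a histogram's resolution is reduced before bucketLimitAppender checks the bucket count. Below is a rough sketch of the resulting nesting when every limit is configured; it would have to live in the scrape package itself, since the wrapper types are unexported and nopAppender is the existing test helper, and the field names are taken from scrape/target.go and the diff above.

```go
// Sketch only; assumes the scrape package's existing imports (math, storage).
func exampleAppenderChain() storage.Appender {
	var app storage.Appender = nopAppender{}
	app = &timeLimitAppender{Appender: app, maxTime: math.MaxInt64}
	app = &limitAppender{Appender: app, limit: 1000}      // sample_limit
	app = &bucketLimitAppender{Appender: app, limit: 160} // native_histogram_bucket_limit
	app = &maxSchemaAppender{Appender: app, maxSchema: 2} // from native_histogram_min_bucket_factor: 1.1
	// AppendHistogram calls reach maxSchemaAppender first, so resolution is
	// reduced before the bucket limit is enforced.
	return app
}
```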
@@ -786,6 +798,7 @@ type scrapeLoop struct {
forcedErrMtx sync.Mutex
sampleLimit int
bucketLimit int
maxSchema int32
labelLimits *labelLimits
interval time.Duration
timeout time.Duration
@@ -1078,6 +1091,7 @@ func newScrapeLoop(ctx context.Context,
enableCompression bool,
sampleLimit int,
bucketLimit int,
maxSchema int32,
labelLimits *labelLimits,
interval time.Duration,
timeout time.Duration,
@@ -1128,6 +1142,7 @@ func newScrapeLoop(ctx context.Context,
enableCompression: enableCompression,
sampleLimit: sampleLimit,
bucketLimit: bucketLimit,
maxSchema: maxSchema,
labelLimits: labelLimits,
interval: interval,
timeout: timeout,
@@ -1458,7 +1473,7 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string,
}
// Take an appender with limits.
app = appender(app, sl.sampleLimit, sl.bucketLimit)
app = appender(app, sl.sampleLimit, sl.bucketLimit, sl.maxSchema)
defer func() {
if err != nil {
@@ -1906,3 +1921,18 @@ func TargetFromContext(ctx context.Context) (*Target, bool) {
t, ok := ctx.Value(ctxKeyTarget).(*Target)
return t, ok
}
func pickSchema(bucketFactor float64) int32 {
if bucketFactor <= 1 {
bucketFactor = 1.00271
}
floor := math.Floor(-math.Log2(math.Log2(bucketFactor)))
switch {
case floor >= float64(nativeHistogramMaxSchema):
return nativeHistogramMaxSchema
case floor <= float64(nativeHistogramMinSchema):
return nativeHistogramMinSchema
default:
return int32(floor)
}
}

scrape/scrape_test.go (99 changed lines)

@@ -513,7 +513,7 @@ func TestScrapePoolAppender(t *testing.T) {
appl, ok := loop.(*scrapeLoop)
require.True(t, ok, "Expected scrapeLoop but got %T", loop)
wrapped := appender(appl.appender(context.Background()), 0, 0)
wrapped := appender(appl.appender(context.Background()), 0, 0, nativeHistogramMaxSchema)
tl, ok := wrapped.(*timeLimitAppender)
require.True(t, ok, "Expected timeLimitAppender but got %T", wrapped)
@@ -529,7 +529,7 @@ func TestScrapePoolAppender(t *testing.T) {
appl, ok = loop.(*scrapeLoop)
require.True(t, ok, "Expected scrapeLoop but got %T", loop)
wrapped = appender(appl.appender(context.Background()), sampleLimit, 0)
wrapped = appender(appl.appender(context.Background()), sampleLimit, 0, nativeHistogramMaxSchema)
sl, ok := wrapped.(*limitAppender)
require.True(t, ok, "Expected limitAppender but got %T", wrapped)
@@ -540,7 +540,7 @@ func TestScrapePoolAppender(t *testing.T) {
_, ok = tl.Appender.(nopAppender)
require.True(t, ok, "Expected base appender but got %T", tl.Appender)
wrapped = appender(appl.appender(context.Background()), sampleLimit, 100)
wrapped = appender(appl.appender(context.Background()), sampleLimit, 100, nativeHistogramMaxSchema)
bl, ok := wrapped.(*bucketLimitAppender)
require.True(t, ok, "Expected bucketLimitAppender but got %T", wrapped)
@@ -553,6 +553,23 @@ func TestScrapePoolAppender(t *testing.T) {
_, ok = tl.Appender.(nopAppender)
require.True(t, ok, "Expected base appender but got %T", tl.Appender)
wrapped = appender(appl.appender(context.Background()), sampleLimit, 100, 0)
ml, ok := wrapped.(*maxSchemaAppender)
require.True(t, ok, "Expected maxSchemaAppender but got %T", wrapped)
bl, ok = ml.Appender.(*bucketLimitAppender)
require.True(t, ok, "Expected bucketLimitAppender but got %T", wrapped)
sl, ok = bl.Appender.(*limitAppender)
require.True(t, ok, "Expected limitAppender but got %T", bl)
tl, ok = sl.Appender.(*timeLimitAppender)
require.True(t, ok, "Expected timeLimitAppender but got %T", sl.Appender)
_, ok = tl.Appender.(nopAppender)
require.True(t, ok, "Expected base appender but got %T", tl.Appender)
}
func TestScrapePoolRaces(t *testing.T) {
@@ -653,7 +670,7 @@ func newBasicScrapeLoop(t testing.TB, ctx context.Context, scraper scraper, app
true,
false,
true,
0, 0,
0, 0, nativeHistogramMaxSchema,
nil,
interval,
time.Hour,
@@ -796,7 +813,7 @@ func TestScrapeLoopRun(t *testing.T) {
true,
false,
true,
0, 0,
0, 0, nativeHistogramMaxSchema,
nil,
time.Second,
time.Hour,
@@ -942,7 +959,7 @@ func TestScrapeLoopMetadata(t *testing.T) {
true,
false,
true,
0, 0,
0, 0, nativeHistogramMaxSchema,
nil,
0,
0,
@@ -3465,3 +3482,73 @@ func TestScrapeLoopCompression(t *testing.T) {
})
}
}
func TestPickSchema(t *testing.T) {
tcs := []struct {
factor float64
schema int32
}{
{
factor: 65536,
schema: -4,
},
{
factor: 256,
schema: -3,
},
{
factor: 16,
schema: -2,
},
{
factor: 4,
schema: -1,
},
{
factor: 2,
schema: 0,
},
{
factor: 1.4,
schema: 1,
},
{
factor: 1.1,
schema: 2,
},
{
factor: 1.09,
schema: 3,
},
{
factor: 1.04,
schema: 4,
},
{
factor: 1.02,
schema: 5,
},
{
factor: 1.01,
schema: 6,
},
{
factor: 1.005,
schema: 7,
},
{
factor: 1.002,
schema: 8,
},
// The default value of native_histogram_min_bucket_factor
{
factor: 0,
schema: 8,
},
}
for _, tc := range tcs {
schema := pickSchema(tc.factor)
require.Equal(t, tc.schema, schema)
}
}

scrape/target.go (29 changed lines)

@@ -387,6 +387,35 @@ func (app *bucketLimitAppender) AppendHistogram(ref storage.SeriesRef, lset labe
return ref, nil
}
const (
nativeHistogramMaxSchema int32 = 8
nativeHistogramMinSchema int32 = -4
)
type maxSchemaAppender struct {
storage.Appender
maxSchema int32
}
func (app *maxSchemaAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
if h != nil {
if h.Schema > app.maxSchema {
h = h.ReduceResolution(app.maxSchema)
}
}
if fh != nil {
if fh.Schema > app.maxSchema {
fh = fh.ReduceResolution(app.maxSchema)
}
}
ref, err := app.Appender.AppendHistogram(ref, lset, t, h, fh)
if err != nil {
return 0, err
}
return ref, nil
}
// PopulateLabels builds a label set from the given label set and scrape configuration.
// It returns a label set before relabeling was applied as the second return value.
// Returns the original discovered label set found before relabelling was applied if the target is dropped during relabeling.
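For illustration (not part of the diff): ReduceResolution comes from `github.com/prometheus/prometheus/model/histogram` and merges adjacent buckets until the target schema is reached, which is what maxSchemaAppender delegates to above. A standalone sketch, reusing the example histogram from TestMaxSchemaAppender below:

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/histogram"
)

func main() {
	// Same shape as the example in TestMaxSchemaAppender: schema 0, three
	// positive and three negative buckets with an absolute count of 3 each
	// (the bucket slices are delta-encoded).
	h := &histogram.Histogram{
		Schema:          0,
		Count:           21,
		Sum:             33,
		ZeroThreshold:   0.001,
		ZeroCount:       3,
		PositiveSpans:   []histogram.Span{{Offset: 0, Length: 3}},
		PositiveBuckets: []int64{3, 0, 0},
		NegativeSpans:   []histogram.Span{{Offset: 0, Length: 3}},
		NegativeBuckets: []int64{3, 0, 0},
	}

	// What maxSchemaAppender does when maxSchema is -1: adjacent buckets are
	// merged into wider ones, while Count and Sum stay the same.
	h = h.ReduceResolution(-1)
	fmt.Println(h.Schema, len(h.PositiveBuckets), h.Count, h.Sum)
}
```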

scrape/target_test.go (61 changed lines)

@@ -590,3 +590,64 @@ func TestBucketLimitAppender(t *testing.T) {
}
}
}
func TestMaxSchemaAppender(t *testing.T) {
example := histogram.Histogram{
Schema: 0,
Count: 21,
Sum: 33,
ZeroThreshold: 0.001,
ZeroCount: 3,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 3},
},
PositiveBuckets: []int64{3, 0, 0},
NegativeSpans: []histogram.Span{
{Offset: 0, Length: 3},
},
NegativeBuckets: []int64{3, 0, 0},
}
cases := []struct {
h histogram.Histogram
maxSchema int32
expectSchema int32
}{
{
h: example,
maxSchema: -1,
expectSchema: -1,
},
{
h: example,
maxSchema: 0,
expectSchema: 0,
},
}
resApp := &collectResultAppender{}
for _, c := range cases {
for _, floatHisto := range []bool{true, false} {
t.Run(fmt.Sprintf("floatHistogram=%t", floatHisto), func(t *testing.T) {
app := &maxSchemaAppender{Appender: resApp, maxSchema: c.maxSchema}
ts := int64(10 * time.Minute / time.Millisecond)
lbls := labels.FromStrings("__name__", "sparse_histogram_series")
var err error
if floatHisto {
fh := c.h.Copy().ToFloat(nil)
_, err = app.AppendHistogram(0, lbls, ts, nil, fh)
require.Equal(t, c.expectSchema, fh.Schema)
require.NoError(t, err)
} else {
h := c.h.Copy()
_, err = app.AppendHistogram(0, lbls, ts, h, nil)
require.Equal(t, c.expectSchema, h.Schema)
require.NoError(t, err)
}
require.NoError(t, app.Commit())
})
}
}
}
