Browse Source

Proposal to improve FPointSlice and HPointSlice allocation. (#13448)

* Reusing points slice from previous series when the slice is under utilized
* Adding comments on the bench test

Signed-off-by: Alan Protasio <alanprot@gmail.com>
njpm/rw2-sync-main-conflicts^2
Alan Protasio 10 months ago committed by GitHub
parent
commit
c006c57efc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 19
      promql/bench_test.go
  2. 38
      promql/engine.go

19
promql/bench_test.go

@ -33,6 +33,10 @@ func setupRangeQueryTestData(stor *teststorage.TestStorage, _ *Engine, interval,
ctx := context.Background()
metrics := []labels.Labels{}
// Generating test series: a_X, b_X, and h_X, where X can take values of one, ten, or hundred,
// representing the number of series each metric name contains.
// Metric a_X and b_X are simple metrics where h_X is a histogram.
// These metrics will have data for all test time range
metrics = append(metrics, labels.FromStrings("__name__", "a_one"))
metrics = append(metrics, labels.FromStrings("__name__", "b_one"))
for j := 0; j < 10; j++ {
@ -59,6 +63,9 @@ func setupRangeQueryTestData(stor *teststorage.TestStorage, _ *Engine, interval,
}
refs := make([]storage.SeriesRef, len(metrics))
// Number points for each different label value of "l" for the sparse series
pointsPerSparseSeries := numIntervals / 50
for s := 0; s < numIntervals; s++ {
a := stor.Appender(context.Background())
ts := int64(s * interval)
@ -66,10 +73,18 @@ func setupRangeQueryTestData(stor *teststorage.TestStorage, _ *Engine, interval,
ref, _ := a.Append(refs[i], metric, ts, float64(s)+float64(i)/float64(len(metrics)))
refs[i] = ref
}
// Generating a sparse time series: each label value of "l" will contain data only for
// pointsPerSparseSeries points
metric := labels.FromStrings("__name__", "sparse", "l", strconv.Itoa(s/pointsPerSparseSeries))
_, err := a.Append(0, metric, ts, float64(s)/float64(len(metrics)))
if err != nil {
return err
}
if err := a.Commit(); err != nil {
return err
}
}
stor.DB.ForceHeadMMap() // Ensure we have at most one head chunk for every series.
stor.DB.Compact(ctx)
return nil
@ -94,6 +109,10 @@ func rangeQueryCases() []benchCase {
expr: "rate(a_X[1m])",
steps: 10000,
},
{
expr: "rate(sparse[1m])",
steps: 10000,
},
// Holt-Winters and long ranges.
{
expr: "holt_winters(a_X[1d], 0.3, 0.3)",

38
promql/engine.go

@ -1452,6 +1452,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
// Reuse objects across steps to save memory allocations.
var floats []FPoint
var histograms []HPoint
var prevSS *Series
inMatrix := make(Matrix, 1)
inArgs[matrixArgIndex] = inMatrix
enh := &EvalNodeHelper{Out: make(Vector, 0, 1)}
@ -1512,12 +1513,13 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
if len(outVec) > 0 {
if outVec[0].H == nil {
if ss.Floats == nil {
ss.Floats = getFPointSlice(numSteps)
ss.Floats = reuseOrGetFPointSlices(prevSS, numSteps)
}
ss.Floats = append(ss.Floats, FPoint{F: outVec[0].F, T: ts})
} else {
if ss.Histograms == nil {
ss.Histograms = getHPointSlice(numSteps)
ss.Histograms = reuseOrGetHPointSlices(prevSS, numSteps)
}
ss.Histograms = append(ss.Histograms, HPoint{H: outVec[0].H, T: ts})
}
@ -1526,9 +1528,11 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
it.ReduceDelta(stepRange)
}
histSamples := totalHPointSize(ss.Histograms)
if len(ss.Floats)+histSamples > 0 {
if ev.currentSamples+len(ss.Floats)+histSamples <= ev.maxSamples {
mat = append(mat, ss)
prevSS = &mat[len(mat)-1]
ev.currentSamples += len(ss.Floats) + histSamples
} else {
ev.error(ErrTooManySamples(env))
@ -1678,6 +1682,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws})
}
mat := make(Matrix, 0, len(e.Series))
var prevSS *Series
it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta))
var chkIter chunkenc.Iterator
for i, s := range e.Series {
@ -1697,14 +1702,14 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
if ev.currentSamples < ev.maxSamples {
if h == nil {
if ss.Floats == nil {
ss.Floats = getFPointSlice(numSteps)
ss.Floats = reuseOrGetFPointSlices(prevSS, numSteps)
}
ss.Floats = append(ss.Floats, FPoint{F: f, T: ts})
ev.currentSamples++
ev.samplesStats.IncrementSamplesAtStep(step, 1)
} else {
if ss.Histograms == nil {
ss.Histograms = getHPointSlice(numSteps)
ss.Histograms = reuseOrGetHPointSlices(prevSS, numSteps)
}
point := HPoint{H: h, T: ts}
ss.Histograms = append(ss.Histograms, point)
@ -1720,6 +1725,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
if len(ss.Floats)+len(ss.Histograms) > 0 {
mat = append(mat, ss)
prevSS = &mat[len(mat)-1]
}
}
ev.samplesStats.UpdatePeak(ev.currentSamples)
@ -1840,6 +1846,30 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
panic(fmt.Errorf("unhandled expression of type: %T", expr))
}
// reuseOrGetFPointSlices reuses the space from previous slice to create new slice if the former has lots of room.
// The previous slices capacity is adjusted so when it is re-used from the pool it doesn't overflow into the new one.
func reuseOrGetHPointSlices(prevSS *Series, numSteps int) (r []HPoint) {
if prevSS != nil && cap(prevSS.Histograms)-2*len(prevSS.Histograms) > 0 {
r = prevSS.Histograms[len(prevSS.Histograms):]
prevSS.Histograms = prevSS.Histograms[0:len(prevSS.Histograms):len(prevSS.Histograms)]
return
}
return getHPointSlice(numSteps)
}
// reuseOrGetFPointSlices reuses the space from previous slice to create new slice if the former has lots of room.
// The previous slices capacity is adjusted so when it is re-used from the pool it doesn't overflow into the new one.
func reuseOrGetFPointSlices(prevSS *Series, numSteps int) (r []FPoint) {
if prevSS != nil && cap(prevSS.Floats)-2*len(prevSS.Floats) > 0 {
r = prevSS.Floats[len(prevSS.Floats):]
prevSS.Floats = prevSS.Floats[0:len(prevSS.Floats):len(prevSS.Floats)]
return
}
return getFPointSlice(numSteps)
}
func (ev *evaluator) rangeEvalTimestampFunctionOverVectorSelector(vs *parser.VectorSelector, call FunctionCall, e *parser.Call) (parser.Value, annotations.Annotations) {
ws, err := checkAndExpandSeriesSet(ev.ctx, vs)
if err != nil {

Loading…
Cancel
Save