diff --git a/promql/engine.go b/promql/engine.go index 599486cae..836710181 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -575,15 +575,6 @@ func (ev *evaluator) evalMatrix(e Expr) matrix { return mat } -// evalMatrixBounds attempts to evaluate e to matrix boundaries and errors otherwise. -func (ev *evaluator) evalMatrixBounds(e Expr) matrix { - ms, ok := e.(*MatrixSelector) - if !ok { - ev.errorf("matrix bounds can only be evaluated for matrix selectors, got %T", e) - } - return ev.matrixSelectorBounds(ms) -} - // evalString attempts to evaluate e to a string value and errors otherwise. func (ev *evaluator) evalString(e Expr) *model.String { val := ev.eval(e) @@ -731,29 +722,6 @@ func (ev *evaluator) matrixSelector(node *MatrixSelector) matrix { return matrix(sampleStreams) } -// matrixSelectorBounds evaluates the boundaries of a *MatrixSelector. -func (ev *evaluator) matrixSelectorBounds(node *MatrixSelector) matrix { - interval := metric.Interval{ - OldestInclusive: ev.Timestamp.Add(-node.Range - node.Offset), - NewestInclusive: ev.Timestamp.Add(-node.Offset), - } - - sampleStreams := make([]*sampleStream, 0, len(node.iterators)) - for fp, it := range node.iterators { - samplePairs := it.BoundaryValues(interval) - if len(samplePairs) == 0 { - continue - } - - ss := &sampleStream{ - Metric: node.metrics[fp], - Values: samplePairs, - } - sampleStreams = append(sampleStreams, ss) - } - return matrix(sampleStreams) -} - func (ev *evaluator) vectorAnd(lhs, rhs vector, matching *VectorMatching) vector { if matching.Card != CardManyToMany { panic("logical operations must always be many-to-many matching") diff --git a/promql/functions.go b/promql/functions.go index 4728b1e96..0d3ec4195 100644 --- a/promql/functions.go +++ b/promql/functions.go @@ -524,10 +524,37 @@ func funcLog10(ev *evaluator, args Expressions) model.Value { return vector } +// linearRegression performs a least-square linear regression analysis on the +// provided SamplePairs. 
It returns the slope, and the intercept value at the +// provided time. +func linearRegression(samples []model.SamplePair, interceptTime model.Time) (slope, intercept model.SampleValue) { + var ( + n model.SampleValue + sumX, sumY model.SampleValue + sumXY, sumX2 model.SampleValue + ) + for _, sample := range samples { + x := model.SampleValue( + model.Time(sample.Timestamp-interceptTime).UnixNano(), + ) / 1e9 + n += 1.0 + sumY += sample.Value + sumX += x + sumXY += x * sample.Value + sumX2 += x * x + } + covXY := sumXY - sumX*sumY/n + varX := sumX2 - sumX*sumX/n + + slope = covXY / varX + intercept = sumY/n - slope*sumX/n + return +} + // === deriv(node model.ValMatrix) Vector === func funcDeriv(ev *evaluator, args Expressions) model.Value { - resultVector := vector{} mat := ev.evalMatrix(args[0]) + resultVector := make(vector, 0, len(mat)) for _, samples := range mat { // No sense in trying to compute a derivative without at least two points. @@ -535,29 +562,10 @@ func funcDeriv(ev *evaluator, args Expressions) model.Value { if len(samples.Values) < 2 { continue } - - // Least squares. 
- var ( - n model.SampleValue - sumX, sumY model.SampleValue - sumXY, sumX2 model.SampleValue - ) - for _, sample := range samples.Values { - x := model.SampleValue(sample.Timestamp.UnixNano() / 1e9) - n += 1.0 - sumY += sample.Value - sumX += x - sumXY += x * sample.Value - sumX2 += x * x - } - numerator := sumXY - sumX*sumY/n - denominator := sumX2 - (sumX*sumX)/n - - resultValue := numerator / denominator - + slope, _ := linearRegression(samples.Values, samples.Values[0].Timestamp) resultSample := &sample{ Metric: samples.Metric, - Value: resultValue, + Value: slope, Timestamp: ev.Timestamp, } resultSample.Metric.Del(model.MetricNameLabel) @@ -568,44 +576,26 @@ func funcDeriv(ev *evaluator, args Expressions) model.Value { // === predict_linear(node model.ValMatrix, k model.ValScalar) Vector === func funcPredictLinear(ev *evaluator, args Expressions) model.Value { - vec := funcDeriv(ev, args[0:1]).(vector) - duration := model.SampleValue(model.SampleValue(ev.evalFloat(args[1]))) + mat := ev.evalMatrix(args[0]) + resultVector := make(vector, 0, len(mat)) + duration := model.SampleValue(ev.evalFloat(args[1])) - excludedLabels := map[model.LabelName]struct{}{ - model.MetricNameLabel: {}, - } - - // Calculate predicted delta over the duration. - signatureToDelta := map[uint64]model.SampleValue{} - for _, el := range vec { - signature := model.SignatureWithoutLabels(el.Metric.Metric, excludedLabels) - signatureToDelta[signature] = el.Value * duration - } - - // add predicted delta to last value. - // TODO(beorn7): This is arguably suboptimal. The funcDeriv above has - // given us an estimate over the range. So we should add the delta to - // the value predicted for the end of the range. Also, once this has - // been rectified, we are not using BoundaryValues anywhere anymore, so - // we can kick out a whole lot of code.
- matrixBounds := ev.evalMatrixBounds(args[0]) - outVec := make(vector, 0, len(signatureToDelta)) - for _, samples := range matrixBounds { + for _, samples := range mat { + // No sense in trying to predict anything without at least two points. + // Drop this vector element. if len(samples.Values) < 2 { continue } - signature := model.SignatureWithoutLabels(samples.Metric.Metric, excludedLabels) - delta, ok := signatureToDelta[signature] - if ok { - samples.Metric.Del(model.MetricNameLabel) - outVec = append(outVec, &sample{ - Metric: samples.Metric, - Value: delta + samples.Values[1].Value, - Timestamp: ev.Timestamp, - }) + slope, intercept := linearRegression(samples.Values, ev.Timestamp) + resultSample := &sample{ + Metric: samples.Metric, + Value: slope*duration + intercept, + Timestamp: ev.Timestamp, } + resultSample.Metric.Del(model.MetricNameLabel) + resultVector = append(resultVector, resultSample) } - return outVec + return resultVector } // === histogram_quantile(k model.ValScalar, vector model.ValVector) Vector === diff --git a/promql/testdata/functions.test b/promql/testdata/functions.test index bccec5a1a..28233033e 100644 --- a/promql/testdata/functions.test +++ b/promql/testdata/functions.test @@ -102,16 +102,28 @@ eval instant at 50m deriv(testcounter_reset_middle[100m]) {} 0.010606060606060607 # predict_linear should return correct result. +# X/s = [ 0, 300, 600, 900,1200,1500,1800,2100,2400,2700,3000] +# Y = [ 0, 10, 20, 30, 40, 0, 10, 20, 30, 40, 50] +# sumX = 16500 +# sumY = 250 +# sumXY = 480000 +# sumX2 = 34650000 +# n = 11 +# covXY = 105000 +# varX = 9900000 +# slope = 0.010606060606060607 +# intercept at t=0: 6.818181818181818 +# intercept at t=3000: 38.63636363636364 +# intercept at t=3000+3600: 76.81818181818181 eval instant at 50m predict_linear(testcounter_reset_middle[100m], 3600) - {} 88.181818181818185200 + {} 76.81818181818181 -# predict_linear is syntactic sugar around deriv. 
+# With http_requests, there is a sample value exactly at the end of +# the range, and it has exactly the predicted value, so predict_linear +# can be emulated with deriv. eval instant at 50m predict_linear(http_requests[50m], 3600) - (http_requests + deriv(http_requests[50m]) * 3600) {group="canary", instance="1", job="app-server"} 0 -eval instant at 50m predict_linear(testcounter_reset_middle[100m], 3600) - (testcounter_reset_middle + deriv(testcounter_reset_middle[100m]) * 3600) - {} 0 - clear # Tests for label_replace. diff --git a/storage/local/interface.go b/storage/local/interface.go index 631525b4a..5d910db09 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -79,9 +79,6 @@ type SeriesIterator interface { // exist at precisely the given time, that value is returned. If no // applicable value exists, ZeroSamplePair is returned. ValueAtOrBeforeTime(model.Time) model.SamplePair - // Gets the boundary values of an interval: the first and last value - // within a given interval. - BoundaryValues(metric.Interval) []model.SamplePair // Gets all values contained within a given interval. RangeValues(metric.Interval) []model.SamplePair } diff --git a/storage/local/series.go b/storage/local/series.go index e6a37ad4a..7a027a8a5 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -540,71 +540,6 @@ func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePa return it.chunkIt.valueAtOrBeforeTime(t) } -// BoundaryValues implements SeriesIterator. -func (it *memorySeriesIterator) BoundaryValues(in metric.Interval) []model.SamplePair { - // Find the first chunk for which the first sample is within the interval. - i := sort.Search(len(it.chunks), func(i int) bool { - return !it.chunks[i].firstTime().Before(in.OldestInclusive) - }) - // Only now check the last timestamp of the previous chunk (which is - // fairly expensive). 
- if i > 0 && !it.chunkIterator(i-1).lastTimestamp().Before(in.OldestInclusive) { - i-- - } - - values := make([]model.SamplePair, 0, 2) - for j, c := range it.chunks[i:] { - if c.firstTime().After(in.NewestInclusive) { - if len(values) == 1 { - // We found the first value before but are now - // already past the last value. The value we - // want must be the last value of the previous - // chunk. So backtrack... - chunkIt := it.chunkIterator(i + j - 1) - values = append(values, model.SamplePair{ - Timestamp: chunkIt.lastTimestamp(), - Value: chunkIt.lastSampleValue(), - }) - } - break - } - chunkIt := it.chunkIterator(i + j) - if len(values) == 0 { - for s := range chunkIt.values() { - if len(values) == 0 && !s.Timestamp.Before(in.OldestInclusive) { - values = append(values, *s) - // We cannot just break out here as we have to consume all - // the values to not leak a goroutine. This could obviously - // be made much neater with more suitable methods in the chunk - // interface. But currently, BoundaryValues is only used by - // `predict_linear` so we would pollute the chunk interface - // unduly just for one single corner case. Plus, even that use - // of BoundaryValues is suboptimal and should be replaced. - } - } - } - if chunkIt.lastTimestamp().After(in.NewestInclusive) { - s := chunkIt.valueAtOrBeforeTime(in.NewestInclusive) - if s.Timestamp != model.Earliest { - values = append(values, s) - } - break - } - } - if len(values) == 1 { - // We found exactly one value. In that case, add the most recent we know. - chunkIt := it.chunkIterator(len(it.chunks) - 1) - values = append(values, model.SamplePair{ - Timestamp: chunkIt.lastTimestamp(), - Value: chunkIt.lastSampleValue(), - }) - } - if len(values) == 2 && values[0].Equal(&values[1]) { - return values[:1] - } - return values -} - // RangeValues implements SeriesIterator. 
func (it *memorySeriesIterator) RangeValues(in metric.Interval) []model.SamplePair { // Find the first chunk for which the first sample is within the interval. @@ -653,11 +588,6 @@ func (it *singleSampleSeriesIterator) ValueAtOrBeforeTime(t model.Time) model.Sa return it.samplePair } -// BoundaryValues implements SeriesIterator. -func (it *singleSampleSeriesIterator) BoundaryValues(in metric.Interval) []model.SamplePair { - return it.RangeValues(in) -} - // RangeValues implements SeriesIterator. func (it *singleSampleSeriesIterator) RangeValues(in metric.Interval) []model.SamplePair { if it.samplePair.Timestamp.After(in.NewestInclusive) || @@ -675,11 +605,6 @@ func (i nopSeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePair { return ZeroSamplePair } -// BoundaryValues implements SeriesIterator. -func (i nopSeriesIterator) BoundaryValues(in metric.Interval) []model.SamplePair { - return []model.SamplePair{} -} - // RangeValues implements SeriesIterator. func (i nopSeriesIterator) RangeValues(in metric.Interval) []model.SamplePair { return []model.SamplePair{} diff --git a/storage/local/storage.go b/storage/local/storage.go index f90dfae8a..f87f083ba 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -374,17 +374,6 @@ func (bit *boundedIterator) ValueAtOrBeforeTime(ts model.Time) model.SamplePair return bit.it.ValueAtOrBeforeTime(ts) } -// BoundaryValues implements the SeriesIterator interface. -func (bit *boundedIterator) BoundaryValues(interval metric.Interval) []model.SamplePair { - if interval.NewestInclusive < bit.start { - return []model.SamplePair{} - } - if interval.OldestInclusive < bit.start { - interval.OldestInclusive = bit.start - } - return bit.it.BoundaryValues(interval) -} - // RangeValues implements the SeriesIterator interface. 
func (bit *boundedIterator) RangeValues(interval metric.Interval) []model.SamplePair { if interval.NewestInclusive < bit.start { diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 8fcdcecfe..381f7c7a3 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -424,14 +424,6 @@ func TestRetentionCutoff(t *testing.T) { if expt := now.Add(-1 * time.Hour).Add(time.Minute); vals[0].Timestamp != expt { t.Errorf("unexpected timestamp for first sample: %v, expected %v", vals[0].Timestamp.Time(), expt.Time()) } - - vals = it.BoundaryValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}) - if len(vals) != 2 { - t.Errorf("expected 2 values but got %d", len(vals)) - } - if expt := now.Add(-1 * time.Hour).Add(time.Minute); vals[0].Timestamp != expt { - t.Errorf("unexpected timestamp for first sample: %v, expected %v", vals[0].Timestamp.Time(), expt.Time()) - } } func TestDropMetrics(t *testing.T) { @@ -1036,18 +1028,18 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { if err != nil { t.Fatalf("Error preloading everything: %s", err) } - actual := it.BoundaryValues(metric.Interval{ + actual := it.RangeValues(metric.Interval{ OldestInclusive: 0, NewestInclusive: 100000, }) - if len(actual) != 2 { - t.Fatal("expected two results after purging half of series") + if len(actual) < 4000 { + t.Fatalf("expected at least %d results after purging half of series, got %d", 4000, len(actual)) } if actual[0].Timestamp < 6000 || actual[0].Timestamp > 10000 { t.Errorf("1st timestamp out of expected range: %v", actual[0].Timestamp) } want := model.Time(19998) - if actual[1].Timestamp != want { + if actual[len(actual)-1].Timestamp != want { - t.Errorf("2nd timestamp: want %v, got %v", want, actual[1].Timestamp) + t.Errorf("last timestamp: want %v, got %v", want, actual[len(actual)-1].Timestamp) } @@ -1057,7 +1049,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { if err != nil { t.Fatalf("Error preloading everything: %s", err) } - actual =
it.BoundaryValues(metric.Interval{ + actual = it.RangeValues(metric.Interval{ OldestInclusive: 0, NewestInclusive: 100000, }) diff --git a/util/stats/query_stats.go b/util/stats/query_stats.go index e739142b4..6d92361a0 100644 --- a/util/stats/query_stats.go +++ b/util/stats/query_stats.go @@ -29,7 +29,6 @@ const ( ResultAppendTime QueryAnalysisTime GetValueAtTimeTime - GetBoundaryValuesTime GetRangeValuesTime ExecQueueTime ViewDiskPreparationTime @@ -60,8 +59,6 @@ func (s QueryTiming) String() string { return "Query analysis time" case GetValueAtTimeTime: return "GetValueAtTime() time" - case GetBoundaryValuesTime: - return "GetBoundaryValues() time" case GetRangeValuesTime: return "GetRangeValues() time" case ExecQueueTime: