diff --git a/rules/manager.go b/rules/manager.go index 993302637..b3ed28f8f 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -163,7 +163,7 @@ func (g *Group) run() { iterationsScheduled.Inc() start := time.Now() - g.Eval() + g.Eval(start) iterationDuration.Observe(time.Since(start).Seconds()) } @@ -257,9 +257,8 @@ func typeForRule(r Rule) ruleType { // Eval runs a single evaluation cycle in which all rules are evaluated in parallel. // In the future a single group will be evaluated sequentially to properly handle // rule dependency. -func (g *Group) Eval() { +func (g *Group) Eval(ts time.Time) { var ( - now = time.Now() wg sync.WaitGroup mu sync.Mutex seriesReturned = make(map[string]labels.Labels, len(g.seriesInPreviousEval)) @@ -279,7 +278,7 @@ func (g *Group) Eval() { evalTotal.WithLabelValues(rtyp).Inc() - vector, err := rule.Eval(g.opts.Context, now, g.opts.QueryEngine, g.opts.ExternalURL) + vector, err := rule.Eval(g.opts.Context, ts, g.opts.QueryEngine, g.opts.ExternalURL) if err != nil { // Canceled queries are intentional termination of queries. This normally // happens on shutdown and thus we skip logging of any errors here. @@ -344,7 +343,7 @@ func (g *Group) Eval() { for metric, lset := range g.seriesInPreviousEval { if _, ok := seriesReturned[metric]; !ok { // Series no longer exposed, mark it stale. - _, err = app.Add(lset, timestamp.FromTime(now), math.Float64frombits(value.StaleNaN)) + _, err = app.Add(lset, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN)) switch err { case nil: case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: diff --git a/rules/manager_test.go b/rules/manager_test.go index 9493996a9..eab4139ee 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -14,7 +14,10 @@ package rules import ( + "context" "fmt" + "math" + "reflect" "strings" "testing" "time" @@ -23,7 +26,10 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/timestamp" + "github.com/prometheus/prometheus/pkg/value" "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/util/testutil" ) func TestAlertingRule(t *testing.T) { @@ -156,3 +162,96 @@ func annotateWithTime(lines []string, ts time.Time) []string { } return annotatedLines } + +func TestStaleness(t *testing.T) { + storage := testutil.NewStorage(t) + defer storage.Close() + engine := promql.NewEngine(storage, nil) + opts := &ManagerOptions{ + QueryEngine: engine, + Appendable: storage, + Context: context.Background(), + } + + expr, err := promql.ParseExpr("a + 1") + if err != nil { + t.Fatal(err) + } + rule := NewRecordingRule("a_plus_one", expr, labels.Labels{}) + group := NewGroup("default", time.Second, []Rule{rule}, opts) + + // A time series that has two samples and then goes stale. + app, _ := storage.Appender() + app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 0, 1) + if err = app.Commit(); err != nil { + t.Fatal(err) + } + app, _ = storage.Appender() + app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 1000, 2) + if err = app.Commit(); err != nil { + t.Fatal(err) + } + app, _ = storage.Appender() + app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 2000, math.Float64frombits(value.StaleNaN)) + if err = app.Commit(); err != nil { + t.Fatal(err) + } + + // Execute 3 times, 1 second apart. + group.Eval(time.Unix(0, 0)) + group.Eval(time.Unix(1, 0)) + group.Eval(time.Unix(2, 0)) + + querier, err := storage.Querier(0, 2000) + defer querier.Close() + if err != nil { + t.Fatal(err) + } + matcher, _ := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a_plus_one") + seriesSet := querier.Select(matcher) + samples, err := readSeriesSet(seriesSet) + if err != nil { + t.Fatal(err) + } + + metric := labels.FromStrings(model.MetricNameLabel, "a_plus_one").String() + metricSample, ok := samples[metric] + if !ok { + t.Fatalf("Series %s not returned.", metric) + } + if !value.IsStaleNaN(metricSample[2].V) { + t.Fatalf("Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(metricSample[2].V)) + } + metricSample[2].V = 42 // reflect.DeepEqual cannot handle NaN. + + want := map[string][]promql.Point{ + metric: []promql.Point{{0, 2}, {1000, 3}, {2000, 42}}, + } + + if !reflect.DeepEqual(want, samples) { + t.Fatalf("Returned samples not as expected. Wanted: %+v Got: %+v", want, samples) + } +} + +// Convert a SeriesSet into a form useable with reflect.DeepEqual. +func readSeriesSet(ss storage.SeriesSet) (map[string][]promql.Point, error) { + result := map[string][]promql.Point{} + + for ss.Next() { + series := ss.At() + + points := []promql.Point{} + it := series.Iterator() + for it.Next() { + t, v := it.At() + points = append(points, promql.Point{T: t, V: v}) + } + + name := series.Labels().String() + result[name] = points + if err := ss.Err(); err != nil { + return nil, err + } + } + return result, nil +}