diff --git a/cmd/promtool/rules.go b/cmd/promtool/rules.go
index 476c1c599..4533e6ebb 100644
--- a/cmd/promtool/rules.go
+++ b/cmd/promtool/rules.go
@@ -20,12 +20,14 @@ import (
     "time"
 
     "github.com/go-kit/kit/log"
+    "github.com/go-kit/kit/log/level"
     "github.com/pkg/errors"
     "github.com/prometheus/client_golang/api"
     v1 "github.com/prometheus/client_golang/api/prometheus/v1"
     "github.com/prometheus/common/model"
 
     "github.com/prometheus/prometheus/pkg/labels"
     "github.com/prometheus/prometheus/rules"
+    "github.com/prometheus/prometheus/storage"
     "github.com/prometheus/prometheus/tsdb"
 )
@@ -54,6 +56,7 @@ type ruleImporterConfig struct {
 // newRuleImporter creates a new rule importer that can be used to backfill rules.
 func newRuleImporter(logger log.Logger, config ruleImporterConfig) *ruleImporter {
     return &ruleImporter{
+        logger:      logger,
         config:      config,
         groupLoader: rules.FileLoader{},
     }
@@ -134,77 +137,92 @@ func (importer *ruleImporter) loadGroups(ctx context.Context, filenames []string
 // and stores them in new blocks.
 func (importer *ruleImporter) importAll(ctx context.Context) []error {
     var errs = []error{}
-    for _, group := range importer.groups {
+    var currentBlockEnd time.Time
+    var appender storage.Appender
+
+    for name, group := range importer.groups {
+        level.Info(importer.logger).Log("backfiller", fmt.Sprintf("processing group, name: %s", name))
         stimeWithAlignment := group.EvalTimestamp(importer.config.Start.UnixNano())
-        for _, r := range group.Rules() {
-            err := importer.importRule(ctx, r.Query().String(), stimeWithAlignment, group.Interval())
+
+        ts := stimeWithAlignment
+        // Write the data in 2h blocks; each block contains the data for every rule in the group.
+        for ts.Before(importer.config.End) {
+            currentBlockEnd = ts.Add(time.Duration(tsdb.DefaultBlockDuration) * time.Millisecond)
+            if currentBlockEnd.After(importer.config.End) {
+                currentBlockEnd = importer.config.End
+            }
+            // should we be creating a new appender for each block?
+            appender = importer.writer.Appender(ctx)
+
+            for i, r := range group.Rules() {
+                level.Info(importer.logger).Log("backfiller", fmt.Sprintf("processing rule %d, name: %s", i+1, r.Name()))
+                err := importer.importRule(ctx, r.Query().String(), r.Name(), r.Labels(), ts, currentBlockEnd, appender)
+                if err != nil {
+                    errs = append(errs, err)
+                }
+            }
+
+            ts = currentBlockEnd
+            _, err := importer.writer.Flush(ctx)
+            if err != nil {
+                errs = append(errs, err)
+            }
+
+            err = appender.Commit()
             if err != nil {
                 errs = append(errs, err)
             }
         }
     }
-    _, err := importer.writer.Flush(ctx)
-    if err != nil {
-        errs = append(errs, err)
-    }
     return errs
 }
 
 // importRule imports the historical data for a single rule.
-func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr string, stimeWithAlignment time.Time, internval time.Duration) error {
-    ts := stimeWithAlignment
-    appender := importer.writer.Appender(ctx)
-
-    for ts.Before(importer.config.End) {
-        currentBlockEnd := ts.Add(time.Duration(tsdb.DefaultBlockDuration) * time.Millisecond)
-        if currentBlockEnd.After(importer.config.End) {
-            currentBlockEnd = importer.config.End
-        }
+func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr, ruleName string, ruleLabels labels.Labels, start, end time.Time, appender storage.Appender) error {
+    val, warnings, err := importer.apiClient.QueryRange(ctx,
+        ruleExpr,
+        v1.Range{
+            Start: start,
+            End:   end,
+            Step:  importer.config.EvalInterval,
+        },
+    )
+    if err != nil {
+        return err
+    }
+    if warnings != nil {
+        fmt.Fprint(os.Stderr, "warning api.QueryRange:", warnings)
+    }
 
-        val, warnings, err := importer.apiClient.QueryRange(ctx,
-            ruleExpr,
-            v1.Range{
-                Start: ts,
-                End:   currentBlockEnd,
-                Step:  importer.config.EvalInterval,
-            },
-        )
-        if err != nil {
-            return err
-        }
-        if warnings != nil {
-            fmt.Fprint(os.Stderr, "warning api.QueryRange:", warnings)
-        }
+    var matrix model.Matrix
+    switch val.Type() {
+    case model.ValMatrix:
+        matrix = val.(model.Matrix)
+        for _, sample := range matrix {
 
-        var matrix model.Matrix
-        switch val.Type() {
-        case model.ValMatrix:
-            matrix = val.(model.Matrix)
-            for _, sample := range matrix {
-                currentLabels := make(labels.Labels, 0, len(sample.Metric))
-                for k, v := range sample.Metric {
-                    currentLabels = append(currentLabels, labels.Label{
-                        Name:  string(k),
-                        Value: string(v),
-                    })
-                }
-                for _, value := range sample.Values {
-                    _, err := appender.Add(currentLabels, value.Timestamp.Unix(), float64(value.Value))
-                    if err != nil {
-                        return err
-                    }
+            currentLabels := make(labels.Labels, 0, len(sample.Metric))
+            currentLabels = append(currentLabels, labels.Label{
+                Name:  labels.MetricName,
+                Value: ruleName,
+            })
+            for _, ruleLabel := range ruleLabels {
+                currentLabels = append(currentLabels, ruleLabel)
+            }
+            for k, v := range sample.Metric {
+                currentLabels = append(currentLabels, labels.Label{
+                    Name:  string(k),
+                    Value: string(v),
+                })
+            }
+            for _, value := range sample.Values {
+                _, err := appender.Add(currentLabels, int64(value.Timestamp), float64(value.Value))
+                if err != nil {
+                    return err
                 }
             }
-        default:
-            return errors.New("rule result is wrong type")
         }
-
-        ts = currentBlockEnd
+    default:
+        return errors.New("rule result is wrong type")
     }
-    _, err := importer.writer.Flush(ctx)
-    if err != nil {
-        return err
-    }
-    return appender.Commit()
+    return nil
 }
diff --git a/tsdb/blockwriter.go b/tsdb/blockwriter.go
index b859f86d9..e1faf0196 100644
--- a/tsdb/blockwriter.go
+++ b/tsdb/blockwriter.go
@@ -84,10 +84,6 @@ func (w *BlockWriter) Appender(ctx context.Context) storage.Appender {
 // Flush implements the Writer interface. This is where actual block writing
 // happens. After flush completes, no writes can be done.
 func (w *BlockWriter) Flush(ctx context.Context) (ulid.ULID, error) {
-    if w.head.NumSeries() == 0 {
-        return ulid.ULID{}, errors.New("no series appended, aborting")
-    }
-
    mint := w.head.MinTime()
     // Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime).
     // Because of this block intervals are always +1 than the total samples it includes.
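For reference, below is a minimal, hypothetical sketch of driving tsdb.BlockWriter directly through the same calls the importer now makes once per 2h block (Appender, Add, Commit, Flush). The output directory, label set, timestamp, and sample value are illustrative and not part of this change.

package main

import (
    "context"
    "fmt"

    "github.com/go-kit/kit/log"

    "github.com/prometheus/prometheus/pkg/labels"
    "github.com/prometheus/prometheus/tsdb"
)

func main() {
    ctx := context.Background()

    // BlockWriter cuts blocks covering at most tsdb.DefaultBlockDuration (2h, in ms).
    w, err := tsdb.NewBlockWriter(log.NewNopLogger(), "./backfill-data", tsdb.DefaultBlockDuration)
    if err != nil {
        panic(err)
    }
    defer w.Close()

    // One appender per block, mirroring importAll above.
    app := w.Appender(ctx)

    // The rule name becomes the __name__ label, as importRule now does.
    lbls := labels.FromStrings(labels.MetricName, "example_rule", "job", "backfill")

    // Appender timestamps are milliseconds since the Unix epoch.
    if _, err := app.Add(lbls, 1602000000000, 42.0); err != nil {
        panic(err)
    }
    if err := app.Commit(); err != nil {
        panic(err)
    }

    // Flush writes the block and returns its ULID; the blockwriter.go change
    // above removes the error previously returned when no series were appended.
    id, err := w.Flush(ctx)
    if err != nil {
        panic(err)
    }
    fmt.Println("wrote block", id)
}

Note that the sketch commits before flushing: samples buffered by an appender only become visible in the BlockWriter's in-memory head on Commit, so a block cut before Commit would not contain them.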