diff --git a/cmd/promtool/main.go b/cmd/promtool/main.go
index 2e9b032f3..5e6e7bea0 100644
--- a/cmd/promtool/main.go
+++ b/cmd/promtool/main.go
@@ -46,8 +46,6 @@ import (
 	"github.com/prometheus/prometheus/discovery/kubernetes"
 	"github.com/prometheus/prometheus/pkg/rulefmt"
 	"github.com/prometheus/prometheus/tsdb"
-
-	_ "github.com/prometheus/prometheus/discovery/install" // Register service discovery implementations.
 )
 
 func main() {
@@ -216,7 +214,7 @@ func main() {
 		os.Exit(checkErr(backfillOpenMetrics(*importFilePath, *importDBPath)))
 
 	case importRulesCmd.FullCommand():
-		os.Exit(checkErr(ImportRules(*importRulesURL, *importRulesStart, *importRulesEnd, *importRulesOutputDir, *importRulesEvalInterval, *importRulesFiles...)))
+		os.Exit(checkErr(importRules(*importRulesURL, *importRulesStart, *importRulesEnd, *importRulesOutputDir, *importRulesEvalInterval, *importRulesFiles...)))
 	}
 }
 
@@ -792,9 +790,9 @@ func (j *jsonPrinter) printLabelValues(v model.LabelValues) {
 	json.NewEncoder(os.Stdout).Encode(v)
 }
 
-// ImportRules backfills recording rules from the files provided. The output are blocks of data
+// importRules backfills recording rules from the files provided. The output is blocks of data
 // at the outputDir location.
-func ImportRules(url, start, end, outputDir string, evalInterval time.Duration, files ...string) error {
+func importRules(url, start, end, outputDir string, evalInterval time.Duration, files ...string) error {
 	ctx := context.Background()
 	var stime, etime time.Time
 	var err error
@@ -828,6 +826,9 @@ func ImportRules(url, start, end, outputDir string, evalInterval time.Duration,
 		fmt.Fprintln(os.Stderr, "new writer error", err)
 		return err
 	}
+	defer func() {
+		err = writer.Close()
+	}()
 
 	cfg := ruleImporterConfig{
 		Start:        stime,
@@ -841,6 +842,7 @@ func ImportRules(url, start, end, outputDir string, evalInterval time.Duration,
 		fmt.Fprintln(os.Stderr, "new api client error", err)
 		return err
 	}
 
+	const maxSamplesInMemory = 5000
 	ruleImporter := newRuleImporter(logger, cfg, v1.NewAPI(c), newMultipleAppender(ctx, maxSamplesInMemory, writer))
 	errs := ruleImporter.loadGroups(ctx, files)
@@ -858,5 +860,5 @@ func ImportRules(url, start, end, outputDir string, evalInterval time.Duration,
 		}
 	}
 
-	return nil
+	return err
 }
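A note on the deferred Close added above: in Go, an assignment to `err` inside a `defer` only reaches the caller when `err` is a named return value. A minimal sketch of that pattern, using a hypothetical `writeBlocks` helper (illustrative only, not part of this patch):

```go
package main

import (
	"github.com/go-kit/kit/log"
	"github.com/prometheus/prometheus/tsdb"
)

// writeBlocks is a hypothetical helper showing deferred-Close error handling:
// because err is a *named* return value, the assignment inside the deferred
// function is visible to the caller.
func writeBlocks(dir string) (err error) {
	writer, err := tsdb.NewBlockWriter(log.NewNopLogger(), dir, tsdb.DefaultBlockDuration)
	if err != nil {
		return err
	}
	defer func() {
		// Keep the first error; don't let a Close error mask a real failure.
		if cerr := writer.Close(); err == nil {
			err = cerr
		}
	}()
	// ... append samples and flush them here ...
	return nil
}
```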
diff --git a/cmd/promtool/rules.go b/cmd/promtool/rules.go
index 359dcc0aa..f7a2b2bc5 100644
--- a/cmd/promtool/rules.go
+++ b/cmd/promtool/rules.go
@@ -16,7 +16,6 @@ package main
 import (
 	"context"
 	"fmt"
-	"os"
 	"time"
 
 	"github.com/go-kit/kit/log"
@@ -25,18 +24,16 @@ import (
 	v1 "github.com/prometheus/client_golang/api/prometheus/v1"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/pkg/labels"
+	"github.com/prometheus/prometheus/pkg/timestamp"
 	"github.com/prometheus/prometheus/rules"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/tsdb"
 )
 
-const maxSamplesInMemory = 5000
-
 type queryRangeAPI interface {
 	QueryRange(ctx context.Context, query string, r v1.Range) (model.Value, v1.Warnings, error)
 }
 
-// ruleImporter is the importer to backfill rules.
 type ruleImporter struct {
 	logger log.Logger
 	config ruleImporterConfig
@@ -49,14 +46,14 @@ type ruleImporter struct {
 	ruleManager *rules.Manager
 }
 
-// ruleImporterConfig is the config for the rule importer.
 type ruleImporterConfig struct {
 	Start        time.Time
 	End          time.Time
 	EvalInterval time.Duration
 }
 
-// newRuleImporter creates a new rule importer that can be used to backfill rules.
+// newRuleImporter creates a new rule importer that can be used to parse and evaluate recording rule files
+// and write the resulting new series to disk in blocks.
 func newRuleImporter(logger log.Logger, config ruleImporterConfig, apiClient queryRangeAPI, appender *multipleAppender) *ruleImporter {
 	return &ruleImporter{
 		logger:    logger,
 		config:    config,
 		apiClient: apiClient,
 		appender:  appender,
 	}
 }
@@ -67,7 +64,7 @@ func newRuleImporter(logger log.Logger, config ruleImporterConfig, apiClient que
 	}
 }
 
-// loadGroups parses groups from a list of rule files.
+// loadGroups parses groups from a list of recording rule files.
 func (importer *ruleImporter) loadGroups(ctx context.Context, filenames []string) (errs []error) {
 	groups, errs := importer.ruleManager.LoadGroups(importer.config.EvalInterval, labels.Labels{}, filenames...)
 	if errs != nil {
@@ -77,78 +74,80 @@ func (importer *ruleImporter) loadGroups(ctx context.Context, filenames []string
 	return nil
 }
 
-// importAll evaluates all the groups and rules and creates new time series
-// and stores them in new blocks.
+// importAll evaluates all the recording rules and creates new time series, which are written to disk in blocks.
 func (importer *ruleImporter) importAll(ctx context.Context) (errs []error) {
 	for name, group := range importer.groups {
 		level.Info(importer.logger).Log("backfiller", fmt.Sprintf("processing group, name: %s", name))
-		stimeWithAlignment := group.EvalTimestamp(importer.config.Start.UnixNano())
-
-		for stimeWithAlignment.Before(importer.config.End) {
-			currentBlockEnd := stimeWithAlignment.Add(time.Duration(tsdb.DefaultBlockDuration) * time.Millisecond)
-			if currentBlockEnd.After(importer.config.End) {
-				currentBlockEnd = importer.config.End
-			}
-
-			for i, r := range group.Rules() {
-				level.Info(importer.logger).Log("backfiller", fmt.Sprintf("processing rule %d, name: %s", i+1, r.Name()))
-				if err := importer.importRule(ctx, r.Query().String(), r.Name(), r.Labels(), stimeWithAlignment, currentBlockEnd); err != nil {
-					errs = append(errs, err)
-				}
+		stimeWithAlignment := group.EvalTimestamp(importer.config.Start.UnixNano())
+		for i, r := range group.Rules() {
+			level.Info(importer.logger).Log("backfiller", fmt.Sprintf("processing rule %d, name: %s", i+1, r.Name()))
+			if err := importer.importRule(ctx, r.Query().String(), r.Name(), r.Labels(), stimeWithAlignment, importer.config.End, group.Interval()); err != nil {
+				errs = append(errs, err)
 			}
-
-			stimeWithAlignment = currentBlockEnd
 		}
 	}
+	if err := importer.appender.flushAndCommit(ctx); err != nil {
+		errs = append(errs, err)
+	}
 	return errs
 }
 
-// importRule imports the historical data for a single rule.
-func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr, ruleName string, ruleLabels labels.Labels, start, end time.Time) error {
-	val, warnings, err := importer.apiClient.QueryRange(ctx,
-		ruleExpr,
-		v1.Range{
-			Start: start,
-			End:   end,
-			Step:  importer.config.EvalInterval, // todo: did we check if the rule has an interval?
-		},
-	)
-	if err != nil {
-		return err
-	}
-	if warnings != nil {
-		fmt.Fprint(os.Stderr, "warning api.QueryRange:", warnings)
-	}
+// importRule queries a Prometheus API to evaluate rules at times in the past.
+func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr, ruleName string, ruleLabels labels.Labels, start, end time.Time, interval time.Duration) error {
 
-	var matrix model.Matrix
-	switch val.Type() {
-	case model.ValMatrix:
-		matrix = val.(model.Matrix)
-		for _, sample := range matrix {
-
-			currentLabels := make(labels.Labels, 0, len(sample.Metric)+len(ruleLabels)+1)
-			currentLabels = append(currentLabels, labels.Label{
-				Name:  labels.MetricName,
-				Value: ruleName,
-			})
-			for _, ruleLabel := range ruleLabels {
-				currentLabels = append(currentLabels, ruleLabel)
-			}
-			for k, v := range sample.Metric {
+	// This loop breaks up the calls to the QueryRange API into 2h chunks so that we
+	// never request too much data at once or take too long to process it, which could cause a timeout.
+	for start.Before(end) {
+		currentBlockEnd := start.Add(time.Duration(tsdb.DefaultBlockDuration) * time.Millisecond)
+		if currentBlockEnd.After(end) {
+			currentBlockEnd = end
+		}
+
+		val, warnings, err := importer.apiClient.QueryRange(ctx,
+			ruleExpr,
+			v1.Range{
+				Start: start,
+				End:   currentBlockEnd,
+				Step:  interval,
+			},
+		)
+		if err != nil {
+			return err
+		}
+		if warnings != nil {
+			level.Warn(importer.logger).Log("backfiller", fmt.Sprintf("warnings QueryRange api: %v", warnings))
+		}
+
+		var matrix model.Matrix
+		switch val.Type() {
+		case model.ValMatrix:
+			matrix = val.(model.Matrix)
+			for _, sample := range matrix {
+				currentLabels := make(labels.Labels, 0, len(sample.Metric)+len(ruleLabels)+1)
 				currentLabels = append(currentLabels, labels.Label{
-					Name:  string(k),
-					Value: string(v),
+					Name:  labels.MetricName,
+					Value: ruleName,
 				})
-			}
-			for _, value := range sample.Values {
-				if err := importer.appender.add(ctx, currentLabels, value.Timestamp.Unix(), float64(value.Value)); err != nil {
-					return err
+				for _, ruleLabel := range ruleLabels {
+					currentLabels = append(currentLabels, ruleLabel)
+				}
+				for name, value := range sample.Metric {
+					currentLabels = append(currentLabels, labels.Label{
+						Name:  string(name),
+						Value: string(value),
+					})
+				}
+				for _, value := range sample.Values {
+					if err := importer.appender.add(ctx, currentLabels, timestamp.FromTime(value.Timestamp.Time()), float64(value.Value)); err != nil {
+						return err
+					}
 				}
 			}
+		default:
+			return errors.New(fmt.Sprintf("rule result is wrong type %s", val.Type().String()))
 		}
-	default:
-		return errors.New(fmt.Sprintf("rule result is wrong type %s", val.Type().String()))
+		start = currentBlockEnd
 	}
 	return nil
 }
@@ -176,13 +175,16 @@ func (m *multipleAppender) add(ctx context.Context, l labels.Labels, t int64, v
 		return err
 	}
 	m.currentSampleCount++
-	if m.currentSampleCount > m.maxSamplesInMemory {
+	if m.currentSampleCount >= m.maxSamplesInMemory {
 		return m.flushAndCommit(ctx)
 	}
 	return nil
 }
 
 func (m *multipleAppender) flushAndCommit(ctx context.Context) error {
+	if m.currentSampleCount == 0 {
+		return nil
+	}
 	if _, err := m.writer.Flush(ctx); err != nil {
 		return err
 	}
@@ -193,7 +195,3 @@ func (m *multipleAppender) flushAndCommit(ctx context.Context) error {
 	m.currentSampleCount = 0
 	return nil
 }
-
-func (m *multipleAppender) close() error {
-	return m.writer.Close()
-}
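The chunking loop in importRule above advances through the requested range in tsdb.DefaultBlockDuration (2h) steps, issuing one QueryRange call per chunk and clamping the last chunk to the requested end. A minimal, runnable sketch of just that chunking logic (the dates are illustrative):

```go
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/prometheus/tsdb"
)

func main() {
	// Walk a 5h range in tsdb.DefaultBlockDuration (2h) steps, mirroring importRule;
	// each iteration corresponds to one QueryRange call over [start, chunkEnd].
	start := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)
	end := start.Add(5 * time.Hour)
	for start.Before(end) {
		chunkEnd := start.Add(time.Duration(tsdb.DefaultBlockDuration) * time.Millisecond)
		if chunkEnd.After(end) {
			chunkEnd = end // clamp the final, shorter chunk
		}
		fmt.Printf("query chunk: %s -> %s\n", start, chunkEnd) // prints 2h, 2h, then 1h
		start = chunkEnd
	}
}
```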
diff --git a/cmd/promtool/rules_test.go b/cmd/promtool/rules_test.go
new file mode 100644
index 000000000..ac667ed84
--- /dev/null
+++ b/cmd/promtool/rules_test.go
@@ -0,0 +1,171 @@
+// Copyright 2020 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"context"
+	"fmt"
+	"io/ioutil"
+	"math"
+	"os"
+	"testing"
+	"time"
+
+	"github.com/go-kit/kit/log"
+	v1 "github.com/prometheus/client_golang/api/prometheus/v1"
+	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/pkg/labels"
+	"github.com/prometheus/prometheus/tsdb"
+	"github.com/stretchr/testify/require"
+)
+
+const testMaxSampleCount = 500
+
+type mockQueryRangeAPI struct{}
+
+func (mockAPI mockQueryRangeAPI) QueryRange(ctx context.Context, query string, r v1.Range) (model.Value, v1.Warnings, error) {
+	var testMatrix model.Matrix = []*model.SampleStream{
+		{
+			Metric: model.Metric{
+				"labelname1": "val1",
+			},
+			Values: []model.SamplePair{
+				{
+					Timestamp: model.Time(123456789123),
+					Value:     123,
+				},
+			},
+		},
+	}
+	return testMatrix, v1.Warnings{}, nil
+}
+
+// TestBackfillRuleIntegration is an integration test that runs all the rule importer code to confirm the parts work together.
+func TestBackfillRuleIntegration(t *testing.T) {
+	tmpDir, err := ioutil.TempDir("", "backfilldata")
+	require.NoError(t, err)
+	defer func() {
+		require.NoError(t, os.RemoveAll(tmpDir))
+	}()
+	start := time.Now().UTC()
+	ctx := context.Background()
+
+	const (
+		groupName  = "test_group_name"
+		ruleName1  = "test_rule1_name"
+		ruleExpr   = "test_expr"
+		ruleLabels = "test_label_name: test_label_value"
+	)
+
+	// Execute the test twice in a row to simulate running the rule importer twice with the same data.
+	// We expect that duplicate blocks with the same series are created when run more than once.
+	for i := 0; i < 2; i++ {
+		ruleImporter, err := newTestRuleImporter(ctx, start, tmpDir)
+		require.NoError(t, err)
+		path := tmpDir + "/test.file"
+		require.NoError(t, createTestFiles(groupName, ruleName1, ruleExpr, ruleLabels, path))
+
+		// After loading/parsing the recording rule files, make sure the parsing was correct.
+		errs := ruleImporter.loadGroups(ctx, []string{path})
+		for _, err := range errs {
+			require.NoError(t, err)
+		}
+		const groupCount = 1
+		require.Equal(t, groupCount, len(ruleImporter.groups))
+		groupNameWithPath := path + ";" + groupName
+		group1 := ruleImporter.groups[groupNameWithPath]
+		require.NotNil(t, group1)
+		const defaultInterval = 60
+		require.Equal(t, time.Duration(defaultInterval*time.Second), group1.Interval())
+		gRules := group1.Rules()
+		const ruleCount = 1
+		require.Equal(t, ruleCount, len(gRules))
+		require.Equal(t, ruleName1, gRules[0].Name())
+		require.Equal(t, ruleExpr, gRules[0].Query().String())
+		require.Equal(t, 1, len(gRules[0].Labels()))
+
+		// Backfill all recording rules, then check the blocks to confirm the right
+		// data was created.
+		errs = ruleImporter.importAll(ctx)
+		for _, err := range errs {
+			require.NoError(t, err)
+		}
+
+		opts := tsdb.DefaultOptions()
+		opts.AllowOverlappingBlocks = true
+		db, err := tsdb.Open(tmpDir, nil, nil, opts)
+		require.NoError(t, err)
+
+		blocks := db.Blocks()
+		require.Equal(t, i+1, len(blocks))
+
+		q, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64)
+		require.NoError(t, err)
+
+		selectedSeries := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "", ".*"))
+		var seriesCount, samplesCount int
+		for selectedSeries.Next() {
+			seriesCount++
+			series := selectedSeries.At()
+			require.Equal(t, 3, len(series.Labels()))
+			it := series.Iterator()
+			for it.Next() {
+				samplesCount++
+				_, v := it.At()
+				require.Equal(t, float64(123), v)
+			}
+			require.NoError(t, it.Err())
+		}
+		require.NoError(t, selectedSeries.Err())
+		require.Equal(t, 1, seriesCount)
+		require.Equal(t, 2, samplesCount)
+		require.NoError(t, q.Close())
+		require.NoError(t, db.Close())
+	}
+
+}
+
+func newTestRuleImporter(ctx context.Context, start time.Time, tmpDir string) (*ruleImporter, error) {
+	logger := log.NewNopLogger()
+	cfg := ruleImporterConfig{
+		Start:        start.Add(-1 * time.Hour),
+		End:          start,
+		EvalInterval: 60 * time.Second,
+	}
+	writer, err := tsdb.NewBlockWriter(logger,
+		tmpDir,
+		tsdb.DefaultBlockDuration,
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	app := newMultipleAppender(ctx, testMaxSampleCount, writer)
+	return newRuleImporter(logger, cfg, mockQueryRangeAPI{}, app), nil
+}
+
+func createTestFiles(groupName, ruleName, ruleExpr, ruleLabels, path string) error {
+	x := fmt.Sprintf(`
+groups:
+- name: %s
+  rules:
+  - record: %s
+    expr: %s
+    labels:
+      %s
+`,
+		groupName, ruleName, ruleExpr, ruleLabels,
+	)
+	return ioutil.WriteFile(path, []byte(x), 0777)
+}
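For reference, with the constants used in TestBackfillRuleIntegration, createTestFiles renders a recording rule file like the following (the %s placeholders substituted with the test values; indentation as in the template above):

```yaml
groups:
- name: test_group_name
  rules:
  - record: test_rule1_name
    expr: test_expr
    labels:
      test_label_name: test_label_value
```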