// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/pkg/errors"
	v1 "github.com/prometheus/client_golang/api/prometheus/v1"
	"github.com/prometheus/common/model"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/model/timestamp"
	"github.com/prometheus/prometheus/rules"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb"
	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
)

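// maxSamplesInMemory is the maximum number of samples buffered in a single
// appender before the batch is committed and a fresh appender is created.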
const maxSamplesInMemory = 5000

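// queryRangeAPI covers the single range-query call the rule importer needs
// from a Prometheus API client.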
type queryRangeAPI interface {
	QueryRange(ctx context.Context, query string, r v1.Range) (model.Value, v1.Warnings, error)
}

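// ruleImporter evaluates recording rules over a past time range via a
// Prometheus API and writes the resulting series to disk in blocks.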
type ruleImporter struct {
	logger log.Logger
	config ruleImporterConfig

	apiClient queryRangeAPI

	groups      map[string]*rules.Group
	ruleManager *rules.Manager
}

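// ruleImporterConfig holds the settings for a backfill run: the output
// directory, the time range to evaluate, the evaluation interval, and the
// maximum duration of the blocks to produce.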
type ruleImporterConfig struct {
	outputDir        string
	start            time.Time
	end              time.Time
	evalInterval     time.Duration
	maxBlockDuration time.Duration
}

// newRuleImporter creates a new rule importer that can be used to parse and
// evaluate recording rule files and create new series written to disk in blocks.
func newRuleImporter(logger log.Logger, config ruleImporterConfig, apiClient queryRangeAPI) *ruleImporter {
	level.Info(logger).Log("backfiller", "new rule importer", "start", config.start.Format(time.RFC822), "end", config.end.Format(time.RFC822))
	return &ruleImporter{
		logger:      logger,
		config:      config,
		apiClient:   apiClient,
		ruleManager: rules.NewManager(&rules.ManagerOptions{}),
	}
}

// loadGroups parses groups from a list of recording rule files.
func (importer *ruleImporter) loadGroups(ctx context.Context, filenames []string) (errs []error) {
	groups, errs := importer.ruleManager.LoadGroups(importer.config.evalInterval, labels.Labels{}, "", filenames...)
	if errs != nil {
		return errs
	}
	importer.groups = groups
	return nil
}

// importAll evaluates all the recording rules, creates the new time series,
// and writes them to disk in blocks.
func (importer *ruleImporter) importAll(ctx context.Context) (errs []error) {
	for name, group := range importer.groups {
		level.Info(importer.logger).Log("backfiller", "processing group", "name", name)

		for i, r := range group.Rules() {
			level.Info(importer.logger).Log("backfiller", "processing rule", "id", i, "name", r.Name())
			if err := importer.importRule(ctx, r.Query().String(), r.Name(), r.Labels(), importer.config.start, importer.config.end, int64(importer.config.maxBlockDuration/time.Millisecond), group); err != nil {
				errs = append(errs, err)
			}
		}
	}
	return errs
}

// importRule queries the Prometheus API to evaluate a single rule at points
// in the past and writes the results to disk in blocks.
func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr, ruleName string, ruleLabels labels.Labels, start, end time.Time,
	maxBlockDuration int64, grp *rules.Group) (err error) {
	blockDuration := getCompatibleBlockDuration(maxBlockDuration)
	startInMs := start.Unix() * int64(time.Second/time.Millisecond)
	endInMs := end.Unix() * int64(time.Second/time.Millisecond)

	// Walk the requested time range one block at a time, starting at the
	// block boundary that contains the start timestamp.
	for startOfBlock := blockDuration * (startInMs / blockDuration); startOfBlock <= endInMs; startOfBlock = startOfBlock + blockDuration {
		endOfBlock := startOfBlock + blockDuration - 1

		currStart := max(startOfBlock/int64(time.Second/time.Millisecond), start.Unix())
		startWithAlignment := grp.EvalTimestamp(time.Unix(currStart, 0).UTC().UnixNano())
		for startWithAlignment.Unix() < currStart {
			startWithAlignment = startWithAlignment.Add(grp.Interval())
		}
		end := time.Unix(min(endOfBlock/int64(time.Second/time.Millisecond), end.Unix()), 0).UTC()
		if end.Before(startWithAlignment) {
			break
		}
		val, warnings, err := importer.apiClient.QueryRange(ctx,
			ruleExpr,
			v1.Range{
				Start: startWithAlignment,
				End:   end,
				Step:  grp.Interval(),
			},
		)
		if err != nil {
			return errors.Wrap(err, "query range")
		}
		if warnings != nil {
			level.Warn(importer.logger).Log("msg", "Range query returned warnings.", "warnings", warnings)
		}

		// To prevent races with compaction, a block writer only allows appending samples
		// that are at most half a block size older than the most recent sample appended so far.
		// However, in the way we use the block writer here, compaction doesn't happen, while we
		// also need to append samples throughout the whole block range. To allow that, we
		// pretend that the block is twice as large here, but only really add samples in the
		// original interval later.
		w, err := tsdb.NewBlockWriter(log.NewNopLogger(), importer.config.outputDir, 2*blockDuration)
		if err != nil {
			return errors.Wrap(err, "new block writer")
		}
		var closed bool
		defer func() {
			if !closed {
				err = tsdb_errors.NewMulti(err, w.Close()).Err()
			}
		}()
		app := newMultipleAppender(ctx, w)
		var matrix model.Matrix
		switch val.Type() {
		case model.ValMatrix:
			matrix = val.(model.Matrix)

			for _, sample := range matrix {
				lb := labels.NewBuilder(labels.Labels{})

				for name, value := range sample.Metric {
					lb.Set(string(name), string(value))
				}

				// Set the rule labels after the query output labels,
				// so they can override the query output.
				for _, l := range ruleLabels {
					lb.Set(l.Name, l.Value)
				}

				lb.Set(labels.MetricName, ruleName)

				for _, value := range sample.Values {
					if err := app.add(ctx, lb.Labels(), timestamp.FromTime(value.Timestamp.Time()), float64(value.Value)); err != nil {
						return errors.Wrap(err, "add")
					}
				}
			}
		default:
			return fmt.Errorf("rule result is wrong type %s", val.Type().String())
		}

		if err := app.flushAndCommit(ctx); err != nil {
			return errors.Wrap(err, "flush and commit")
		}
		err = tsdb_errors.NewMulti(err, w.Close()).Err()
		closed = true
	}

	return err
}

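// newMultipleAppender returns a multipleAppender that writes to the given
// block writer, starting with a fresh appender.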
func newMultipleAppender(ctx context.Context, blockWriter *tsdb.BlockWriter) *multipleAppender {
	return &multipleAppender{
		maxSamplesInMemory: maxSamplesInMemory,
		writer:             blockWriter,
		appender:           blockWriter.Appender(ctx),
	}
}

// multipleAppender keeps track of how many samples have been added to the current appender.
// Once the maximum number of samples has been added, the pending samples are committed and
// a new appender is created.
type multipleAppender struct {
	maxSamplesInMemory int
	currentSampleCount int
	writer             *tsdb.BlockWriter
	appender           storage.Appender
}

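// add appends a single sample and commits the current batch once
// maxSamplesInMemory samples have accumulated.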
func (m *multipleAppender) add(ctx context.Context, l labels.Labels, t int64, v float64) error {
	if _, err := m.appender.Append(0, l, t, v); err != nil {
		return errors.Wrap(err, "multiappender append")
	}
	m.currentSampleCount++
	if m.currentSampleCount >= m.maxSamplesInMemory {
		return m.commit(ctx)
	}
	return nil
}

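// commit commits any pending samples and starts a new appender. It is a no-op
// when nothing has been added since the last commit.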
func (m *multipleAppender) commit(ctx context.Context) error {
	if m.currentSampleCount == 0 {
		return nil
	}
	if err := m.appender.Commit(); err != nil {
		return errors.Wrap(err, "multiappender commit")
	}
	m.appender = m.writer.Appender(ctx)
	m.currentSampleCount = 0
	return nil
}

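// flushAndCommit commits any remaining samples and flushes the block writer,
// persisting the block to disk.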
func (m *multipleAppender) flushAndCommit(ctx context.Context) error {
	if err := m.commit(ctx); err != nil {
		return err
	}
	if _, err := m.writer.Flush(ctx); err != nil {
		return errors.Wrap(err, "multiappender flush")
	}
	return nil
}

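// max returns the larger of x and y.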
func max(x, y int64) int64 {
	if x > y {
		return x
	}
	return y
}

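// min returns the smaller of x and y.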
func min(x, y int64) int64 {
	if x < y {
		return x
	}
	return y
}