add name and labels to metric, eval all rules for each block

Signed-off-by: jessicagreben <jessicagrebens@gmail.com>
pull/7675/head
jessicagreben 4 years ago
parent 75654715d3
commit 19dee0a569

@@ -20,12 +20,14 @@ import (
 	"time"

 	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/api"
 	v1 "github.com/prometheus/client_golang/api/prometheus/v1"
 	"github.com/prometheus/common/model"

 	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/prometheus/rules"
+	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/tsdb"
 )
@@ -54,6 +56,7 @@ type ruleImporterConfig struct {
 // newRuleImporter creates a new rule importer that can be used to backfill rules.
 func newRuleImporter(logger log.Logger, config ruleImporterConfig) *ruleImporter {
 	return &ruleImporter{
+		logger:      logger,
 		config:      config,
 		groupLoader: rules.FileLoader{},
 	}
@@ -134,77 +137,92 @@ func (importer *ruleImporter) loadGroups(ctx context.Context, filenames []string
 // and stores them in new blocks.
 func (importer *ruleImporter) importAll(ctx context.Context) []error {
 	var errs = []error{}
-	for _, group := range importer.groups {
-		stimeWithAlignment := group.EvalTimestamp(importer.config.Start.UnixNano())
-		for _, r := range group.Rules() {
-			err := importer.importRule(ctx, r.Query().String(), stimeWithAlignment, group.Interval())
-			if err != nil {
-				errs = append(errs, err)
-			}
-		}
-	}
-	_, err := importer.writer.Flush(ctx)
-	if err != nil {
-		errs = append(errs, err)
-	}
+	var currentBlockEnd time.Time
+	var appender storage.Appender
+	for name, group := range importer.groups {
+		level.Info(importer.logger).Log("backfiller", fmt.Sprintf("processing group, name: %s", name))
+		stimeWithAlignment := group.EvalTimestamp(importer.config.Start.UnixNano())
+		ts := stimeWithAlignment
+		// a 2-hr block that contains all the data for each rule
+		for ts.Before(importer.config.End) {
+			currentBlockEnd = ts.Add(time.Duration(tsdb.DefaultBlockDuration) * time.Millisecond)
+			if currentBlockEnd.After(importer.config.End) {
+				currentBlockEnd = importer.config.End
+			}
+			// should we be creating a new appender for each block?
+			appender = importer.writer.Appender(ctx)
+			for i, r := range group.Rules() {
+				level.Info(importer.logger).Log("backfiller", fmt.Sprintf("processing rule %d, name: %s", i+1, r.Name()))
+				err := importer.importRule(ctx, r.Query().String(), r.Name(), r.Labels(), ts, currentBlockEnd, appender)
+				if err != nil {
+					errs = append(errs, err)
+				}
+			}
+			ts = currentBlockEnd
+			_, err := importer.writer.Flush(ctx)
+			if err != nil {
+				errs = append(errs, err)
+			}
+			err = appender.Commit()
+			if err != nil {
+				errs = append(errs, err)
+			}
+		}
+	}
 	return errs
 }

 // importRule imports the historical data for a single rule.
-func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr string, stimeWithAlignment time.Time, internval time.Duration) error {
-	ts := stimeWithAlignment
-	appender := importer.writer.Appender(ctx)
-
-	for ts.Before(importer.config.End) {
-		currentBlockEnd := ts.Add(time.Duration(tsdb.DefaultBlockDuration) * time.Millisecond)
-		if currentBlockEnd.After(importer.config.End) {
-			currentBlockEnd = importer.config.End
-		}
-
-		val, warnings, err := importer.apiClient.QueryRange(ctx,
-			ruleExpr,
-			v1.Range{
-				Start: ts,
-				End:   currentBlockEnd,
-				Step:  importer.config.EvalInterval,
-			},
-		)
-		if err != nil {
-			return err
-		}
-		if warnings != nil {
-			fmt.Fprint(os.Stderr, "warning api.QueryRange:", warnings)
-		}
-
-		var matrix model.Matrix
-		switch val.Type() {
-		case model.ValMatrix:
-			matrix = val.(model.Matrix)
-			for _, sample := range matrix {
-				currentLabels := make(labels.Labels, 0, len(sample.Metric))
-				for k, v := range sample.Metric {
-					currentLabels = append(currentLabels, labels.Label{
-						Name:  string(k),
-						Value: string(v),
-					})
-				}
-				for _, value := range sample.Values {
-					_, err := appender.Add(currentLabels, value.Timestamp.Unix(), float64(value.Value))
-					if err != nil {
-						return err
-					}
-				}
-			}
-		default:
-			return errors.New("rule result is wrong type")
-		}
-
-		ts = currentBlockEnd
-	}
-	_, err := importer.writer.Flush(ctx)
-	if err != nil {
-		return err
-	}
-	return appender.Commit()
+func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr, ruleName string, ruleLabels labels.Labels, start, end time.Time, appender storage.Appender) error {
+	val, warnings, err := importer.apiClient.QueryRange(ctx,
+		ruleExpr,
+		v1.Range{
+			Start: start,
+			End:   end,
+			Step:  importer.config.EvalInterval,
+		},
+	)
+	if err != nil {
+		return err
+	}
+	if warnings != nil {
+		fmt.Fprint(os.Stderr, "warning api.QueryRange:", warnings)
+	}
+
+	var matrix model.Matrix
+	switch val.Type() {
+	case model.ValMatrix:
+		matrix = val.(model.Matrix)
+		for _, sample := range matrix {
+			currentLabels := make(labels.Labels, 0, len(sample.Metric))
+			currentLabels = append(currentLabels, labels.Label{
+				Name:  labels.MetricName,
+				Value: ruleName,
+			})
+			for _, ruleLabel := range ruleLabels {
+				currentLabels = append(currentLabels, ruleLabel)
+			}
+			for k, v := range sample.Metric {
+				currentLabels = append(currentLabels, labels.Label{
+					Name:  string(k),
+					Value: string(v),
+				})
+			}
+			for _, value := range sample.Values {
+				_, err := appender.Add(currentLabels, value.Timestamp.Unix(), float64(value.Value))
+				if err != nil {
+					return err
+				}
+			}
+		}
+	default:
+		return errors.New("rule result is wrong type")
+	}
+	return nil
 }
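For reference, the label construction this commit adds to importRule can be read in isolation. The sketch below is not code from this commit; buildSeriesLabels and its example inputs are invented for illustration. It only mirrors the new logic: the backfilled series is named after the rule via __name__, then the rule's own labels are appended, then the labels of the query result.

package main

import (
	"fmt"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/pkg/labels"
)

// buildSeriesLabels (hypothetical helper, not part of this commit) mirrors
// importRule: rule name as __name__, then rule labels, then result labels.
func buildSeriesLabels(ruleName string, ruleLabels labels.Labels, metric model.Metric) labels.Labels {
	currentLabels := labels.Labels{{Name: labels.MetricName, Value: ruleName}}
	currentLabels = append(currentLabels, ruleLabels...)
	// Labels returned by the range query come last; Go map iteration order
	// is not deterministic, so the printed order below may vary.
	for k, v := range metric {
		currentLabels = append(currentLabels, labels.Label{Name: string(k), Value: string(v)})
	}
	return currentLabels
}

func main() {
	fmt.Println(buildSeriesLabels(
		"job:http_requests:rate5m",
		labels.Labels{{Name: "team", Value: "infra"}},
		model.Metric{"job": "api", "instance": "10.0.0.1:9090"},
	))
}

As in importRule itself, the query result's labels are appended after the rule labels without any deduplication of label names.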

@@ -84,10 +84,6 @@ func (w *BlockWriter) Appender(ctx context.Context) storage.Appender {
 // Flush implements the Writer interface. This is where actual block writing
 // happens. After flush completes, no writes can be done.
 func (w *BlockWriter) Flush(ctx context.Context) (ulid.ULID, error) {
-	if w.head.NumSeries() == 0 {
-		return ulid.ULID{}, errors.New("no series appended, aborting")
-	}
 	mint := w.head.MinTime()
 	// Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime).
 	// Because of this block intervals are always +1 than the total samples it includes.
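The "+1 millisecond" comment in the context above is easiest to see with concrete numbers. A minimal standalone sketch with illustrative values; the only fact taken from Prometheus is that tsdb.DefaultBlockDuration is 2h expressed in milliseconds:

package main

import "fmt"

func main() {
	// tsdb.DefaultBlockDuration: 2h in milliseconds.
	const blockDuration = int64(2 * 60 * 60 * 1000)

	mint := int64(0)                       // timestamp of the first sample, in ms
	lastSample := mint + blockDuration - 1 // last sample still inside this block

	// Block intervals are half-open, [mint, maxt), so maxt must be one
	// millisecond past the last sample for the block to contain it.
	maxt := lastSample + 1

	fmt.Printf("block [%d, %d) covers samples %d through %d\n", mint, maxt, mint, lastSample)
}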
