add alignment, mv rule importer to promtool dir, add queryRange

Signed-off-by: jessicagreben <jessicagrebens@gmail.com>
pull/7675/head
jessicagreben 2020-09-13 08:07:59 -07:00
parent 7504b5ce7c
commit dfa510086b
4 changed files with 252 additions and 286 deletions

View File

@ -41,7 +41,6 @@ import (
"gopkg.in/alecthomas/kingpin.v2"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/importers"
"github.com/prometheus/prometheus/pkg/rulefmt"
)
@ -132,16 +131,16 @@ func main() {
backfillCmd := app.Command("backfill", "Backfill Prometheus data.")
backfillRuleCmd := backfillCmd.Command("rules", "Backfill Prometheus data for new rules.")
backfillRuleURL := backfillRuleCmd.Flag("url", "Prometheus API url.").Required().String()
backfillRuleEvalInterval := backfillRuleCmd.Flag("evaluation_interval", "How frequently to evaluate rules when backfilling.").
Default("-3h").Duration()
backfillRuleStart := backfillRuleCmd.Flag("start", "The time to start backfilling the new rule from. It is required. Start time should be RFC3339 or Unix timestamp.").
Required().Duration()
backfillRuleEnd := backfillRuleCmd.Flag("end", "If an end time is provided, the new rule backfilling will end at this time. The default will backfill to the 3 hrs ago. End time should be RFC3339 or Unix timestamp.").
Default("").Duration()
Required().String()
backfillRuleEnd := backfillRuleCmd.Flag("end", "If an end time is provided, all recording rules in the rule files provided will be backfilled to the end time. Default will backfill up to 3 hrs ago. End time should be RFC3339 or Unix timestamp.").
Default("-3h").String()
backfillRuleURL := backfillRuleCmd.Flag("url", "Prometheus API url with the data where the rule will be backfilled from.").Default("localhost:9090").String()
backfillRuleEvalInterval := backfillRuleCmd.Flag("evaluation_interval", "How frequently to evaluate rules when backfilling.").
Default("15s").Duration()
backfillRuleFiles := backfillRuleCmd.Arg(
"rule-files",
"The file containing the new rule that needs to be backfilled.",
"A list of one or more files containing recording rules to be backfilled. All recording rules listed in the files will be backfilled. Alerting rules are not evaluated.",
).Required().ExistingFiles()
parsedCmd := kingpin.MustParse(app.Parse(os.Args[1:]))
@ -767,24 +766,29 @@ func (j *jsonPrinter) printLabelValues(v model.LabelValues) {
json.NewEncoder(os.Stdout).Encode(v)
}
// BackfillRule backfills rules from the files provided
func BackfillRule(url string, start, end, evalInterval time.Duration, files ...string) int {
// BackfillRule backfills rules from the files provided.
func BackfillRule(url, start, end string, evalInterval time.Duration, files ...string) int {
ctx := context.Background()
cfg := importers.RuleConfig{
Start: start.String(),
End: end.String(),
stime, etime, err := parseStartTimeAndEndTime(start, end)
if err != nil {
fmt.Fprintln(os.Stderr, err)
return 1
}
cfg := RuleImporterConfig{
Start: stime,
End: etime,
EvalInterval: evalInterval,
URL: url,
}
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
ruleImporter := importers.NewRuleImporter(logger, cfg)
err := ruleImporter.Init()
ruleImporter := NewRuleImporter(logger, cfg)
err = ruleImporter.Init()
if err != nil {
fmt.Fprintln(os.Stderr, "rule importer init error", err)
return 1
}
errs := ruleImporter.Parse(ctx, files)
errs := ruleImporter.LoadGroups(ctx, files)
for _, err := range errs {
if err != nil {
fmt.Fprintln(os.Stderr, "rule importer parse error", err)

216
cmd/promtool/rules.go Normal file
View File

@ -0,0 +1,216 @@
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"fmt"
"io/ioutil"
"os"
"time"
"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/api"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/tsdb/importer/blocks"
)
const blockSize = 2 // in hours
// RuleImporter is the importer to backfill rules.
type RuleImporter struct {
logger log.Logger
config RuleImporterConfig
groups map[string]*rules.Group
groupLoader rules.GroupLoader
apiClient v1.API
writer *blocks.MultiWriter
}
// RuleImporterConfig is the config for the rule importer.
type RuleImporterConfig struct {
Start time.Time
End time.Time
EvalInterval time.Duration
URL string
}
// NewRuleImporter creates a new rule importer that can be used to backfill rules.
func NewRuleImporter(logger log.Logger, config RuleImporterConfig) *RuleImporter {
return &RuleImporter{
config: config,
groupLoader: rules.FileLoader{},
}
}
// Init initializes the rule importer which creates a new block writer
// and creates an Prometheus API client.
func (importer *RuleImporter) Init() error {
// todo: clean up dir
newBlockDir, err := ioutil.TempDir("", "rule_blocks")
if err != nil {
return err
}
importer.writer = blocks.NewMultiWriter(importer.logger, newBlockDir, importer.config.EvalInterval.Nanoseconds())
config := api.Config{
Address: importer.config.URL,
}
c, err := api.NewClient(config)
if err != nil {
return err
}
importer.apiClient = v1.NewAPI(c)
return nil
}
// Close cleans up any open resources.
func (importer *RuleImporter) Close() error {
// todo: clean up any dirs that were created
return importer.writer.Close()
}
// LoadGroups reads groups from a list of rule files.
func (importer *RuleImporter) LoadGroups(ctx context.Context, filenames []string) (errs []error) {
groups := make(map[string]*rules.Group)
for _, filename := range filenames {
rgs, errs := importer.groupLoader.Load(filename)
if errs != nil {
return errs
}
for _, ruleGroup := range rgs.Groups {
itv := importer.config.EvalInterval
if ruleGroup.Interval != 0 {
itv = time.Duration(ruleGroup.Interval)
}
rgRules := make([]rules.Rule, 0, len(ruleGroup.Rules))
for _, r := range ruleGroup.Rules {
expr, err := importer.groupLoader.Parse(r.Expr.Value)
if err != nil {
return []error{errors.Wrap(err, filename)}
}
rgRules = append(rgRules, rules.NewRecordingRule(
r.Record.Value,
expr,
labels.FromMap(r.Labels),
))
}
groups[rules.GroupKey(filename, ruleGroup.Name)] = rules.NewGroup(rules.GroupOptions{
Name: ruleGroup.Name,
File: filename,
Interval: itv,
Rules: rgRules,
})
}
}
importer.groups = groups
return nil
}
// ImportAll evaluates all the groups and rules and creates new time series
// and stores them in new blocks.
func (importer *RuleImporter) ImportAll(ctx context.Context) []error {
var errs = []error{}
for _, group := range importer.groups {
stimeWithAlignment := group.EvalTimestamp(importer.config.Start.UnixNano())
for _, r := range group.Rules() {
err := importer.ImportRule(ctx, r.Query().String(), stimeWithAlignment, group.Interval())
if err != nil {
errs = append(errs, err)
}
}
}
_, err := importer.writer.Flush()
if err != nil {
errs = append(errs, err)
}
return errs
}
// ImportRule imports the historical data for a single rule.
func (importer *RuleImporter) ImportRule(ctx context.Context, ruleExpr string, stimeWithAlignment time.Time, internval time.Duration) error {
ts := stimeWithAlignment
appender := importer.writer.Appender()
for ts.Before(importer.config.End) {
currentBlockEnd := ts.Add(blockSize * time.Hour)
if currentBlockEnd.After(importer.config.End) {
currentBlockEnd = importer.config.End
}
val, warnings, err := importer.apiClient.QueryRange(ctx,
ruleExpr,
v1.Range{
Start: ts,
End: currentBlockEnd,
Step: importer.config.EvalInterval,
},
)
if err != nil {
return err
}
if warnings != nil {
fmt.Fprint(os.Stderr, "warning api.QueryRange:", warnings)
}
var matrix model.Matrix
switch val.Type() {
case model.ValMatrix:
matrix = val.(model.Matrix)
for _, sample := range matrix {
currentLabels := make(labels.Labels, 0, len(sample.Metric))
for k, v := range sample.Metric {
currentLabels = append(currentLabels, labels.Label{
Name: string(k),
Value: string(v),
})
}
for _, value := range sample.Values {
_, err := appender.Add(currentLabels, value.Timestamp.Unix(), float64(value.Value))
if err != nil {
// todo: handle other errors, i.e. ErrOutOfOrderSample and ErrDuplicateSampleForTimestamp
return err
}
}
}
default:
return errors.New("rule result is wrong type")
}
ts = currentBlockEnd
}
_, err := importer.writer.Flush()
if err != nil {
return err
}
return appender.Commit()
}

View File

@ -1,256 +0,0 @@
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package importers
import (
"context"
"fmt"
"io/ioutil"
"math"
"net/url"
"os"
"sort"
"strconv"
"time"
"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/api"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
plabels "github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/rulefmt"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/tsdb/importer/blocks"
)
// RuleImporter is the importer for rules
type RuleImporter struct {
logger log.Logger
config RuleConfig
groups map[string]*rules.Group
apiClient v1.API
writer *blocks.MultiWriter
}
// RuleConfig is the config for the rule importer
type RuleConfig struct {
Start string
End string
EvalInterval time.Duration
URL string
}
// NewRuleImporter creates a new rule importer
func NewRuleImporter(logger log.Logger, config RuleConfig) *RuleImporter {
return &RuleImporter{
config: config,
}
}
// Init initializes the rule importer which creates a new block writer
// and creates an Prometheus API client
func (importer *RuleImporter) Init() error {
// create new block writer
newBlockDir, err := ioutil.TempDir("", "rule_blocks")
if err != nil {
return err
}
importer.writer = blocks.NewMultiWriter(importer.logger, newBlockDir, importer.config.EvalInterval.Nanoseconds())
// create api client
config := api.Config{
Address: importer.config.URL,
}
c, err := api.NewClient(config)
if err != nil {
return err
}
importer.apiClient = v1.NewAPI(c)
return nil
}
// Close cleans up any open resources
func (importer *RuleImporter) Close() error {
return importer.writer.Close()
}
// Parse parses the groups and rules from a list of rules files
func (importer *RuleImporter) Parse(ctx context.Context, files []string) (errs []error) {
groups := make(map[string]*rules.Group)
for _, file := range files {
ruleGroups, errs := rulefmt.ParseFile(file)
if errs != nil {
return errs
}
for _, ruleGroup := range ruleGroups.Groups {
itv := importer.config.EvalInterval
if ruleGroup.Interval != 0 {
itv = time.Duration(ruleGroup.Interval)
}
rulez := make([]rules.Rule, 0, len(ruleGroup.Rules))
for _, r := range ruleGroup.Rules {
expr, err := parser.ParseExpr(r.Expr.Value)
if err != nil {
return []error{errors.Wrap(err, file)}
}
rulez = append(rulez, rules.NewRecordingRule(
r.Record.Value,
expr,
labels.FromMap(r.Labels),
))
}
groups[file+";"+ruleGroup.Name] = rules.NewGroup(rules.GroupOptions{
Name: ruleGroup.Name,
File: file,
Interval: itv,
Rules: rulez,
})
}
}
importer.groups = groups
return errs
}
// ImportAll evaluates all the groups and rules and creates new time series
// and stores in new blocks
func (importer *RuleImporter) ImportAll(ctx context.Context) []error {
var errs = []error{}
for _, group := range importer.groups {
for _, rule := range group.Rules() {
err := importer.ImportRule(ctx, rule)
if err != nil {
errs = append(errs, err)
}
}
}
err := importer.CreateBlocks()
if err != nil {
errs = append(errs, err)
}
return errs
}
func (importer *RuleImporter) queryFn(ctx context.Context, q string, t time.Time) (promql.Vector, error) {
val, warnings, err := importer.apiClient.Query(ctx, q, t)
if err != nil {
return promql.Vector{}, err
}
if warnings != nil {
fmt.Fprint(os.Stderr, "warning api.Query:", warnings)
}
switch val.Type() {
case model.ValVector:
valVector := val.(model.Vector)
return modelToPromqlVector(valVector), nil
case model.ValScalar:
valScalar := val.(*model.Scalar)
return promql.Vector{promql.Sample{
Metric: labels.Labels{},
Point: promql.Point{T: int64(valScalar.Timestamp), V: float64(valScalar.Value)},
}}, nil
default:
return nil, errors.New("rule result is wrong type")
}
}
func modelToPromqlVector(modelValue model.Vector) promql.Vector {
result := make(promql.Vector, 0, len(modelValue))
for _, value := range modelValue {
labels := make(labels.Labels, 0, len(value.Metric))
for k, v := range value.Metric {
labels = append(labels, plabels.Label{
Name: string(k),
Value: string(v),
})
}
sort.Sort(labels)
result = append(result, promql.Sample{
Metric: labels,
Point: promql.Point{T: int64(value.Timestamp), V: float64(value.Value)},
})
}
return result
}
// ImportRule imports the historical data for a single rule
func (importer *RuleImporter) ImportRule(ctx context.Context, rule rules.Rule) error {
ts, err := parseTime(importer.config.Start)
if err != nil {
return err
}
end, err := parseTime(importer.config.End)
if err != nil {
return err
}
url, err := url.Parse(importer.config.URL)
if err != nil {
return err
}
appender := importer.writer.Appender()
for ts.Before(end) {
vector, err := rule.Eval(ctx, ts, importer.queryFn, url)
if err != nil {
return err
}
for _, sample := range vector {
// we don't AddFast here because we need to maintain the
// ref for each series bcs rule.Eval could return different labels,
// so that means you would need to map the ref to metric, but that is what Add does
// anyways so just use that
_, err := appender.Add(plabels.Labels{plabels.Label{Name: sample.String()}}, sample.T, sample.V)
if err != nil {
return err
}
}
ts.Add(importer.config.EvalInterval)
// todo: 2 hr blocks?
}
return appender.Commit()
}
func parseTime(s string) (time.Time, error) {
if t, err := strconv.ParseFloat(s, 64); err == nil {
s, ns := math.Modf(t)
return time.Unix(int64(s), int64(ns*float64(time.Second))).UTC(), nil
}
if t, err := time.Parse(time.RFC3339Nano, s); err == nil {
return t, nil
}
return time.Time{}, errors.Errorf("cannot parse %q to a valid timestamp", s)
}
// CreateBlocks creates blocks for all the rule data
func (importer *RuleImporter) CreateBlocks() error {
_, err := importer.writer.Flush()
return err
}

View File

@ -200,6 +200,8 @@ type Rule interface {
Eval(context.Context, time.Time, QueryFunc, *url.URL) (promql.Vector, error)
// String returns a human-readable string representation of the rule.
String() string
// Query returns the rule query expression.
Query() parser.Expr
// SetLastErr sets the current error experienced by the rule.
SetLastError(error)
// LastErr returns the last error experienced by the rule.
@ -262,7 +264,7 @@ func NewGroup(o GroupOptions) *Group {
metrics = NewGroupMetrics(o.Opts.Registerer)
}
key := groupKey(o.File, o.Name)
key := GroupKey(o.File, o.Name)
metrics.evalTotal.WithLabelValues(key)
metrics.evalFailures.WithLabelValues(key)
metrics.groupLastEvalTime.WithLabelValues(key)
@ -302,7 +304,7 @@ func (g *Group) run(ctx context.Context) {
defer close(g.terminated)
// Wait an initial amount to have consistently slotted intervals.
evalTimestamp := g.evalTimestamp().Add(g.interval)
evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.interval)
select {
case <-time.After(time.Until(evalTimestamp)):
case <-g.done:
@ -455,7 +457,7 @@ func (g *Group) GetEvaluationDuration() time.Duration {
// setEvaluationDuration sets the time in seconds the last evaluation took.
func (g *Group) setEvaluationDuration(dur time.Duration) {
g.metrics.groupLastDuration.WithLabelValues(groupKey(g.file, g.name)).Set(dur.Seconds())
g.metrics.groupLastDuration.WithLabelValues(GroupKey(g.file, g.name)).Set(dur.Seconds())
g.mtx.Lock()
defer g.mtx.Unlock()
@ -471,19 +473,19 @@ func (g *Group) GetEvaluationTimestamp() time.Time {
// setEvaluationTimestamp updates evaluationTimestamp to the timestamp of when the rule group was last evaluated.
func (g *Group) setEvaluationTimestamp(ts time.Time) {
g.metrics.groupLastEvalTime.WithLabelValues(groupKey(g.file, g.name)).Set(float64(ts.UnixNano()) / 1e9)
g.metrics.groupLastEvalTime.WithLabelValues(GroupKey(g.file, g.name)).Set(float64(ts.UnixNano()) / 1e9)
g.mtx.Lock()
defer g.mtx.Unlock()
g.evaluationTimestamp = ts
}
// evalTimestamp returns the immediately preceding consistently slotted evaluation time.
func (g *Group) evalTimestamp() time.Time {
// EvalTimestamp returns the immediately preceding consistently slotted evaluation time.
func (g *Group) EvalTimestamp(startTime int64) time.Time {
var (
offset = int64(g.hash() % uint64(g.interval))
now = time.Now().UnixNano()
adjNow = now - offset
start = startTime
adjNow = start - offset
base = adjNow - (adjNow % int64(g.interval))
)
@ -567,7 +569,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
rule.SetEvaluationTimestamp(t)
}(time.Now())
g.metrics.evalTotal.WithLabelValues(groupKey(g.File(), g.Name())).Inc()
g.metrics.evalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL)
if err != nil {
@ -576,7 +578,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
if _, ok := err.(promql.ErrQueryCanceled); !ok {
level.Warn(g.logger).Log("msg", "Evaluating rule failed", "rule", rule, "err", err)
}
g.metrics.evalFailures.WithLabelValues(groupKey(g.File(), g.Name())).Inc()
g.metrics.evalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
return
}
@ -929,7 +931,7 @@ func (m *Manager) Update(interval time.Duration, files []string, externalLabels
// check if new group equals with the old group, if yes then skip it.
// If not equals, stop it and wait for it to finish the current iteration.
// Then copy it into the new group.
gn := groupKey(newg.file, newg.name)
gn := GroupKey(newg.file, newg.name)
oldg, ok := m.groups[gn]
delete(m.groups, gn)
@ -1042,7 +1044,7 @@ func (m *Manager) LoadGroups(
))
}
groups[groupKey(fn, rg.Name)] = NewGroup(GroupOptions{
groups[GroupKey(fn, rg.Name)] = NewGroup(GroupOptions{
Name: rg.Name,
File: fn,
Interval: itv,
@ -1057,8 +1059,8 @@ func (m *Manager) LoadGroups(
return groups, nil
}
// Group names need not be unique across filenames.
func groupKey(file, name string) string {
// GroupKey group names need not be unique across filenames.
func GroupKey(file, name string) string {
return file + ";" + name
}