From 6980bcf6716426f8a5057bf0cb88f9a3bc443920 Mon Sep 17 00:00:00 2001 From: jessicagreben Date: Sat, 31 Oct 2020 06:40:24 -0700 Subject: [PATCH] unexport backfiller Signed-off-by: jessicagreben --- cmd/promtool/main.go | 10 ++++---- cmd/promtool/rules.go | 55 +++++++++++++++++++++++-------------------- rules/manager.go | 37 ++++++----------------------- 3 files changed, 41 insertions(+), 61 deletions(-) diff --git a/cmd/promtool/main.go b/cmd/promtool/main.go index 74804fdf6..d594ba1f2 100644 --- a/cmd/promtool/main.go +++ b/cmd/promtool/main.go @@ -792,7 +792,7 @@ func BackfillRule(url, start, end, outputDir string, evalInterval time.Duration, fmt.Fprintln(os.Stderr, err) return err } - cfg := RuleImporterConfig{ + cfg := ruleImporterConfig{ Start: stime, End: etime, OutputDir: outputDir, @@ -800,13 +800,13 @@ func BackfillRule(url, start, end, outputDir string, evalInterval time.Duration, URL: url, } logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) - ruleImporter := NewRuleImporter(logger, cfg) - if err = ruleImporter.Init(); err != nil { + ruleImporter := newRuleImporter(logger, cfg) + if err = ruleImporter.init(); err != nil { fmt.Fprintln(os.Stderr, "rule importer init error", err) return err } - errs := ruleImporter.LoadGroups(ctx, files) + errs := ruleImporter.loadGroups(ctx, files) for _, err := range errs { if err != nil { fmt.Fprintln(os.Stderr, "rule importer parse error", err) @@ -814,7 +814,7 @@ func BackfillRule(url, start, end, outputDir string, evalInterval time.Duration, } } - errs = ruleImporter.ImportAll(ctx) + errs = ruleImporter.importAll(ctx) for _, err := range errs { if err != nil { fmt.Fprintln(os.Stderr, "rule importer error", err) diff --git a/cmd/promtool/rules.go b/cmd/promtool/rules.go index cb0586804..85425cc0c 100644 --- a/cmd/promtool/rules.go +++ b/cmd/promtool/rules.go @@ -31,21 +31,21 @@ import ( const blockSize = 2 // in hours -// RuleImporter is the importer to backfill rules. -type RuleImporter struct { +// ruleImporter is the importer to backfill rules. +type ruleImporter struct { logger log.Logger - config RuleImporterConfig + config ruleImporterConfig groups map[string]*rules.Group groupLoader rules.GroupLoader apiClient v1.API - writer *blocks.MultiWriter + writer *tsdb.BlockWriter } -// RuleImporterConfig is the config for the rule importer. -type RuleImporterConfig struct { +// ruleImporterConfig is the config for the rule importer. +type ruleImporterConfig struct { Start time.Time End time.Time OutputDir string @@ -53,21 +53,25 @@ type RuleImporterConfig struct { URL string } -// NewRuleImporter creates a new rule importer that can be used to backfill rules. -func NewRuleImporter(logger log.Logger, config RuleImporterConfig) *RuleImporter { - return &RuleImporter{ +// newRuleImporter creates a new rule importer that can be used to backfill rules. +func newRuleImporter(logger log.Logger, config ruleImporterConfig) *ruleImporter { + return &ruleImporter{ config: config, groupLoader: rules.FileLoader{}, } } -// Init initializes the rule importer which creates a new block writer +// init initializes the rule importer which creates a new block writer // and creates an Prometheus API client. -func (importer *RuleImporter) Init() error { - importer.writer = tsdb.NewBlockWriter(importer.logger, +func (importer *ruleImporter) init() error { + w, err := tsdb.NewBlockWriter(importer.logger, importer.config.OutputDir, - (blockSize * time.Hour).Milliseconds() + (blockSize * time.Hour).Milliseconds(), ) + if err != nil { + return err + } + importer.writer = w config := api.Config{ Address: importer.config.URL, @@ -80,13 +84,13 @@ func (importer *RuleImporter) Init() error { return nil } -// Close cleans up any open resources. -func (importer *RuleImporter) Close() error { +// close cleans up any open resources. +func (importer *ruleImporter) close() error { return importer.writer.Close() } -// LoadGroups reads groups from a list of rule files. -func (importer *RuleImporter) LoadGroups(ctx context.Context, filenames []string) (errs []error) { +// loadGroups reads groups from a list of rule files. +func (importer *ruleImporter) loadGroups(ctx context.Context, filenames []string) (errs []error) { groups := make(map[string]*rules.Group) for _, filename := range filenames { @@ -127,31 +131,31 @@ func (importer *RuleImporter) LoadGroups(ctx context.Context, filenames []string return nil } -// ImportAll evaluates all the groups and rules and creates new time series +// importAll evaluates all the groups and rules and creates new time series // and stores them in new blocks. -func (importer *RuleImporter) ImportAll(ctx context.Context) []error { +func (importer *ruleImporter) importAll(ctx context.Context) []error { var errs = []error{} for _, group := range importer.groups { stimeWithAlignment := group.EvalTimestamp(importer.config.Start.UnixNano()) for _, r := range group.Rules() { - err := importer.ImportRule(ctx, r.Query().String(), stimeWithAlignment, group.Interval()) + err := importer.importRule(ctx, r.Query().String(), stimeWithAlignment, group.Interval()) if err != nil { errs = append(errs, err) } } } - _, err := importer.writer.Flush() + _, err := importer.writer.Flush(ctx) if err != nil { errs = append(errs, err) } return errs } -// ImportRule imports the historical data for a single rule. -func (importer *RuleImporter) ImportRule(ctx context.Context, ruleExpr string, stimeWithAlignment time.Time, internval time.Duration) error { +// importRule imports the historical data for a single rule. +func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr string, stimeWithAlignment time.Time, internval time.Duration) error { ts := stimeWithAlignment - appender := importer.writer.Appender() + appender := importer.writer.Appender(ctx) for ts.Before(importer.config.End) { currentBlockEnd := ts.Add(blockSize * time.Hour) @@ -189,7 +193,6 @@ func (importer *RuleImporter) ImportRule(ctx context.Context, ruleExpr string, s for _, value := range sample.Values { _, err := appender.Add(currentLabels, value.Timestamp.Unix(), float64(value.Value)) if err != nil { - // todo: handle other errors, i.e. ErrOutOfOrderSample and ErrDuplicateSampleForTimestamp return err } } @@ -200,7 +203,7 @@ func (importer *RuleImporter) ImportRule(ctx context.Context, ruleExpr string, s ts = currentBlockEnd } - _, err := importer.writer.Flush() + _, err := importer.writer.Flush(ctx) if err != nil { return err } diff --git a/rules/manager.go b/rules/manager.go index 63a7ae72b..40cfc0659 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -280,13 +280,9 @@ func NewGroup(o GroupOptions) *Group { metrics = NewGroupMetrics(o.Opts.Registerer) } -<<<<<<< HEAD key := GroupKey(o.File, o.Name) -======= - key := groupKey(o.File, o.Name) metrics.iterationsMissed.WithLabelValues(key) metrics.iterationsScheduled.WithLabelValues(key) ->>>>>>> master metrics.evalTotal.WithLabelValues(key) metrics.evalFailures.WithLabelValues(key) metrics.groupLastEvalTime.WithLabelValues(key) @@ -342,7 +338,7 @@ func (g *Group) run(ctx context.Context) { }) iter := func() { - g.metrics.iterationsScheduled.WithLabelValues(groupKey(g.file, g.name)).Inc() + g.metrics.iterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Inc() start := time.Now() g.Eval(ctx, evalTimestamp) @@ -394,8 +390,8 @@ func (g *Group) run(ctx context.Context) { case <-tick.C: missed := (time.Since(evalTimestamp) / g.interval) - 1 if missed > 0 { - g.metrics.iterationsMissed.WithLabelValues(groupKey(g.file, g.name)).Add(float64(missed)) - g.metrics.iterationsScheduled.WithLabelValues(groupKey(g.file, g.name)).Add(float64(missed)) + g.metrics.iterationsMissed.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed)) + g.metrics.iterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed)) } evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval) iter() @@ -416,8 +412,8 @@ func (g *Group) run(ctx context.Context) { case <-tick.C: missed := (time.Since(evalTimestamp) / g.interval) - 1 if missed > 0 { - g.metrics.iterationsMissed.WithLabelValues(groupKey(g.file, g.name)).Add(float64(missed)) - g.metrics.iterationsScheduled.WithLabelValues(groupKey(g.file, g.name)).Add(float64(missed)) + g.metrics.iterationsMissed.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed)) + g.metrics.iterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed)) } evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval) iter() @@ -478,15 +474,9 @@ func (g *Group) GetEvaluationTime() time.Duration { return g.evaluationTime } -<<<<<<< HEAD // setEvaluationDuration sets the time in seconds the last evaluation took. func (g *Group) setEvaluationDuration(dur time.Duration) { g.metrics.groupLastDuration.WithLabelValues(GroupKey(g.file, g.name)).Set(dur.Seconds()) -======= -// setEvaluationTime sets the time in seconds the last evaluation took. -func (g *Group) setEvaluationTime(dur time.Duration) { - g.metrics.groupLastDuration.WithLabelValues(groupKey(g.file, g.name)).Set(dur.Seconds()) ->>>>>>> master g.mtx.Lock() defer g.mtx.Unlock() @@ -500,15 +490,9 @@ func (g *Group) GetLastEvaluation() time.Time { return g.lastEvaluation } -<<<<<<< HEAD // setEvaluationTimestamp updates evaluationTimestamp to the timestamp of when the rule group was last evaluated. func (g *Group) setEvaluationTimestamp(ts time.Time) { g.metrics.groupLastEvalTime.WithLabelValues(GroupKey(g.file, g.name)).Set(float64(ts.UnixNano()) / 1e9) -======= -// setLastEvaluation updates lastEvaluation to the timestamp of when the rule group was last evaluated. -func (g *Group) setLastEvaluation(ts time.Time) { - g.metrics.groupLastEvalTime.WithLabelValues(groupKey(g.file, g.name)).Set(float64(ts.UnixNano()) / 1e9) ->>>>>>> master g.mtx.Lock() defer g.mtx.Unlock() @@ -519,8 +503,7 @@ func (g *Group) setLastEvaluation(ts time.Time) { func (g *Group) EvalTimestamp(startTime int64) time.Time { var ( offset = int64(g.hash() % uint64(g.interval)) - start = startTime - adjNow = start - offset + adjNow = startTime - offset base = adjNow - (adjNow % int64(g.interval)) ) @@ -615,13 +598,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { if _, ok := err.(promql.ErrQueryCanceled); !ok { level.Warn(g.logger).Log("msg", "Evaluating rule failed", "rule", rule, "err", err) } -<<<<<<< HEAD g.metrics.evalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc() -======= - sp.SetTag("error", true) - sp.LogKV("error", err) - g.metrics.evalFailures.WithLabelValues(groupKey(g.File(), g.Name())).Inc() ->>>>>>> master return } samplesTotal += float64(len(vector)) @@ -683,7 +660,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { }(i, rule) } if g.metrics != nil { - g.metrics.groupSamples.WithLabelValues(groupKey(g.File(), g.Name())).Set(samplesTotal) + g.metrics.groupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal) } g.cleanupStaleSeries(ctx, ts) }