flush samples to disk every 5k samples

Signed-off-by: jessicagreben <jessicagrebens@gmail.com>
pull/7675/head
jessicagreben 2020-11-26 08:30:06 -08:00
parent 5dd3577424
commit ee85c22adb
2 changed files with 96 additions and 114 deletions

View File

@ -44,6 +44,7 @@ import (
"github.com/prometheus/prometheus/discovery/file"
"github.com/prometheus/prometheus/discovery/kubernetes"
"github.com/prometheus/prometheus/pkg/rulefmt"
"github.com/prometheus/prometheus/tsdb"
_ "github.com/prometheus/prometheus/discovery/install" // Register service discovery implementations.
)
@ -140,7 +141,7 @@ func main() {
createBlocksFromRulesStart := createBlocksFromRulesCmd.Flag("start", "The time to start backfilling the new rule from. It is required. Start time should be RFC3339 or Unix timestamp.").
Required().String()
createBlocksFromRulesEnd := createBlocksFromRulesCmd.Flag("end", "If an end time is provided, all recording rules in the rule files provided will be backfilled to the end time. Default will backfill up to 3 hrs ago. End time should be RFC3339 or Unix timestamp.").String()
createBlocksFromRulesOutputDir := createBlocksFromRulesCmd.Flag("output dir", "The filepath on the local filesystem to write the output to. Output will be blocks containing the data of the backfilled recording rules.").Default("backfilldata/").String()
createBlocksFromRulesOutputDir := createBlocksFromRulesCmd.Flag("output dir", "The filepath on the local filesystem to write the output to. Output will be blocks containing the data of the backfilled recording rules. Don't use an active Prometheus data directory. If command is run many times with same start/end time, it will create duplicate series.").Default("backfilldata/").String()
createBlocksFromRulesURL := createBlocksFromRulesCmd.Flag("url", "Prometheus API url with the data where the rule will be backfilled from.").Default("http://localhost:9090").String()
createBlocksFromRulesEvalInterval := createBlocksFromRulesCmd.Flag("evaluation-interval-default", "How frequently to evaluate rules when backfilling if a value is not set in the rules file.").
Default("60s").Duration()
@ -206,7 +207,7 @@ func main() {
os.Exit(checkErr(dumpSamples(*dumpPath, *dumpMinTime, *dumpMaxTime)))
case createBlocksFromRulesCmd.FullCommand():
os.Exit(checkErr(BackfillRule(*createBlocksFromRulesURL, *createBlocksFromRulesStart, *createBlocksFromRulesEnd, *createBlocksFromRulesOutputDir, *createBlocksFromRulesEvalInterval, *createBlocksFromRulesFiles...)))
os.Exit(checkErr(CreateBlocksFromRules(*createBlocksFromRulesURL, *createBlocksFromRulesStart, *createBlocksFromRulesEnd, *createBlocksFromRulesOutputDir, *createBlocksFromRulesEvalInterval, *createBlocksFromRulesFiles...)))
}
}
@ -782,9 +783,9 @@ func (j *jsonPrinter) printLabelValues(v model.LabelValues) {
json.NewEncoder(os.Stdout).Encode(v)
}
// BackfillRule backfills recording rules from the files provided. The output are blocks of data
// CreateBlocksFromRules backfills recording rules from the files provided. The output are blocks of data
// at the outputDir location.
func BackfillRule(url, start, end, outputDir string, evalInterval time.Duration, files ...string) error {
func CreateBlocksFromRules(url, start, end, outputDir string, evalInterval time.Duration, files ...string) error {
ctx := context.Background()
var stime, etime time.Time
var err error
@ -809,19 +810,29 @@ func BackfillRule(url, start, end, outputDir string, evalInterval time.Duration,
return nil
}
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
writer, err := tsdb.NewBlockWriter(logger,
outputDir,
tsdb.DefaultBlockDuration,
)
if err != nil {
fmt.Fprintln(os.Stderr, "new writer error", err)
return err
}
cfg := ruleImporterConfig{
Start: stime,
End: etime,
OutputDir: outputDir,
EvalInterval: evalInterval,
URL: url,
}
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
ruleImporter := newRuleImporter(logger, cfg)
if err = ruleImporter.init(); err != nil {
fmt.Fprintln(os.Stderr, "rule importer init error", err)
c, err := api.NewClient(api.Config{
Address: url,
})
if err != nil {
fmt.Fprintln(os.Stderr, "new api client error", err)
return err
}
ruleImporter := newRuleImporter(logger, cfg, v1.NewAPI(c), newMultipleAppender(ctx, maxSamplesInMemory, writer))
errs := ruleImporter.loadGroups(ctx, files)
for _, err := range errs {

View File

@ -22,7 +22,6 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/api"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
@ -31,160 +30,88 @@ import (
"github.com/prometheus/prometheus/tsdb"
)
const maxSamplesInMemory = 5000
type queryRangeAPI interface {
QueryRange(ctx context.Context, query string, r v1.Range) (model.Value, v1.Warnings, error)
}
// ruleImporter is the importer to backfill rules.
type ruleImporter struct {
logger log.Logger
config ruleImporterConfig
apiClient queryRangeAPI
appender *multipleAppender
groups map[string]*rules.Group
groupLoader rules.GroupLoader
apiClient v1.API
writer *tsdb.BlockWriter
ruleManager *rules.Manager
}
// ruleImporterConfig is the config for the rule importer.
type ruleImporterConfig struct {
Start time.Time
End time.Time
OutputDir string
EvalInterval time.Duration
URL string
}
// newRuleImporter creates a new rule importer that can be used to backfill rules.
func newRuleImporter(logger log.Logger, config ruleImporterConfig) *ruleImporter {
func newRuleImporter(logger log.Logger, config ruleImporterConfig, apiClient queryRangeAPI, appender *multipleAppender) *ruleImporter {
return &ruleImporter{
logger: logger,
config: config,
groupLoader: rules.FileLoader{},
apiClient: apiClient,
appender: appender,
ruleManager: rules.NewManager(&rules.ManagerOptions{}),
}
}
// init initializes the rule importer which creates a new block writer
// and creates an Prometheus API client.
func (importer *ruleImporter) init() error {
w, err := tsdb.NewBlockWriter(importer.logger,
importer.config.OutputDir,
tsdb.DefaultBlockDuration,
)
if err != nil {
return err
}
importer.writer = w
config := api.Config{
Address: importer.config.URL,
}
c, err := api.NewClient(config)
if err != nil {
return err
}
importer.apiClient = v1.NewAPI(c)
return nil
}
// close cleans up any open resources.
func (importer *ruleImporter) close() error {
return importer.writer.Close()
}
// loadGroups reads groups from a list of rule files.
// loadGroups parses groups from a list of rule files.
func (importer *ruleImporter) loadGroups(ctx context.Context, filenames []string) (errs []error) {
groups := make(map[string]*rules.Group)
for _, filename := range filenames {
rgs, errs := importer.groupLoader.Load(filename)
if errs != nil {
return errs
}
for _, ruleGroup := range rgs.Groups {
itv := importer.config.EvalInterval
if ruleGroup.Interval != 0 {
itv = time.Duration(ruleGroup.Interval)
}
rgRules := make([]rules.Rule, 0, len(ruleGroup.Rules))
for _, r := range ruleGroup.Rules {
expr, err := importer.groupLoader.Parse(r.Expr.Value)
if err != nil {
return []error{errors.Wrap(err, filename)}
}
rgRules = append(rgRules, rules.NewRecordingRule(
r.Record.Value,
expr,
labels.FromMap(r.Labels),
))
}
groups[rules.GroupKey(filename, ruleGroup.Name)] = rules.NewGroup(rules.GroupOptions{
Name: ruleGroup.Name,
File: filename,
Interval: itv,
Rules: rgRules,
Opts: &rules.ManagerOptions{},
})
}
groups, errs := importer.ruleManager.LoadGroups(importer.config.EvalInterval, labels.Labels{}, filenames...)
if errs != nil {
return errs
}
importer.groups = groups
return nil
}
// importAll evaluates all the groups and rules and creates new time series
// and stores them in new blocks.
func (importer *ruleImporter) importAll(ctx context.Context) []error {
var errs = []error{}
var currentBlockEnd time.Time
var appender storage.Appender
func (importer *ruleImporter) importAll(ctx context.Context) (errs []error) {
for name, group := range importer.groups {
level.Info(importer.logger).Log("backfiller", fmt.Sprintf("processing group, name: %s", name))
stimeWithAlignment := group.EvalTimestamp(importer.config.Start.UnixNano())
ts := stimeWithAlignment
// a 2-hr block that contains all the data for each rule
for ts.Before(importer.config.End) {
currentBlockEnd = ts.Add(time.Duration(tsdb.DefaultBlockDuration) * time.Millisecond)
for stimeWithAlignment.Before(importer.config.End) {
currentBlockEnd := stimeWithAlignment.Add(time.Duration(tsdb.DefaultBlockDuration) * time.Millisecond)
if currentBlockEnd.After(importer.config.End) {
currentBlockEnd = importer.config.End
}
// should we be creating a new appender for each block?
appender = importer.writer.Appender(ctx)
for i, r := range group.Rules() {
level.Info(importer.logger).Log("backfiller", fmt.Sprintf("processing rule %d, name: %s", i+1, r.Name()))
err := importer.importRule(ctx, r.Query().String(), r.Name(), r.Labels(), ts, currentBlockEnd, appender)
if err != nil {
if err := importer.importRule(ctx, r.Query().String(), r.Name(), r.Labels(), stimeWithAlignment, currentBlockEnd); err != nil {
errs = append(errs, err)
}
}
ts = currentBlockEnd
_, err := importer.writer.Flush(ctx)
if err != nil {
errs = append(errs, err)
}
err = appender.Commit()
if err != nil {
errs = append(errs, err)
}
stimeWithAlignment = currentBlockEnd
}
}
return errs
}
// importRule imports the historical data for a single rule.
func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr, ruleName string, ruleLabels labels.Labels, start, end time.Time, appender storage.Appender) error {
func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr, ruleName string, ruleLabels labels.Labels, start, end time.Time) error {
val, warnings, err := importer.apiClient.QueryRange(ctx,
ruleExpr,
v1.Range{
Start: start,
End: end,
Step: importer.config.EvalInterval,
Step: importer.config.EvalInterval, // todo: did we check if the rule has an interval?
},
)
if err != nil {
@ -200,7 +127,7 @@ func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr, ruleName
matrix = val.(model.Matrix)
for _, sample := range matrix {
currentLabels := make(labels.Labels, 0, len(sample.Metric))
currentLabels := make(labels.Labels, 0, len(sample.Metric)+len(ruleLabels)+1)
currentLabels = append(currentLabels, labels.Label{
Name: labels.MetricName,
Value: ruleName,
@ -215,14 +142,58 @@ func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr, ruleName
})
}
for _, value := range sample.Values {
_, err := appender.Add(currentLabels, value.Timestamp.Unix(), float64(value.Value))
if err != nil {
if err := importer.appender.add(ctx, currentLabels, value.Timestamp.Unix(), float64(value.Value)); err != nil {
return err
}
}
}
default:
return errors.New("rule result is wrong type")
return errors.New(fmt.Sprintf("rule result is wrong type %s", val.Type().String()))
}
return nil
}
// multipleAppender keeps track of how many series have been added to the current appender.
// If the max samples have been added, then all series are flushed to disk and commited and a new
// appender is created.
type multipleAppender struct {
maxSamplesInMemory int
currentSampleCount int
writer *tsdb.BlockWriter
appender storage.Appender
}
func newMultipleAppender(ctx context.Context, maxSamplesInMemory int, blockWriter *tsdb.BlockWriter) *multipleAppender {
return &multipleAppender{
maxSamplesInMemory: maxSamplesInMemory,
writer: blockWriter,
appender: blockWriter.Appender(ctx),
}
}
func (m *multipleAppender) add(ctx context.Context, l labels.Labels, t int64, v float64) error {
if _, err := m.appender.Add(l, t, v); err != nil {
return err
}
m.currentSampleCount++
if m.currentSampleCount > m.maxSamplesInMemory {
return m.flushAndCommit(ctx)
}
return nil
}
func (m *multipleAppender) flushAndCommit(ctx context.Context) error {
if _, err := m.writer.Flush(ctx); err != nil {
return err
}
if err := m.appender.Commit(); err != nil {
return err
}
m.appender = m.writer.Appender(ctx)
m.currentSampleCount = 0
return nil
}
func (m *multipleAppender) close() error {
return m.writer.Close()
}