From 53480c168dcad749dabffd9fd192ce34dc329ed1 Mon Sep 17 00:00:00 2001 From: Julien Pivotto Date: Wed, 9 Dec 2020 22:58:23 +0100 Subject: [PATCH] Backfill: print created blocks only, add human-readable option Signed-off-by: Julien Pivotto --- cmd/promtool/backfill.go | 38 +++++++++++++++++++++++------------ cmd/promtool/backfill_test.go | 2 +- cmd/promtool/main.go | 3 ++- cmd/promtool/tsdb.go | 13 +++++++----- 4 files changed, 36 insertions(+), 20 deletions(-) diff --git a/cmd/promtool/backfill.go b/cmd/promtool/backfill.go index 6848bb37a..b3c33208d 100644 --- a/cmd/promtool/backfill.go +++ b/cmd/promtool/backfill.go @@ -65,7 +65,7 @@ func getMinAndMaxTimestamps(p textparse.Parser) (int64, int64, error) { return maxt, mint, nil } -func createBlocks(input []byte, mint, maxt int64, maxSamplesInAppender int, outputDir string) (returnErr error) { +func createBlocks(input []byte, mint, maxt int64, maxSamplesInAppender int, outputDir string, humanReadable bool) (returnErr error) { blockDuration := tsdb.DefaultBlockDuration mint = blockDuration * (mint / blockDuration) @@ -77,6 +77,8 @@ func createBlocks(input []byte, mint, maxt int64, maxSamplesInAppender int, outp returnErr = tsdb_errors.NewMulti(returnErr, db.Close()).Err() }() + var wroteHeader bool + for t := mint; t <= maxt; t = t + blockDuration { err := func() error { w, err := tsdb.NewBlockWriter(log.NewNopLogger(), outputDir, blockDuration) @@ -133,36 +135,46 @@ func createBlocks(input []byte, mint, maxt int64, maxSamplesInAppender int, outp app = w.Appender(ctx) samplesCount = 0 } + if err := app.Commit(); err != nil { return errors.Wrap(err, "commit") } - if _, err := w.Flush(ctx); err != nil && err != tsdb.ErrNoSeriesAppended { + + block, err := w.Flush(ctx) + switch err { + case nil: + blocks, err := db.Blocks() + if err != nil { + return errors.Wrap(err, "get blocks") + } + for _, b := range blocks { + if b.Meta().ULID == block { + printBlocks([]tsdb.BlockReader{b}, !wroteHeader, humanReadable) + wroteHeader = true + break + } + } + case tsdb.ErrNoSeriesAppended: + default: return errors.Wrap(err, "flush") } + return nil }() if err != nil { return errors.Wrap(err, "process blocks") } - - blocks, err := db.Blocks() - if err != nil { - return errors.Wrap(err, "get blocks") - } - if len(blocks) <= 0 { - continue - } - printBlocks(blocks[len(blocks)-1:], true) } return nil + } -func backfill(maxSamplesInAppender int, input []byte, outputDir string) (err error) { +func backfill(maxSamplesInAppender int, input []byte, outputDir string, humanReadable bool) (err error) { p := textparse.NewOpenMetricsParser(input) maxt, mint, err := getMinAndMaxTimestamps(p) if err != nil { return errors.Wrap(err, "getting min and max timestamp") } - return errors.Wrap(createBlocks(input, mint, maxt, maxSamplesInAppender, outputDir), "block creation") + return errors.Wrap(createBlocks(input, mint, maxt, maxSamplesInAppender, outputDir, humanReadable), "block creation") } diff --git a/cmd/promtool/backfill_test.go b/cmd/promtool/backfill_test.go index 4fe983ac1..87f18d64b 100644 --- a/cmd/promtool/backfill_test.go +++ b/cmd/promtool/backfill_test.go @@ -423,7 +423,7 @@ after_eof 1 2 require.NoError(t, os.RemoveAll(outputDir)) }() - err = backfill(test.MaxSamplesInAppender, []byte(test.ToParse), outputDir) + err = backfill(test.MaxSamplesInAppender, []byte(test.ToParse), outputDir, false) if !test.IsOk { require.Error(t, err, test.Description) diff --git a/cmd/promtool/main.go b/cmd/promtool/main.go index 7de799664..4b1229c45 100644 --- a/cmd/promtool/main.go +++ b/cmd/promtool/main.go @@ -134,6 +134,7 @@ func main() { dumpMaxTime := tsdbDumpCmd.Flag("max-time", "Maximum timestamp to dump.").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64() importCmd := tsdbCmd.Command("create-blocks-from", "[Experimental] Import samples from input and produce TSDB blocks. Please refer to the storage docs for more details.") + importHumanReadable := importCmd.Flag("human-readable", "Print human readable values.").Short('r').Bool() openMetricsImportCmd := importCmd.Command("openmetrics", "Import samples from OpenMetrics input and produce TSDB blocks. Please refer to the storage docs for more details.") // TODO(aSquare14): add flag to set default block duration importFilePath := openMetricsImportCmd.Arg("input file", "OpenMetrics file to read samples from.").Required().String() @@ -196,7 +197,7 @@ func main() { os.Exit(checkErr(dumpSamples(*dumpPath, *dumpMinTime, *dumpMaxTime))) //TODO(aSquare14): Work on adding support for custom block size. case openMetricsImportCmd.FullCommand(): - os.Exit(checkErr(backfillOpenMetrics(*importFilePath, *importDBPath))) + os.Exit(checkErr(backfillOpenMetrics(*importFilePath, *importDBPath, *importHumanReadable))) } } diff --git a/cmd/promtool/tsdb.go b/cmd/promtool/tsdb.go index b2ca5da92..149573333 100644 --- a/cmd/promtool/tsdb.go +++ b/cmd/promtool/tsdb.go @@ -354,15 +354,18 @@ func listBlocks(path string, humanReadable bool) error { if err != nil { return err } - printBlocks(blocks, humanReadable) + printBlocks(blocks, true, humanReadable) return nil } -func printBlocks(blocks []tsdb.BlockReader, humanReadable bool) { +func printBlocks(blocks []tsdb.BlockReader, writeHeader, humanReadable bool) { tw := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0) defer tw.Flush() - fmt.Fprintln(tw, "BLOCK ULID\tMIN TIME\tMAX TIME\tDURATION\tNUM SAMPLES\tNUM CHUNKS\tNUM SERIES\tSIZE") + if writeHeader { + fmt.Fprintln(tw, "BLOCK ULID\tMIN TIME\tMAX TIME\tDURATION\tNUM SAMPLES\tNUM CHUNKS\tNUM SERIES\tSIZE") + } + for _, b := range blocks { meta := b.Meta() @@ -615,11 +618,11 @@ func checkErr(err error) int { return 0 } -func backfillOpenMetrics(path string, outputDir string) (err error) { +func backfillOpenMetrics(path string, outputDir string, humanReadable bool) (err error) { inputFile, err := fileutil.OpenMmapFile(path) if err != nil { return err } defer inputFile.Close() - return backfill(5000, inputFile.Bytes(), outputDir) + return backfill(5000, inputFile.Bytes(), outputDir, humanReadable) }