Browse Source

chore(tsdb): add a sandboxDir to DBReadOnly, the directory can be used for transient file writes.

use it in loadDataAsQueryable to make sure the RO Head doesn't truncate or cut new chunks in data/chunks_head/.

add a -sandbox-dir-root flag to "promtool tsdb dump/dump-openmetrics" to control the root of that sandbox dirrectory.

Signed-off-by: machine424 <ayoubmrini424@gmail.com>
pull/13218/head
machine424 12 months ago
parent
commit
c5a1cc9148
No known key found for this signature in database
GPG Key ID: A4B001A4FDEE017D
  1. 2
      cmd/promtool/backfill.go
  2. 6
      cmd/promtool/main.go
  3. 8
      cmd/promtool/tsdb.go
  4. 1
      cmd/promtool/tsdb_test.go
  5. 2
      docs/command-line/promtool.md
  6. 27
      tsdb/chunks/head_chunks.go
  7. 2
      tsdb/compact_test.go
  8. 45
      tsdb/db.go
  9. 89
      tsdb/db_test.go

2
cmd/promtool/backfill.go

@ -88,7 +88,7 @@ func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesIn
blockDuration := getCompatibleBlockDuration(maxBlockDuration)
mint = blockDuration * (mint / blockDuration)
db, err := tsdb.OpenDBReadOnly(outputDir, nil)
db, err := tsdb.OpenDBReadOnly(outputDir, "", nil)
if err != nil {
return err
}

6
cmd/promtool/main.go

@ -235,12 +235,14 @@ func main() {
tsdbDumpCmd := tsdbCmd.Command("dump", "Dump samples from a TSDB.")
dumpPath := tsdbDumpCmd.Arg("db path", "Database path (default is "+defaultDBPath+").").Default(defaultDBPath).String()
dumpSandboxDirRoot := tsdbDumpCmd.Flag("sandbox-dir-root", "Root directory where a sandbox directory would be created in case WAL replay generates chunks. The sandbox directory is cleaned up at the end.").Default(defaultDBPath).String()
dumpMinTime := tsdbDumpCmd.Flag("min-time", "Minimum timestamp to dump.").Default(strconv.FormatInt(math.MinInt64, 10)).Int64()
dumpMaxTime := tsdbDumpCmd.Flag("max-time", "Maximum timestamp to dump.").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64()
dumpMatch := tsdbDumpCmd.Flag("match", "Series selector. Can be specified multiple times.").Default("{__name__=~'(?s:.*)'}").Strings()
tsdbDumpOpenMetricsCmd := tsdbCmd.Command("dump-openmetrics", "[Experimental] Dump samples from a TSDB into OpenMetrics format. Native histograms are not dumped.")
dumpOpenMetricsPath := tsdbDumpOpenMetricsCmd.Arg("db path", "Database path (default is "+defaultDBPath+").").Default(defaultDBPath).String()
dumpOpenMetricsSandboxDirRoot := tsdbDumpOpenMetricsCmd.Flag("sandbox-dir-root", "Root directory where a sandbox directory would be created in case WAL replay generates chunks. The sandbox directory is cleaned up at the end.").Default(defaultDBPath).String()
dumpOpenMetricsMinTime := tsdbDumpOpenMetricsCmd.Flag("min-time", "Minimum timestamp to dump.").Default(strconv.FormatInt(math.MinInt64, 10)).Int64()
dumpOpenMetricsMaxTime := tsdbDumpOpenMetricsCmd.Flag("max-time", "Maximum timestamp to dump.").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64()
dumpOpenMetricsMatch := tsdbDumpOpenMetricsCmd.Flag("match", "Series selector. Can be specified multiple times.").Default("{__name__=~'(?s:.*)'}").Strings()
@ -396,9 +398,9 @@ func main() {
os.Exit(checkErr(listBlocks(*listPath, *listHumanReadable)))
case tsdbDumpCmd.FullCommand():
os.Exit(checkErr(dumpSamples(ctx, *dumpPath, *dumpMinTime, *dumpMaxTime, *dumpMatch, formatSeriesSet)))
os.Exit(checkErr(dumpSamples(ctx, *dumpPath, *dumpSandboxDirRoot, *dumpMinTime, *dumpMaxTime, *dumpMatch, formatSeriesSet)))
case tsdbDumpOpenMetricsCmd.FullCommand():
os.Exit(checkErr(dumpSamples(ctx, *dumpOpenMetricsPath, *dumpOpenMetricsMinTime, *dumpOpenMetricsMaxTime, *dumpOpenMetricsMatch, formatSeriesSetOpenMetrics)))
os.Exit(checkErr(dumpSamples(ctx, *dumpOpenMetricsPath, *dumpOpenMetricsSandboxDirRoot, *dumpOpenMetricsMinTime, *dumpOpenMetricsMaxTime, *dumpOpenMetricsMatch, formatSeriesSetOpenMetrics)))
// TODO(aSquare14): Work on adding support for custom block size.
case openMetricsImportCmd.FullCommand():
os.Exit(backfillOpenMetrics(*importFilePath, *importDBPath, *importHumanReadable, *importQuiet, *maxBlockDuration))

8
cmd/promtool/tsdb.go

@ -338,7 +338,7 @@ func readPrometheusLabels(r io.Reader, n int) ([]labels.Labels, error) {
}
func listBlocks(path string, humanReadable bool) error {
db, err := tsdb.OpenDBReadOnly(path, nil)
db, err := tsdb.OpenDBReadOnly(path, "", nil)
if err != nil {
return err
}
@ -393,7 +393,7 @@ func getFormatedBytes(bytes int64, humanReadable bool) string {
}
func openBlock(path, blockID string) (*tsdb.DBReadOnly, tsdb.BlockReader, error) {
db, err := tsdb.OpenDBReadOnly(path, nil)
db, err := tsdb.OpenDBReadOnly(path, "", nil)
if err != nil {
return nil, nil, err
}
@ -708,8 +708,8 @@ func analyzeCompaction(ctx context.Context, block tsdb.BlockReader, indexr tsdb.
type SeriesSetFormatter func(series storage.SeriesSet) error
func dumpSamples(ctx context.Context, path string, mint, maxt int64, match []string, formatter SeriesSetFormatter) (err error) {
db, err := tsdb.OpenDBReadOnly(path, nil)
func dumpSamples(ctx context.Context, dbDir, sandboxDirRoot string, mint, maxt int64, match []string, formatter SeriesSetFormatter) (err error) {
db, err := tsdb.OpenDBReadOnly(dbDir, sandboxDirRoot, nil)
if err != nil {
return err
}

1
cmd/promtool/tsdb_test.go

@ -64,6 +64,7 @@ func getDumpedSamples(t *testing.T, path string, mint, maxt int64, match []strin
err := dumpSamples(
context.Background(),
path,
t.TempDir(),
mint,
maxt,
match,

2
docs/command-line/promtool.md

@ -566,6 +566,7 @@ Dump samples from a TSDB.
| Flag | Description | Default |
| --- | --- | --- |
| <code class="text-nowrap">--sandbox-dir-root</code> | Root directory where a sandbox directory would be created in case WAL replay generates chunks. The sandbox directory is cleaned up at the end. | `data/` |
| <code class="text-nowrap">--min-time</code> | Minimum timestamp to dump. | `-9223372036854775808` |
| <code class="text-nowrap">--max-time</code> | Maximum timestamp to dump. | `9223372036854775807` |
| <code class="text-nowrap">--match</code> | Series selector. Can be specified multiple times. | `{__name__=~'(?s:.*)'}` |
@ -592,6 +593,7 @@ Dump samples from a TSDB.
| Flag | Description | Default |
| --- | --- | --- |
| <code class="text-nowrap">--sandbox-dir-root</code> | Root directory where a sandbox directory would be created in case WAL replay generates chunks. The sandbox directory is cleaned up at the end. | `data/` |
| <code class="text-nowrap">--min-time</code> | Minimum timestamp to dump. | `-9223372036854775808` |
| <code class="text-nowrap">--max-time</code> | Maximum timestamp to dump. | `9223372036854775807` |
| <code class="text-nowrap">--match</code> | Series selector. Can be specified multiple times. | `{__name__=~'(?s:.*)'}` |

27
tsdb/chunks/head_chunks.go

@ -381,6 +381,33 @@ func listChunkFiles(dir string) (map[int]string, error) {
return res, nil
}
// HardLinkChunkFiles creates hardlinks for chunk files from src to dst.
// It does nothing if src doesn't exist and ensures dst is created if not.
func HardLinkChunkFiles(src, dst string) error {
_, err := os.Stat(src)
if os.IsNotExist(err) {
return nil
}
if err != nil {
return fmt.Errorf("check source chunks dir: %w", err)
}
if err := os.MkdirAll(dst, 0o777); err != nil {
return fmt.Errorf("set up destination chunks dir: %w", err)
}
files, err := listChunkFiles(src)
if err != nil {
return fmt.Errorf("list chunks: %w", err)
}
for _, filePath := range files {
_, fileName := filepath.Split(filePath)
err := os.Link(filepath.Join(src, fileName), filepath.Join(dst, fileName))
if err != nil {
return fmt.Errorf("hardlink a chunk: %w", err)
}
}
return nil
}
// repairLastChunkFile deletes the last file if it's empty.
// Because we don't fsync when creating these files, we could end
// up with an empty file at the end during an abrupt shutdown.

2
tsdb/compact_test.go

@ -1297,7 +1297,7 @@ func TestCancelCompactions(t *testing.T) {
// This checks that the `context.Canceled` error is properly checked at all levels:
// - tsdb_errors.NewMulti() should have the Is() method implemented for correct checks.
// - callers should check with errors.Is() instead of ==.
readOnlyDB, err := OpenDBReadOnly(tmpdirCopy, log.NewNopLogger())
readOnlyDB, err := OpenDBReadOnly(tmpdirCopy, "", log.NewNopLogger())
require.NoError(t, err)
blocks, err := readOnlyDB.Blocks()
require.NoError(t, err)

45
tsdb/db.go

@ -383,26 +383,36 @@ var ErrClosed = errors.New("db already closed")
// Current implementation doesn't support concurrency so
// all API calls should happen in the same go routine.
type DBReadOnly struct {
logger log.Logger
dir string
closers []io.Closer
closed chan struct{}
logger log.Logger
dir string
sandboxDir string
closers []io.Closer
closed chan struct{}
}
// OpenDBReadOnly opens DB in the given directory for read only operations.
func OpenDBReadOnly(dir string, l log.Logger) (*DBReadOnly, error) {
func OpenDBReadOnly(dir, sandboxDirRoot string, l log.Logger) (*DBReadOnly, error) {
if _, err := os.Stat(dir); err != nil {
return nil, fmt.Errorf("opening the db dir: %w", err)
}
if sandboxDirRoot == "" {
sandboxDirRoot = dir
}
sandboxDir, err := os.MkdirTemp(sandboxDirRoot, "tmp_dbro_sandbox")
if err != nil {
return nil, fmt.Errorf("setting up sandbox dir: %w", err)
}
if l == nil {
l = log.NewNopLogger()
}
return &DBReadOnly{
logger: l,
dir: dir,
closed: make(chan struct{}),
logger: l,
dir: dir,
sandboxDir: sandboxDir,
closed: make(chan struct{}),
}, nil
}
@ -491,7 +501,14 @@ func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQue
}
opts := DefaultHeadOptions()
opts.ChunkDirRoot = db.dir
// Hard link the chunk files to a dir in db.sandboxDir in case the Head needs to truncate some of them
// or cut new ones while replaying the WAL.
// See https://github.com/prometheus/prometheus/issues/11618.
err = chunks.HardLinkChunkFiles(mmappedChunksDir(db.dir), mmappedChunksDir(db.sandboxDir))
if err != nil {
return nil, err
}
opts.ChunkDirRoot = db.sandboxDir
head, err := NewHead(nil, db.logger, nil, nil, opts, NewHeadStats())
if err != nil {
return nil, err
@ -519,7 +536,7 @@ func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQue
}
}
opts := DefaultHeadOptions()
opts.ChunkDirRoot = db.dir
opts.ChunkDirRoot = db.sandboxDir
head, err = NewHead(nil, db.logger, w, wbl, opts, NewHeadStats())
if err != nil {
return nil, err
@ -690,8 +707,14 @@ func (db *DBReadOnly) Block(blockID string) (BlockReader, error) {
return block, nil
}
// Close all block readers.
// Close all block readers and delete the sandbox dir.
func (db *DBReadOnly) Close() error {
defer func() {
// Delete the temporary sandbox directory that was created when opening the DB.
if err := os.RemoveAll(db.sandboxDir); err != nil {
level.Error(db.logger).Log("msg", "delete sandbox dir", "err", err)
}
}()
select {
case <-db.closed:
return ErrClosed

89
tsdb/db_test.go

@ -25,6 +25,7 @@ import (
"os"
"path"
"path/filepath"
"runtime"
"sort"
"strconv"
"sync"
@ -2494,7 +2495,7 @@ func TestDBReadOnly(t *testing.T) {
}
// Open a read only db and ensure that the API returns the same result as the normal DB.
dbReadOnly, err := OpenDBReadOnly(dbDir, logger)
dbReadOnly, err := OpenDBReadOnly(dbDir, "", logger)
require.NoError(t, err)
defer func() { require.NoError(t, dbReadOnly.Close()) }()
@ -2548,10 +2549,14 @@ func TestDBReadOnly(t *testing.T) {
// TestDBReadOnlyClosing ensures that after closing the db
// all api methods return an ErrClosed.
func TestDBReadOnlyClosing(t *testing.T) {
dbDir := t.TempDir()
db, err := OpenDBReadOnly(dbDir, log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)))
sandboxDir := t.TempDir()
db, err := OpenDBReadOnly(t.TempDir(), sandboxDir, log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)))
require.NoError(t, err)
// The sandboxDir was there.
require.DirExists(t, db.sandboxDir)
require.NoError(t, db.Close())
// The sandboxDir was deleted when closing.
require.NoDirExists(t, db.sandboxDir)
require.Equal(t, db.Close(), ErrClosed)
_, err = db.Blocks()
require.Equal(t, err, ErrClosed)
@ -2587,7 +2592,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
}
// Flush WAL.
db, err := OpenDBReadOnly(dbDir, logger)
db, err := OpenDBReadOnly(dbDir, "", logger)
require.NoError(t, err)
flush := t.TempDir()
@ -2595,7 +2600,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
require.NoError(t, db.Close())
// Reopen the DB from the flushed WAL block.
db, err = OpenDBReadOnly(flush, logger)
db, err = OpenDBReadOnly(flush, "", logger)
require.NoError(t, err)
defer func() { require.NoError(t, db.Close()) }()
blocks, err := db.Blocks()
@ -2624,6 +2629,80 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
require.Equal(t, 1000.0, sum)
}
func TestDBReadOnly_Querier_NoAlteration(t *testing.T) {
countChunks := func(dir string) int {
files, err := os.ReadDir(mmappedChunksDir(dir))
require.NoError(t, err)
return len(files)
}
dirHash := func(dir string) (hash []byte) {
// Windows requires the DB to be closed: "xxx\lock: The process cannot access the file because it is being used by another process."
// But closing the DB alters the directory in this case (it'll cut a new chunk).
if runtime.GOOS != "windows" {
hash = testutil.DirHash(t, dir)
}
return
}
spinUpQuerierAndCheck := func(dir, sandboxDir string, chunksCount int) {
dBDirHash := dirHash(dir)
// Bootsrap a RO db from the same dir and set up a querier.
dbReadOnly, err := OpenDBReadOnly(dir, sandboxDir, nil)
require.NoError(t, err)
require.Equal(t, chunksCount, countChunks(dir))
q, err := dbReadOnly.Querier(math.MinInt, math.MaxInt)
require.NoError(t, err)
require.NoError(t, q.Close())
require.NoError(t, dbReadOnly.Close())
// The RO Head doesn't alter RW db chunks_head/.
require.Equal(t, chunksCount, countChunks(dir))
require.Equal(t, dirHash(dir), dBDirHash)
}
t.Run("doesn't cut chunks while replaying WAL", func(t *testing.T) {
db := openTestDB(t, nil, nil)
defer func() {
require.NoError(t, db.Close())
}()
// Append until the first mmaped head chunk.
for i := 0; i < 121; i++ {
app := db.Appender(context.Background())
_, err := app.Append(0, labels.FromStrings("foo", "bar"), int64(i), 0)
require.NoError(t, err)
require.NoError(t, app.Commit())
}
spinUpQuerierAndCheck(db.dir, t.TempDir(), 0)
// The RW Head should have no problem cutting its own chunk,
// this also proves that a chunk needed to be cut.
require.NotPanics(t, func() { db.ForceHeadMMap() })
require.Equal(t, 1, countChunks(db.dir))
})
t.Run("doesn't truncate corrupted chunks", func(t *testing.T) {
db := openTestDB(t, nil, nil)
require.NoError(t, db.Close())
// Simulate a corrupted chunk: without a header.
_, err := os.Create(path.Join(mmappedChunksDir(db.dir), "000001"))
require.NoError(t, err)
spinUpQuerierAndCheck(db.dir, t.TempDir(), 1)
// The RW Head should have no problem truncating its corrupted file:
// this proves that the chunk needed to be truncated.
db, err = Open(db.dir, nil, nil, nil, nil)
defer func() {
require.NoError(t, db.Close())
}()
require.NoError(t, err)
require.Equal(t, 0, countChunks(db.dir))
})
}
func TestDBCannotSeePartialCommits(t *testing.T) {
if defaultIsolationDisabled {
t.Skip("skipping test since tsdb isolation is disabled")

Loading…
Cancel
Save