From f31a0d64577655e6267e24693fefe9279d4b0530 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Mon, 4 Jun 2018 13:35:36 +0100 Subject: [PATCH] add Test for Tombstone Cleaning after a failure Signed-off-by: Krasi Georgiev --- db_test.go | 67 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/db_test.go b/db_test.go index 543ff9c76..f5364836d 100644 --- a/db_test.go +++ b/db_test.go @@ -14,6 +14,7 @@ package tsdb import ( + "fmt" "io/ioutil" "math" "math/rand" @@ -21,6 +22,7 @@ import ( "path/filepath" "sort" "testing" + "time" "github.com/oklog/ulid" "github.com/pkg/errors" @@ -781,6 +783,71 @@ func TestTombstoneClean(t *testing.T) { } } +func TestTombstoneCleanFail(t *testing.T) { + + db, close := openTestDB(t, nil) + defer close() + + var expectedBlockDirs []string + + // Create 2 empty blocks with some fake tombstones to have something to work with. + for i := 0; i < 2; i++ { + entropy := rand.New(rand.NewSource(time.Now().UnixNano())) + uid := ulid.MustNew(ulid.Now(), entropy) + meta := &BlockMeta{ + Version: 2, + ULID: uid, + } + blockDir := filepath.Join(db.Dir(), uid.String()) + block := createEmptyBlock(t, blockDir, meta) + tomb := memTombstones{} + tomb[0] = Intervals{{0, 1}} + block.tombstones = tomb + + db.blocks = append(db.blocks, block) + expectedBlockDirs = append(expectedBlockDirs, blockDir) + } + db.compactor = &mockCompactorFailing{t: t} + + // This should fail as we are using the failing compactor! + testutil.NotOk(t, db.CleanTombstones()) + + actualBlockDirs, err := blockDirs(db.dir) + testutil.Ok(t, err) + testutil.Equals(t, expectedBlockDirs, actualBlockDirs) +} + +type mockCompactorFailing struct { + t *testing.T + blocks []*Block +} + +func (*mockCompactorFailing) Plan(dir string) ([]string, error) { + + return nil, nil +} +func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error) { + if len(c.blocks) > 0 { + return ulid.ULID{}, fmt.Errorf("the compactor already did one block so forcefuly failing") + } + entropy := rand.New(rand.NewSource(time.Now().UnixNano())) + uid := ulid.MustNew(ulid.Now(), entropy) + meta := &BlockMeta{ + Version: 2, + ULID: uid, + } + + fmt.Println(dest) + block := createEmptyBlock(c.t, filepath.Join(dest, meta.ULID.String()), meta) + c.blocks = append(c.blocks, block) + return block.Meta().ULID, nil +} + +func (*mockCompactorFailing) Compact(dest string, dirs ...string) (ulid.ULID, error) { + return ulid.ULID{}, nil + +} + func TestDB_Retention(t *testing.T) { db, close := openTestDB(t, nil) defer close()