From 921f82cfc1e997c58201b8c5635b3f462346ecbd Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Fri, 18 Jan 2019 18:58:17 +0200 Subject: [PATCH] WIP Signed-off-by: Krasi Georgiev --- block_test.go | 14 ++++++++++---- compact.go | 8 +++++++- compact_test.go | 10 ++++++---- db.go | 6 ++++-- 4 files changed, 27 insertions(+), 11 deletions(-) diff --git a/block_test.go b/block_test.go index dec044918..0f164ae06 100644 --- a/block_test.go +++ b/block_test.go @@ -15,6 +15,7 @@ package tsdb import ( "context" + "fmt" "io/ioutil" "math/rand" "os" @@ -71,15 +72,20 @@ func createBlock(tb testing.TB, dir string, nSeries int, mint, maxt int64) strin testutil.Ok(tb, err) var ref uint64 + fmt.Println(len(lbls)) + for ts := mint; ts <= maxt; ts++ { app := head.Appender() - for _, lbl := range lbls { - err := app.AddFast(ref, ts, rand.Float64()) - if err == nil { - continue + for i, lbl := range lbls { + if i > 0 && lbl.String() == lbls[i-1].String() { + err := app.AddFast(ref, ts, rand.Float64()) + if err == nil { + continue + } } ref, err = app.Add(lbl, int64(ts), rand.Float64()) testutil.Ok(tb, err) + } err := app.Commit() testutil.Ok(tb, err) diff --git a/compact.go b/compact.go index dbc3c2606..bf19199aa 100644 --- a/compact.go +++ b/compact.go @@ -530,7 +530,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe return errors.Wrap(err, "write compaction") } - // Remove tmp folder and return early when the compaction was canceled. + // Compaction was canceled so remove tmp folders and return early. select { case <-c.ctx.Done(): for _, w := range writers { @@ -616,6 +616,8 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, }() c.metrics.populatingBlocks.Inc() + + fmt.Println(blocks) for i, b := range blocks { select { case <-c.ctx.Done(): @@ -623,6 +625,8 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, default: } + fmt.Println("next block") + indexr, err := b.Index() if err != nil { return errors.Wrapf(err, "open index reader for block %s", b) @@ -684,6 +688,8 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, return nil default: } + + // fmt.Println("next set") lset, chks, dranges := set.At() // The chunks here are not fully deleted. // Skip the series with all deleted chunks. diff --git a/compact_test.go b/compact_test.go index 2080fb291..b84cdf8dc 100644 --- a/compact_test.go +++ b/compact_test.go @@ -15,6 +15,7 @@ package tsdb import ( "context" + "fmt" "io/ioutil" "math" "os" @@ -752,9 +753,9 @@ func TestCancelCompactions(t *testing.T) { defer os.RemoveAll(tmpdir) // Create some blocks to fall within the compaction range. - createPopulatedBlock(t, tmpdir, 3000, 0, 1000) - createPopulatedBlock(t, tmpdir, 3000, 1000, 2000) - createPopulatedBlock(t, tmpdir, 1, 2000, 2001) + createBlock(t, tmpdir, 1000, 0, 1000) + createBlock(t, tmpdir, 1000, 1000, 2000) + createBlock(t, tmpdir, 1, 2000, 2001) // The most recent block is ignored so can be e small one. db, err := Open(tmpdir, log.NewNopLogger(), nil, &Options{BlockRanges: []int64{1, 2000}}) testutil.Ok(t, err) @@ -763,7 +764,8 @@ func TestCancelCompactions(t *testing.T) { dbClosed := make(chan struct{}) for { if prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.populatingBlocks) > 0 { - time.Sleep(5 * time.Millisecond) + fmt.Println("populating started.") + time.Sleep(2 * time.Millisecond) go func() { testutil.Ok(t, db.Close()) close(dbClosed) diff --git a/db.go b/db.go index c9715483d..a580ea240 100644 --- a/db.go +++ b/db.go @@ -278,6 +278,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db ctx, cnl := context.WithCancel(context.Background()) db.compactor, err = NewLeveledCompactor(ctx, r, l, opts.BlockRanges, db.chunkPool) if err != nil { + cnl() return nil, errors.Wrap(err, "create leveled compactor") } db.compactCnl = cnl @@ -459,7 +460,6 @@ func (db *DB) compact() (err error) { return nil default: } - if _, err := db.compactor.Compact(db.dir, plan, db.blocks); err != nil { return errors.Wrapf(err, "compact %s", plan) } @@ -819,7 +819,9 @@ func (db *DB) Head() *Head { // Close the partition. func (db *DB) Close() error { close(db.stopc) - db.compactCnl() + // fmt.Println("closing") + // db.compactCnl() + // fmt.Println("closed") <-db.donec db.mtx.Lock()