Browse Source

WIP

Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com>
pull/5805/head
Krasi Georgiev 6 years ago
parent
commit
921f82cfc1
  1. 14
      block_test.go
  2. 8
      compact.go
  3. 10
      compact_test.go
  4. 6
      db.go

14
block_test.go

@ -15,6 +15,7 @@ package tsdb
import (
"context"
"fmt"
"io/ioutil"
"math/rand"
"os"
@ -71,15 +72,20 @@ func createBlock(tb testing.TB, dir string, nSeries int, mint, maxt int64) strin
testutil.Ok(tb, err)
var ref uint64
fmt.Println(len(lbls))
for ts := mint; ts <= maxt; ts++ {
app := head.Appender()
for _, lbl := range lbls {
err := app.AddFast(ref, ts, rand.Float64())
if err == nil {
continue
for i, lbl := range lbls {
if i > 0 && lbl.String() == lbls[i-1].String() {
err := app.AddFast(ref, ts, rand.Float64())
if err == nil {
continue
}
}
ref, err = app.Add(lbl, int64(ts), rand.Float64())
testutil.Ok(tb, err)
}
err := app.Commit()
testutil.Ok(tb, err)

8
compact.go

@ -530,7 +530,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
return errors.Wrap(err, "write compaction")
}
// Remove tmp folder and return early when the compaction was canceled.
// Compaction was canceled so remove tmp folders and return early.
select {
case <-c.ctx.Done():
for _, w := range writers {
@ -616,6 +616,8 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
}()
c.metrics.populatingBlocks.Inc()
fmt.Println(blocks)
for i, b := range blocks {
select {
case <-c.ctx.Done():
@ -623,6 +625,8 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
default:
}
fmt.Println("next block")
indexr, err := b.Index()
if err != nil {
return errors.Wrapf(err, "open index reader for block %s", b)
@ -684,6 +688,8 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
return nil
default:
}
// fmt.Println("next set")
lset, chks, dranges := set.At() // The chunks here are not fully deleted.
// Skip the series with all deleted chunks.

10
compact_test.go

@ -15,6 +15,7 @@ package tsdb
import (
"context"
"fmt"
"io/ioutil"
"math"
"os"
@ -752,9 +753,9 @@ func TestCancelCompactions(t *testing.T) {
defer os.RemoveAll(tmpdir)
// Create some blocks to fall within the compaction range.
createPopulatedBlock(t, tmpdir, 3000, 0, 1000)
createPopulatedBlock(t, tmpdir, 3000, 1000, 2000)
createPopulatedBlock(t, tmpdir, 1, 2000, 2001)
createBlock(t, tmpdir, 1000, 0, 1000)
createBlock(t, tmpdir, 1000, 1000, 2000)
createBlock(t, tmpdir, 1, 2000, 2001) // The most recent block is ignored so can be e small one.
db, err := Open(tmpdir, log.NewNopLogger(), nil, &Options{BlockRanges: []int64{1, 2000}})
testutil.Ok(t, err)
@ -763,7 +764,8 @@ func TestCancelCompactions(t *testing.T) {
dbClosed := make(chan struct{})
for {
if prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.populatingBlocks) > 0 {
time.Sleep(5 * time.Millisecond)
fmt.Println("populating started.")
time.Sleep(2 * time.Millisecond)
go func() {
testutil.Ok(t, db.Close())
close(dbClosed)

6
db.go

@ -278,6 +278,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
ctx, cnl := context.WithCancel(context.Background())
db.compactor, err = NewLeveledCompactor(ctx, r, l, opts.BlockRanges, db.chunkPool)
if err != nil {
cnl()
return nil, errors.Wrap(err, "create leveled compactor")
}
db.compactCnl = cnl
@ -459,7 +460,6 @@ func (db *DB) compact() (err error) {
return nil
default:
}
if _, err := db.compactor.Compact(db.dir, plan, db.blocks); err != nil {
return errors.Wrapf(err, "compact %s", plan)
}
@ -819,7 +819,9 @@ func (db *DB) Head() *Head {
// Close the partition.
func (db *DB) Close() error {
close(db.stopc)
db.compactCnl()
// fmt.Println("closing")
// db.compactCnl()
// fmt.Println("closed")
<-db.donec
db.mtx.Lock()

Loading…
Cancel
Save