From 0b70333ef6d5588e0ad1816dc1d698d5a6f19419 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Sun, 21 May 2017 23:20:05 +0530 Subject: [PATCH] Add tests for tombstones. Signed-off-by: Goutham Veeramachaneni --- block.go | 5 ++- compact.go | 2 +- db.go | 6 +-- tombstones.go | 68 ++++++++++++++++------------- tombstones_test.go | 105 +++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 149 insertions(+), 37 deletions(-) diff --git a/block.go b/block.go index 69c351ccc..03a2e569a 100644 --- a/block.go +++ b/block.go @@ -198,6 +198,7 @@ func newPersistedBlock(dir string) (*persistedBlock, error) { chunkr: cr, indexr: ir, + // TODO(gouthamve): We will be sorting the refs again internally, is it a big deal? tombstones: newMapTombstoneReader(ts), } return pb, nil @@ -222,7 +223,7 @@ func (pb *persistedBlock) Querier(mint, maxt int64) Querier { maxt: maxt, index: pb.Index(), chunks: pb.Chunks(), - tombstones: pb.tombstones.Copy(), + tombstones: pb.Tombstones(), } } @@ -269,7 +270,7 @@ Outer: } // Merge the current and new tombstones. - tr := pb.tombstones.Copy() + tr := pb.Tombstones() str := newSimpleTombstoneReader(vPostings, []trange{{mint, maxt}}) tombreader := newMergedTombstoneReader(tr, str) diff --git a/compact.go b/compact.go index 7ba22f5f7..8419f1e06 100644 --- a/compact.go +++ b/compact.go @@ -424,7 +424,7 @@ func (c *compactionSeriesSet) Next() bool { return false } - // Remove completely deleted chunks and re-encode partial ones. + // Remove completely deleted chunks. if len(c.dranges) > 0 { chks := make([]*ChunkMeta, 0, len(c.c)) for _, chk := range c.c { diff --git a/db.go b/db.go index b3b055844..28506498f 100644 --- a/db.go +++ b/db.go @@ -680,9 +680,9 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error { db.cmtx.Lock() defer db.cmtx.Unlock() - s.headmtx.RLock() - blocks := s.blocksForInterval(mint, maxt) - s.headmtx.RUnlock() + db.headmtx.RLock() + blocks := db.blocksForInterval(mint, maxt) + db.headmtx.RUnlock() var g errgroup.Group diff --git a/tombstones.go b/tombstones.go index d8182c706..b0d8eca41 100644 --- a/tombstones.go +++ b/tombstones.go @@ -102,15 +102,15 @@ type TombstoneReader interface { Next() bool Seek(ref uint32) bool At() stone - // A copy of the current instance. Changes to the copy will not affect parent. + // Copy copies the current reader state. Changes to the copy will not affect parent. Copy() TombstoneReader Err() error } type tombstoneReader struct { stones []byte - idx int - len int + + cur stone b []byte err error @@ -135,11 +135,10 @@ func newTombStoneReader(dir string) (*tombstoneReader, error) { if err := d.err(); err != nil { return nil, err } + off += 4 // For the numStones which has been read. return &tombstoneReader{ - stones: b[off+4:], - idx: -1, - len: int(numStones), + stones: b[off : off+int64(numStones*12)], b: b, }, nil @@ -150,26 +149,11 @@ func (t *tombstoneReader) Next() bool { return false } - t.idx++ + if len(t.stones) < 12 { + return false + } - return t.idx < t.len -} - -func (t *tombstoneReader) Seek(ref uint32) bool { - bytIdx := t.idx * 12 - - t.idx += sort.Search(t.len-t.idx, func(i int) bool { - return binary.BigEndian.Uint32(t.b[bytIdx+i*12:]) >= ref - }) - - return t.idx < t.len -} - -func (t *tombstoneReader) At() stone { - bytIdx := t.idx * (4 + 8) - dat := t.stones[bytIdx : bytIdx+12] - - d := &decbuf{b: dat} + d := &decbuf{b: t.stones[:12]} ref := d.be32() off := d.be64int64() @@ -177,7 +161,7 @@ func (t *tombstoneReader) At() stone { numRanges := d.varint64() if err := d.err(); err != nil { t.err = err - return stone{ref: ref} + return false } dranges := make([]trange, 0, numRanges) @@ -186,20 +170,40 @@ func (t *tombstoneReader) At() stone { maxt := d.varint64() if err := d.err(); err != nil { t.err = err - return stone{ref: ref, ranges: dranges} + return false } dranges = append(dranges, trange{mint, maxt}) } - return stone{ref: ref, ranges: dranges} + t.stones = t.stones[12:] + t.cur = stone{ref: ref, ranges: dranges} + return true +} + +func (t *tombstoneReader) Seek(ref uint32) bool { + i := sort.Search(len(t.stones)/12, func(i int) bool { + x := binary.BigEndian.Uint32(t.stones[i*12:]) + return x >= ref + }) + + if i*12 < len(t.stones) { + t.stones = t.stones[i*12:] + return t.Next() + } + + t.stones = nil + return false +} + +func (t *tombstoneReader) At() stone { + return t.cur } func (t *tombstoneReader) Copy() TombstoneReader { return &tombstoneReader{ stones: t.stones[:], - idx: t.idx, - len: t.len, + cur: t.cur, b: t.b, } @@ -291,6 +295,7 @@ func newSimpleTombstoneReader(refs []uint32, drange []trange) *simpleTombstoneRe func (t *simpleTombstoneReader) Next() bool { if len(t.refs) > 0 { t.cur = t.refs[0] + t.refs = t.refs[1:] return true } @@ -437,7 +442,8 @@ func (tr trange) isSubrange(ranges []trange) bool { } // This adds the new time-range to the existing ones. -// The existing ones must be sorted and should not be nil. +// The existing ones must be sorted. +// TODO(gouthamve): {1, 2}, {3, 4} can be merged into {1, 4}. func addNewInterval(existing []trange, n trange) []trange { for i, r := range existing { // TODO(gouthamve): Make this codepath easier to digest. diff --git a/tombstones_test.go b/tombstones_test.go index 8a3460a95..1506d74de 100644 --- a/tombstones_test.go +++ b/tombstones_test.go @@ -4,6 +4,7 @@ import ( "io/ioutil" "math/rand" "os" + "sort" "testing" "time" @@ -94,3 +95,107 @@ func TestAddingNewIntervals(t *testing.T) { } return } + +func TestTombstoneReadersSeek(t *testing.T) { + // This is assuming that the listPostings is perfect. + table := struct { + m map[uint32][]trange + + cases []uint32 + }{ + m: map[uint32][]trange{ + 2: []trange{{1, 2}}, + 3: []trange{{1, 4}, {5, 6}}, + 4: []trange{{10, 15}, {16, 20}}, + 5: []trange{{1, 4}, {5, 6}}, + 50: []trange{{10, 20}, {35, 50}}, + 600: []trange{{100, 2000}}, + 1000: []trange{}, + 1500: []trange{{10000, 500000}}, + 1600: []trange{{1, 2}, {3, 4}, {4, 5}, {6, 7}}, + }, + + cases: []uint32{1, 10, 20, 40, 30, 20, 50, 599, 601, 1000, 1600, 1601, 2000}, + } + + testFunc := func(t *testing.T, tr TombstoneReader) { + for _, ref := range table.cases { + // Create the listPostings. + refs := make([]uint32, 0, len(table.m)) + for k := range table.m { + refs = append(refs, k) + } + sort.Sort(uint32slice(refs)) + pr := newListPostings(refs) + + // Compare both. + trc := tr.Copy() + require.Equal(t, pr.Seek(ref), trc.Seek(ref)) + if pr.Seek(ref) { + require.Equal(t, pr.At(), trc.At().ref) + require.Equal(t, table.m[pr.At()], trc.At().ranges) + } + + for pr.Next() { + require.True(t, trc.Next()) + require.Equal(t, pr.At(), trc.At().ref) + require.Equal(t, table.m[pr.At()], trc.At().ranges) + } + + require.False(t, trc.Next()) + require.NoError(t, pr.Err()) + require.NoError(t, tr.Err()) + } + } + + t.Run("tombstoneReader", func(t *testing.T) { + tmpdir, _ := ioutil.TempDir("", "test") + defer os.RemoveAll(tmpdir) + + mtr := newMapTombstoneReader(table.m) + writeTombstoneFile(tmpdir, mtr) + tr, err := readTombstoneFile(tmpdir) + require.NoError(t, err) + + testFunc(t, tr) + return + }) + t.Run("mapTombstoneReader", func(t *testing.T) { + mtr := newMapTombstoneReader(table.m) + + testFunc(t, mtr) + return + }) + t.Run("simpleTombstoneReader", func(t *testing.T) { + ranges := []trange{{1, 2}, {3, 4}, {5, 6}} + + for _, ref := range table.cases { + // Create the listPostings. + refs := make([]uint32, 0, len(table.m)) + for k := range table.m { + refs = append(refs, k) + } + sort.Sort(uint32slice(refs)) + pr := newListPostings(refs[:]) + tr := newSimpleTombstoneReader(refs[:], ranges) + + // Compare both. + trc := tr.Copy() + require.Equal(t, pr.Seek(ref), trc.Seek(ref)) + if pr.Seek(ref) { + require.Equal(t, pr.At(), trc.At().ref) + require.Equal(t, ranges, tr.At().ranges) + } + for pr.Next() { + require.True(t, trc.Next()) + require.Equal(t, pr.At(), trc.At().ref, "refs") + require.Equal(t, ranges, trc.At().ranges) + } + + require.False(t, trc.Next()) + require.NoError(t, pr.Err()) + require.NoError(t, tr.Err()) + } + return + }) +}