Add tests for tombstones and deletedIterator

Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>
pull/5805/head
Goutham Veeramachaneni 2017-05-16 12:43:33 +05:30
parent 4f1d857590
commit 3de55171d3
No known key found for this signature in database
GPG Key ID: F1C217E8E9023CAD
4 changed files with 211 additions and 44 deletions

View File

@ -199,10 +199,17 @@ func writeTombstoneFile(dir string, tr TombstoneReader) error {
pos += int64(n)
}
}
if err := tr.Err(); err != nil {
return err
}
// Write the offset table.
buf.reset()
buf.putBE32int(len(refs))
if _, err := f.Write(buf.get()); err != nil {
return err
}
for _, ref := range refs {
buf.reset()
buf.putBE32(ref)
@ -325,7 +332,7 @@ Outer:
// Merge the current and new tombstones.
tr := newMapTombstoneReader(ir.tombstones)
str := newSimpleTombstoneReader(vPostings, []trange{mint, maxt})
str := newSimpleTombstoneReader(vPostings, []trange{{mint, maxt}})
tombreader := newMergedTombstoneReader(tr, str)
return writeTombstoneFile(pb.dir, tombreader)
@ -371,13 +378,13 @@ func newTombStoneReader(dir string) (*tombstoneReader, error) {
}
d = &decbuf{b: b[off:]}
numStones := d.be64int64()
numStones := d.be32int()
if err := d.err(); err != nil {
return nil, err
}
return &tombstoneReader{
stones: b[off+8 : (off+8)+(numStones*12)],
stones: b[off+4:],
idx: -1,
len: int(numStones),
@ -448,6 +455,7 @@ func newMapTombstoneReader(ts map[uint32][]trange) *mapTombstoneReader {
func (t *mapTombstoneReader) Next() bool {
if len(t.refs) > 0 {
t.cur = t.refs[0]
t.refs = t.refs[1:]
return true
}

47
block_test.go Normal file
View File

@ -0,0 +1,47 @@
package tsdb
import (
"io/ioutil"
"math/rand"
"os"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestWriteAndReadbackTombStones(t *testing.T) {
tmpdir, _ := ioutil.TempDir("", "test")
defer os.RemoveAll(tmpdir)
ref := uint32(0)
stones := make(map[uint32][]trange)
// Generate the tombstones.
for i := 0; i < 100; i++ {
ref += uint32(rand.Int31n(10)) + 1
numRanges := rand.Intn(5)
dranges := make([]trange, numRanges)
mint := rand.Int63n(time.Now().UnixNano())
for j := 0; j < numRanges; j++ {
dranges[j] = trange{mint, mint + rand.Int63n(1000)}
mint += rand.Int63n(1000) + 1
}
stones[ref] = dranges
}
require.NoError(t, writeTombstoneFile(tmpdir, newMapTombstoneReader(stones)))
restr, err := readTombstoneFile(tmpdir)
require.NoError(t, err)
exptr := newMapTombstoneReader(stones)
// Compare the two readers.
for restr.Next() {
require.True(t, exptr.Next())
require.Equal(t, exptr.At(), restr.At())
}
require.False(t, exptr.Next())
require.NoError(t, restr.Err())
require.NoError(t, exptr.Err())
}

View File

@ -47,46 +47,6 @@ type ChunkMeta struct {
dranges []trange
}
type trange struct {
mint, maxt int64
}
func (tr trange) inBounds(t int64) bool {
return t >= tr.mint && t <= tr.maxt
}
// This adds the new time-range to the existing ones.
// The existing ones must be sorted.
func addNewInterval(existing []trange, n trange) []trange {
for i, r := range existing {
if r.inBounds(n.mint) {
if n.maxt > r.maxt {
existing[i].maxt = n.maxt
}
return existing
}
if r.inBounds(n.maxt) {
if n.mint < r.maxt {
existing[i].mint = n.mint
}
return existing
}
if n.mint < r.mint {
newRange := existing[:i]
newRange = append(newRange, n)
newRange = append(newRange, existing[i:]...)
return newRange
}
}
existing = append(existing, n)
return existing
}
// writeHash writes the chunk encoding and raw data into the provided hash.
func (cm *ChunkMeta) writeHash(h hash.Hash) error {
if _, err := h.Write([]byte{byte(cm.Chunk.Encoding())}); err != nil {
@ -112,6 +72,47 @@ func (cm *ChunkMeta) Iterator() chunks.Iterator {
return cm.Chunk.Iterator()
}
type trange struct {
mint, maxt int64
}
func (tr trange) inBounds(t int64) bool {
return t >= tr.mint && t <= tr.maxt
}
// This adds the new time-range to the existing ones.
// The existing ones must be sorted and should not be nil.
func addNewInterval(existing []trange, n trange) []trange {
for i, r := range existing {
if r.inBounds(n.mint) {
if n.maxt > r.maxt {
existing[i].maxt = n.maxt
}
return existing
}
if r.inBounds(n.maxt) {
if n.mint < r.maxt {
existing[i].mint = n.mint
}
return existing
}
if n.mint < r.mint {
newRange := make([]trange, i, len(existing[:i])+1)
copy(newRange, existing[:i])
newRange = append(newRange, n)
newRange = append(newRange, existing[i:]...)
return newRange
}
}
existing = append(existing, n)
return existing
}
// deletedIterator wraps an Iterator and makes sure any deleted metrics are not
// returned.
type deletedIterator struct {
@ -128,10 +129,12 @@ func (it *deletedIterator) Next() bool {
Outer:
for it.it.Next() {
ts, _ := it.it.At()
for _, tr := range it.dranges {
if tr.inBounds(ts) {
continue Outer
}
if ts > tr.maxt {
it.dranges = it.dranges[1:]
continue
@ -147,7 +150,7 @@ Outer:
}
func (it *deletedIterator) Err() error {
return it.Err()
return it.it.Err()
}
// ChunkWriter serializes a time block of chunked series data.

View File

@ -14,8 +14,12 @@
package tsdb
import (
"math/rand"
"testing"
"github.com/pkg/errors"
"github.com/prometheus/tsdb/chunks"
"github.com/stretchr/testify/require"
)
type mockChunkReader map[uint64]chunks.Chunk
@ -32,3 +36,108 @@ func (cr mockChunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
func (cr mockChunkReader) Close() error {
return nil
}
func TestAddingNewIntervals(t *testing.T) {
cases := []struct {
exist []trange
new trange
exp []trange
}{
{
new: trange{1, 2},
exp: []trange{{1, 2}},
},
{
exist: []trange{{1, 10}, {12, 20}, {25, 30}},
new: trange{21, 23},
exp: []trange{{1, 10}, {12, 20}, {21, 23}, {25, 30}},
},
{
exist: []trange{{1, 10}, {12, 20}, {25, 30}},
new: trange{21, 25},
exp: []trange{{1, 10}, {12, 20}, {21, 30}},
},
{
exist: []trange{{1, 10}, {12, 20}, {25, 30}},
new: trange{18, 23},
exp: []trange{{1, 10}, {12, 23}, {25, 30}},
},
// TODO(gouthamve): (below) This is technically right, but fix it in the future.
{
exist: []trange{{1, 10}, {12, 20}, {25, 30}},
new: trange{9, 23},
exp: []trange{{1, 23}, {12, 20}, {25, 30}},
},
{
exist: []trange{{5, 10}, {12, 20}, {25, 30}},
new: trange{1, 4},
exp: []trange{{1, 4}, {5, 10}, {12, 20}, {25, 30}},
},
}
for _, c := range cases {
require.Equal(t, c.exp, addNewInterval(c.exist, c.new))
}
return
}
func TestDeletedIterator(t *testing.T) {
chk := chunks.NewXORChunk()
app, err := chk.Appender()
require.NoError(t, err)
// Insert random stuff from (0, 1000).
act := make([]sample, 1000)
for i := 0; i < 1000; i++ {
act[i].t = int64(i)
act[i].v = rand.Float64()
app.Append(act[i].t, act[i].v)
}
cases := []struct {
r []trange
}{
{r: []trange{{1, 20}}},
{r: []trange{{1, 10}, {12, 20}, {21, 23}, {25, 30}}},
{r: []trange{{1, 10}, {12, 20}, {20, 30}}},
{r: []trange{{1, 10}, {12, 23}, {25, 30}}},
{r: []trange{{1, 23}, {12, 20}, {25, 30}}},
{r: []trange{{1, 23}, {12, 20}, {25, 3000}}},
{r: []trange{{0, 2000}}},
{r: []trange{{500, 2000}}},
{r: []trange{{0, 200}}},
{r: []trange{{1000, 20000}}},
}
for _, c := range cases {
i := int64(-1)
it := &deletedIterator{it: chk.Iterator(), dranges: c.r[:]}
ranges := c.r[:]
for it.Next() {
i++
for _, tr := range ranges {
if tr.inBounds(i) {
i = tr.maxt + 1
ranges = ranges[1:]
}
}
require.True(t, i < 1000)
ts, v := it.At()
require.Equal(t, act[i].t, ts)
require.Equal(t, act[i].v, v)
}
// There has been an extra call to Next().
i++
for _, tr := range ranges {
if tr.inBounds(i) {
i = tr.maxt + 1
ranges = ranges[1:]
}
}
require.False(t, i < 1000)
require.NoError(t, it.Err())
}
}