Add/Improve unit tests for compaction with histogram Part 2 (#11343)

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
pull/11312/head
Ganesh Vernekar 2022-09-23 14:01:10 +05:30 committed by GitHub
parent 2474c6fb2c
commit 758e29258b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 71 additions and 171 deletions

View File

@ -38,6 +38,7 @@ import (
"github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/fileutil" "github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/prometheus/prometheus/tsdb/tombstones" "github.com/prometheus/prometheus/tsdb/tombstones"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
) )
func TestSplitByRange(t *testing.T) { func TestSplitByRange(t *testing.T) {
@ -1298,29 +1299,77 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) {
func TestHeadCompactionWithHistograms(t *testing.T) { func TestHeadCompactionWithHistograms(t *testing.T) {
head, _ := newTestHead(t, DefaultBlockDuration, false) head, _ := newTestHead(t, DefaultBlockDuration, false)
require.NoError(t, head.Init(0))
t.Cleanup(func() { t.Cleanup(func() {
require.NoError(t, head.Close()) require.NoError(t, head.Close())
}) })
require.NoError(t, head.Init(0)) minute := func(m int) int64 { return int64(m) * time.Minute.Milliseconds() }
app := head.Appender(context.Background()) ctx := context.Background()
appendHistogram := func(lbls labels.Labels, from, to int, h *histogram.Histogram, exp *[]tsdbutil.Sample) {
t.Helper()
app := head.Appender(ctx)
for tsMinute := from; tsMinute <= to; tsMinute++ {
_, err := app.AppendHistogram(0, lbls, minute(tsMinute), h)
require.NoError(t, err)
*exp = append(*exp, sample{t: minute(tsMinute), h: h.Copy()})
}
type timedHistogram struct { require.NoError(t, app.Commit())
t int64 }
h *histogram.Histogram appendFloat := func(lbls labels.Labels, from, to int, exp *[]tsdbutil.Sample) {
t.Helper()
app := head.Appender(ctx)
for tsMinute := from; tsMinute <= to; tsMinute++ {
_, err := app.Append(0, lbls, minute(tsMinute), float64(tsMinute))
require.NoError(t, err)
*exp = append(*exp, sample{t: minute(tsMinute), v: float64(tsMinute)})
}
require.NoError(t, app.Commit())
} }
// Ingest samples. var (
numHistograms := 120 * 4 series1 = labels.FromStrings("foo", "bar1")
timeStep := DefaultBlockDuration / int64(numHistograms) series2 = labels.FromStrings("foo", "bar2")
expHists := make([]timedHistogram, 0, numHistograms) series3 = labels.FromStrings("foo", "bar3")
l := labels.Labels{{Name: "a", Value: "b"}} series4 = labels.FromStrings("foo", "bar4")
for i, h := range GenerateTestHistograms(numHistograms) { exp1, exp2, exp3, exp4 []tsdbutil.Sample
_, err := app.AppendHistogram(0, l, int64(i)*timeStep, h) )
require.NoError(t, err) h := &histogram.Histogram{
expHists = append(expHists, timedHistogram{int64(i) * timeStep, h}) Count: 11,
ZeroCount: 4,
ZeroThreshold: 0.001,
Sum: 35.5,
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 2, Length: 2},
},
PositiveBuckets: []int64{1, 1, -1, 0},
NegativeSpans: []histogram.Span{
{Offset: 0, Length: 1},
{Offset: 1, Length: 2},
},
NegativeBuckets: []int64{1, 2, -1},
} }
require.NoError(t, app.Commit())
// Series with only histograms.
appendHistogram(series1, 100, 105, h, &exp1)
// Series starting with float and then getting histograms.
appendFloat(series2, 100, 102, &exp2)
appendHistogram(series2, 103, 105, h.Copy(), &exp2)
appendFloat(series2, 106, 107, &exp2)
appendHistogram(series2, 108, 109, h.Copy(), &exp2)
// Series starting with histogram and then getting float.
appendHistogram(series3, 101, 103, h.Copy(), &exp3)
appendFloat(series3, 104, 106, &exp3)
appendHistogram(series3, 107, 108, h.Copy(), &exp3)
appendFloat(series3, 109, 110, &exp3)
// A float only series.
appendFloat(series4, 100, 102, &exp4)
// Compaction. // Compaction.
mint := head.MinTime() mint := head.MinTime()
@ -1340,25 +1389,14 @@ func TestHeadCompactionWithHistograms(t *testing.T) {
q, err := NewBlockQuerier(block, block.MinTime(), block.MaxTime()) q, err := NewBlockQuerier(block, block.MinTime(), block.MaxTime())
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, q.Close())
})
ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) actHists := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
require.Equal(t, map[string][]tsdbutil.Sample{
require.True(t, ss.Next()) series1.String(): exp1,
s := ss.At() series2.String(): exp2,
require.False(t, ss.Next()) series3.String(): exp3,
series4.String(): exp4,
it := s.Iterator() }, actHists)
actHists := make([]timedHistogram, 0, len(expHists))
for it.Next() == chunkenc.ValHistogram {
// TODO(beorn7): Test mixed series?
t, h := it.AtHistogram()
actHists = append(actHists, timedHistogram{t, h})
}
require.Equal(t, expHists, actHists)
} }
// Depending on numSeriesPerSchema, it can take few gigs of memory; // Depending on numSeriesPerSchema, it can take few gigs of memory;
@ -1671,144 +1709,6 @@ func generateCustomHistograms(numHists, numBuckets, numSpans, gapBetweenSpans, s
return r return r
} }
func TestSparseHistogramCompactionAndQuery(t *testing.T) {
dir := t.TempDir()
t.Cleanup(func() {
require.NoError(t, os.RemoveAll(dir))
})
opts := DefaultOptions()
opts.EnableNativeHistograms = true
// Exactly 3 times so that level 2 of compaction happens and tombstone
// deletion and compaction considers the level 2 blocks to be big enough.
opts.MaxBlockDuration = 3 * opts.MinBlockDuration
db, err := Open(dir, nil, nil, opts, nil)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, db.Close())
})
db.DisableCompactions()
type timedHistogram struct {
t int64
h *histogram.Histogram
}
expHists := make(map[string][]timedHistogram)
series1Histograms := GenerateTestHistograms(20)
series2Histograms := GenerateTestHistograms(20)
idx1, idx2 := -1, -1
addNextHists := func(ts int64, app storage.Appender) {
lbls1 := labels.Labels{{Name: "a", Value: "b"}}
lbls2 := labels.Labels{{Name: "a", Value: "c"}}
idx1++
_, err := app.AppendHistogram(0, lbls1, ts, series1Histograms[idx1])
require.NoError(t, err)
idx2++
_, err = app.AppendHistogram(0, lbls2, ts, series2Histograms[idx2])
require.NoError(t, err)
l1, l2 := lbls1.String(), lbls2.String()
expHists[l1] = append(expHists[l1], timedHistogram{t: ts, h: series1Histograms[idx1]})
expHists[l2] = append(expHists[l2], timedHistogram{t: ts, h: series2Histograms[idx2]})
}
testQuery := func() {
q, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64)
require.NoError(t, err)
defer func() {
require.NoError(t, q.Close())
}()
ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "a", ".*"))
actHists := make(map[string][]timedHistogram)
for ss.Next() {
s := ss.At()
it := s.Iterator()
for it.Next() == chunkenc.ValHistogram {
ts, h := it.AtHistogram()
actHists[s.Labels().String()] = append(actHists[s.Labels().String()], timedHistogram{ts, h.Copy()})
}
require.NoError(t, it.Err())
}
require.NoError(t, ss.Err())
require.Equal(t, expHists, actHists)
}
// Add histograms to create 1 block via compaction.
app := db.Appender(context.Background())
for ts := int64(0); ts <= 2*DefaultBlockDuration; ts += DefaultBlockDuration / 2 {
addNextHists(ts, app)
}
require.NoError(t, app.Commit())
testQuery() // Only the head block.
require.NoError(t, db.Compact())
require.Equal(t, 1, len(db.Blocks()))
testQuery() // 1 persistent block and the head block.
// Add histograms to create 2 more blocks via compaction.
app = db.Appender(context.Background())
for ts := 5 * DefaultBlockDuration / 2; ts <= 4*DefaultBlockDuration; ts += DefaultBlockDuration / 2 {
addNextHists(ts, app)
}
require.NoError(t, app.Commit())
require.NoError(t, db.Compact())
require.Equal(t, 3, len(db.Blocks()))
testQuery() // >1 persistent block (and the head block).
// Another block triggers compaction of the first 3 blocks into 1 block.
app = db.Appender(context.Background())
for ts := 9 * DefaultBlockDuration / 2; ts <= 5*DefaultBlockDuration; ts += DefaultBlockDuration / 2 {
addNextHists(ts, app)
}
require.NoError(t, app.Commit())
require.NoError(t, db.Compact())
require.Equal(t, 2, len(db.Blocks()))
testQuery()
require.Equal(t, int64(0), db.blocks[0].MinTime())
require.Equal(t, 3*DefaultBlockDuration, db.blocks[0].MaxTime())
require.Equal(t, 3*DefaultBlockDuration, db.blocks[1].MinTime())
require.Equal(t, 4*DefaultBlockDuration, db.blocks[1].MaxTime())
// Add tombstones to the first block to make sure that the deletion works for histograms on compaction.
delTime := 2 * DefaultBlockDuration
err = db.Delete(0, delTime, labels.MustNewMatcher(labels.MatchRegexp, "a", ".*"))
require.NoError(t, err)
require.Equal(t, uint64(2), db.blocks[0].Meta().Stats.NumTombstones)
// Truncate expected histograms to test the query after deletion.
for k, v := range expHists {
oldCount := len(v)
for i := 0; i < len(v); i++ {
if v[i].t > delTime {
expHists[k] = expHists[k][i:]
break
}
}
require.Less(t, len(expHists[k]), oldCount)
require.Greater(t, len(expHists[k]), 0)
}
testQuery() // Query with tombstones on persistent block.
oldULID := db.blocks[0].Meta().ULID
require.NoError(t, db.Compact())
require.Equal(t, 2, len(db.Blocks()))
newULID := db.blocks[0].Meta().ULID
require.NotEqual(t, oldULID, newULID)
require.Equal(t, uint64(0), db.blocks[0].Meta().Stats.NumTombstones)
testQuery()
// Adding tombstones to head and testing query for that.
// Last sample was ts=5*DefaultBlockDuration, so a tombstone just to cover that.
err = db.Delete((5*DefaultBlockDuration)-1, (5*DefaultBlockDuration)+1, labels.MustNewMatcher(labels.MatchRegexp, "a", ".*"))
require.NoError(t, err)
// Remove last sample from expected.
for k := range expHists {
expHists[k] = expHists[k][:len(expHists[k])-1]
require.Greater(t, len(expHists[k]), 0)
}
testQuery()
}
func TestCompactBlockMetas(t *testing.T) { func TestCompactBlockMetas(t *testing.T) {
parent1 := ulid.MustNew(100, nil) parent1 := ulid.MustNew(100, nil)
parent2 := ulid.MustNew(200, nil) parent2 := ulid.MustNew(200, nil)