mirror of https://github.com/prometheus/prometheus
Support compaction of Head block for histograms (#9044)
* Update querier.go to support Head compaction with histograms Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Add test for Head compaction with histograms Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Fix tests Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>pull/9046/head
parent
4c01ff5194
commit
67871fd1f2
|
@ -25,10 +25,12 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/go-kit/log"
|
"github.com/go-kit/log"
|
||||||
|
"github.com/oklog/ulid"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
|
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/prometheus/prometheus/pkg/histogram"
|
||||||
"github.com/prometheus/prometheus/pkg/labels"
|
"github.com/prometheus/prometheus/pkg/labels"
|
||||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||||
"github.com/prometheus/prometheus/tsdb/chunks"
|
"github.com/prometheus/prometheus/tsdb/chunks"
|
||||||
|
@ -1311,3 +1313,67 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestHeadCompactionWithHistograms(t *testing.T) {
|
||||||
|
head, _ := newTestHead(t, DefaultBlockDuration, false)
|
||||||
|
t.Cleanup(func() {
|
||||||
|
require.NoError(t, head.Close())
|
||||||
|
})
|
||||||
|
|
||||||
|
require.NoError(t, head.Init(0))
|
||||||
|
app := head.Appender(context.Background())
|
||||||
|
|
||||||
|
type timedHist struct {
|
||||||
|
t int64
|
||||||
|
h histogram.SparseHistogram
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ingest samples.
|
||||||
|
numHistograms := 120 * 4
|
||||||
|
timeStep := DefaultBlockDuration / int64(numHistograms)
|
||||||
|
expHists := make([]timedHist, 0, numHistograms)
|
||||||
|
l := labels.Labels{{Name: "a", Value: "b"}}
|
||||||
|
for i, h := range generateHistograms(numHistograms) {
|
||||||
|
_, err := app.AppendHistogram(0, l, int64(i)*timeStep, h)
|
||||||
|
require.NoError(t, err)
|
||||||
|
expHists = append(expHists, timedHist{int64(i) * timeStep, h})
|
||||||
|
}
|
||||||
|
require.NoError(t, app.Commit())
|
||||||
|
|
||||||
|
// Compaction.
|
||||||
|
mint := head.MinTime()
|
||||||
|
maxt := head.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime).
|
||||||
|
compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
id, err := compactor.Write(head.opts.ChunkDirRoot, head, mint, maxt, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotEqual(t, ulid.ULID{}, id)
|
||||||
|
|
||||||
|
// Open the block and query it and check the histograms.
|
||||||
|
block, err := OpenBlock(nil, path.Join(head.opts.ChunkDirRoot, id.String()), nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
t.Cleanup(func() {
|
||||||
|
require.NoError(t, block.Close())
|
||||||
|
})
|
||||||
|
|
||||||
|
q, err := NewBlockQuerier(block, block.MinTime(), block.MaxTime())
|
||||||
|
require.NoError(t, err)
|
||||||
|
t.Cleanup(func() {
|
||||||
|
require.NoError(t, q.Close())
|
||||||
|
})
|
||||||
|
|
||||||
|
ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||||
|
|
||||||
|
require.True(t, ss.Next())
|
||||||
|
s := ss.At()
|
||||||
|
require.False(t, ss.Next())
|
||||||
|
|
||||||
|
it := s.Iterator()
|
||||||
|
actHists := make([]timedHist, 0, len(expHists))
|
||||||
|
for it.Next() {
|
||||||
|
t, h := it.AtHistogram()
|
||||||
|
actHists = append(actHists, timedHist{t, h.Copy()})
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Equal(t, expHists, actHists)
|
||||||
|
}
|
||||||
|
|
|
@ -2205,6 +2205,9 @@ func TestAppendHistogram(t *testing.T) {
|
||||||
|
|
||||||
q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime())
|
q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
t.Cleanup(func() {
|
||||||
|
require.NoError(t, q.Close())
|
||||||
|
})
|
||||||
|
|
||||||
ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||||
|
|
||||||
|
|
|
@ -666,8 +666,18 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Re-encode the chunk if iterator is provider. This means that it has some samples to be deleted or chunk is opened.
|
// Re-encode the chunk if iterator is provider. This means that it has some samples to be deleted or chunk is opened.
|
||||||
newChunk := chunkenc.NewXORChunk()
|
var (
|
||||||
app, err := newChunk.Appender()
|
newChunk chunkenc.Chunk
|
||||||
|
app chunkenc.Appender
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
if p.currDelIter.ChunkEncoding() == chunkenc.EncSHS {
|
||||||
|
newChunk = chunkenc.NewHistoChunk()
|
||||||
|
app, err = newChunk.Appender()
|
||||||
|
} else {
|
||||||
|
newChunk = chunkenc.NewXORChunk()
|
||||||
|
app, err = newChunk.Appender()
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.err = err
|
p.err = err
|
||||||
return false
|
return false
|
||||||
|
@ -684,14 +694,29 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
t, v := p.currDelIter.At()
|
var (
|
||||||
|
t int64
|
||||||
|
v float64
|
||||||
|
h histogram.SparseHistogram
|
||||||
|
)
|
||||||
|
if p.currDelIter.ChunkEncoding() == chunkenc.EncSHS {
|
||||||
|
t, h = p.currDelIter.AtHistogram()
|
||||||
|
p.curr.MinTime = t
|
||||||
|
app.AppendHistogram(t, h.Copy())
|
||||||
|
for p.currDelIter.Next() {
|
||||||
|
t, h = p.currDelIter.AtHistogram()
|
||||||
|
app.AppendHistogram(t, h.Copy())
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
t, v = p.currDelIter.At()
|
||||||
p.curr.MinTime = t
|
p.curr.MinTime = t
|
||||||
app.Append(t, v)
|
app.Append(t, v)
|
||||||
|
|
||||||
for p.currDelIter.Next() {
|
for p.currDelIter.Next() {
|
||||||
t, v = p.currDelIter.At()
|
t, v = p.currDelIter.At()
|
||||||
app.Append(t, v)
|
app.Append(t, v)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if err := p.currDelIter.Err(); err != nil {
|
if err := p.currDelIter.Err(); err != nil {
|
||||||
p.err = errors.Wrap(err, "iterate chunk while re-encoding")
|
p.err = errors.Wrap(err, "iterate chunk while re-encoding")
|
||||||
return false
|
return false
|
||||||
|
|
Loading…
Reference in New Issue