Add support for histograms in WAL checkpointing (#11210)

* Add support for histograms in WAL checkpointing

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Fix review comments

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Fix tests

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
Branch: pull/11227/head
Ganesh Vernekar, 2022-08-29 17:38:36 +05:30 (committed by GitHub)
Parent: 6383994f3e
Commit: f540c1dbd3
12 changed files with 142 additions and 82 deletions
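The diff renames the WAL record type `Histograms` to `HistogramSamples` (and `record.RefHistogram` to `record.RefHistogramSample`) and teaches checkpointing to carry those records over. For orientation, a minimal, hedged sketch of the renamed encode/decode round trip, using only identifiers that appear in the diff; the `main` wrapper and the sample values are illustrative, not part of the change.

package main

import (
    "fmt"

    "github.com/prometheus/prometheus/model/histogram"
    "github.com/prometheus/prometheus/tsdb/chunks"
    "github.com/prometheus/prometheus/tsdb/record"
)

func main() {
    // One histogram sample with arbitrary, illustrative values.
    samples := []record.RefHistogramSample{{
        Ref: chunks.HeadSeriesRef(1),
        T:   1234,
        H: &histogram.Histogram{
            Count:         5,
            ZeroCount:     2,
            ZeroThreshold: 0.001,
            Sum:           18.4,
            Schema:        1,
            PositiveSpans: []histogram.Span{
                {Offset: 0, Length: 2},
                {Offset: 1, Length: 2},
            },
            PositiveBuckets: []int64{1, 1, -1, 0},
        },
    }}

    var enc record.Encoder
    var dec record.Decoder

    // Encode into a WAL record of the renamed HistogramSamples type.
    rec := enc.HistogramSamples(samples, nil)
    fmt.Println(dec.Type(rec) == record.HistogramSamples) // true

    // Decode it back; the checkpoint code below uses exactly this pair of calls.
    decoded, err := dec.HistogramSamples(rec, nil)
    if err != nil {
        panic(err)
    }
    fmt.Println(len(decoded)) // 1
}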


@@ -663,7 +663,7 @@ outer:
     return true
 }
 
-func (t *QueueManager) AppendHistograms(histograms []record.RefHistogram) bool {
+func (t *QueueManager) AppendHistograms(histograms []record.RefHistogramSample) bool {
     if !t.sendNativeHistograms {
         return true
     }


@@ -104,7 +104,7 @@ func TestSampleDelivery(t *testing.T) {
         series     []record.RefSeries
         samples    []record.RefSample
         exemplars  []record.RefExemplar
-        histograms []record.RefHistogram
+        histograms []record.RefHistogramSample
     )
 
     // Generates same series in both cases.
@@ -583,13 +583,13 @@ func createExemplars(numExemplars, numSeries int) ([]record.RefExemplar, []recor
     return exemplars, series
 }
 
-func createHistograms(numSamples, numSeries int) ([]record.RefHistogram, []record.RefSeries) {
-    histograms := make([]record.RefHistogram, 0, numSamples)
+func createHistograms(numSamples, numSeries int) ([]record.RefHistogramSample, []record.RefSeries) {
+    histograms := make([]record.RefHistogramSample, 0, numSamples)
     series := make([]record.RefSeries, 0, numSeries)
     for i := 0; i < numSeries; i++ {
         name := fmt.Sprintf("test_metric_%d", i)
         for j := 0; j < numSamples; j++ {
-            h := record.RefHistogram{
+            h := record.RefHistogramSample{
                 Ref: chunks.HeadSeriesRef(i),
                 T:   int64(j),
                 H: &histogram.Histogram{
@@ -689,7 +689,7 @@ func (c *TestWriteClient) expectExemplars(ss []record.RefExemplar, series []reco
     c.wg.Add(len(ss))
 }
 
-func (c *TestWriteClient) expectHistograms(hh []record.RefHistogram, series []record.RefSeries) {
+func (c *TestWriteClient) expectHistograms(hh []record.RefHistogramSample, series []record.RefSeries) {
     if !c.withWaitGroup {
         return
     }


@@ -50,7 +50,7 @@ var (
         Namespace: namespace,
         Subsystem: subsystem,
         Name:      "histograms_in_total",
-        Help:      "Histograms in to remote storage, compare to histograms out for queue managers.",
+        Help:      "HistogramSamples in to remote storage, compare to histograms out for queue managers.",
     })
 )


@@ -220,15 +220,15 @@ func (h *Head) putExemplarBuffer(b []exemplarWithSeriesRef) {
     h.exemplarsPool.Put(b[:0])
 }
 
-func (h *Head) getHistogramBuffer() []record.RefHistogram {
+func (h *Head) getHistogramBuffer() []record.RefHistogramSample {
     b := h.histogramsPool.Get()
     if b == nil {
-        return make([]record.RefHistogram, 0, 512)
+        return make([]record.RefHistogramSample, 0, 512)
     }
-    return b.([]record.RefHistogram)
+    return b.([]record.RefHistogramSample)
 }
 
-func (h *Head) putHistogramBuffer(b []record.RefHistogram) {
+func (h *Head) putHistogramBuffer(b []record.RefHistogramSample) {
     //nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty.
     h.histogramsPool.Put(b[:0])
 }
@@ -282,14 +282,14 @@ type headAppender struct {
     minValidTime int64 // No samples below this timestamp are allowed.
     mint, maxt   int64
 
     series          []record.RefSeries      // New series held by this appender.
     samples         []record.RefSample      // New float samples held by this appender.
     exemplars       []exemplarWithSeriesRef // New exemplars held by this appender.
     sampleSeries    []*memSeries            // Float series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once).
-    histograms      []record.RefHistogram   // New histogram samples held by this appender.
-    histogramSeries []*memSeries            // Histogram series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once).
+    histograms      []record.RefHistogramSample // New histogram samples held by this appender.
+    histogramSeries []*memSeries                // HistogramSamples series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once).
     metadata        []record.RefMetadata    // New metadata held by this appender.
     metadataSeries  []*memSeries            // Series corresponding to the metadata held by this appender.
 
     appendID, cleanupAppendIDsBelow uint64
     closed                          bool
@@ -493,7 +493,7 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
         a.maxt = t
     }
 
-    a.histograms = append(a.histograms, record.RefHistogram{
+    a.histograms = append(a.histograms, record.RefHistogramSample{
         Ref: s.ref,
         T:   t,
         H:   h,
@@ -659,7 +659,7 @@ func (a *headAppender) log() error {
         }
     }
     if len(a.histograms) > 0 {
-        rec = enc.Histograms(a.histograms, buf)
+        rec = enc.HistogramSamples(a.histograms, buf)
         buf = rec[:0]
         if err := a.head.wal.Log(rec); err != nil {
             return errors.Wrap(err, "log histograms")

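The head keeps reusable []record.RefHistogramSample buffers in a sync.Pool, as the get/put pair above shows. A standalone, hedged sketch of that pooling pattern, with the pool as a package-level variable instead of a Head field:

package main

import (
    "sync"

    "github.com/prometheus/prometheus/tsdb/record"
)

// histogramBufPool mirrors the Head's histogramsPool usage: buffers are
// returned with length zero but keep their capacity, so the next appender
// reuses the allocation instead of growing a fresh slice.
var histogramBufPool sync.Pool

func getHistogramBuffer() []record.RefHistogramSample {
    b := histogramBufPool.Get()
    if b == nil {
        return make([]record.RefHistogramSample, 0, 512)
    }
    return b.([]record.RefHistogramSample)
}

func putHistogramBuffer(b []record.RefHistogramSample) {
    // Storing a slice (not a pointer) in the pool allocates a small interface
    // header; the original code accepts that cost (the SA6002 nolint above).
    histogramBufPool.Put(b[:0])
}

func main() {
    buf := getHistogramBuffer()
    buf = append(buf, record.RefHistogramSample{T: 1})
    putHistogramBuffer(buf) // contents are discarded, capacity is kept
}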

@@ -59,7 +59,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
         dec             record.Decoder
         shards          = make([][]record.RefSample, n)
-        histogramShards = make([][]record.RefHistogram, n)
+        histogramShards = make([][]record.RefHistogramSample, n)
 
         decoded                      = make(chan interface{}, 10)
         decodeErr, seriesCreationErr error
@@ -85,7 +85,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
         }
         histogramsPool = sync.Pool{
             New: func() interface{} {
-                return []record.RefHistogram{}
+                return []record.RefHistogramSample{}
             },
         }
         metadataPool = sync.Pool{
@@ -197,9 +197,9 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
                     return
                 }
                 decoded <- exemplars
-            case record.Histograms:
-                hists := histogramsPool.Get().([]record.RefHistogram)[:0]
-                hists, err = dec.Histograms(rec, hists)
+            case record.HistogramSamples:
+                hists := histogramsPool.Get().([]record.RefHistogramSample)[:0]
+                hists, err = dec.HistogramSamples(rec, hists)
                 if err != nil {
                     decodeErr = &wal.CorruptionErr{
                         Err: errors.Wrap(err, "decode histograms"),
@@ -344,7 +344,7 @@ Outer:
             }
             //nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
             exemplarsPool.Put(v)
-        case []record.RefHistogram:
+        case []record.RefHistogramSample:
             samples := v
             // We split up the samples into chunks of 5000 samples or less.
             // With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
@@ -450,15 +450,15 @@ func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc []*mmappedCh
 type walSubsetProcessor struct {
     mx sync.Mutex // Take this lock while modifying series in the subset.
 
-    input            chan interface{} // Either []record.RefSample or []record.RefHistogram.
+    input            chan interface{} // Either []record.RefSample or []record.RefHistogramSample.
     output           chan []record.RefSample
-    histogramsOutput chan []record.RefHistogram
+    histogramsOutput chan []record.RefHistogramSample
 }
 
 func (wp *walSubsetProcessor) setup() {
     wp.output = make(chan []record.RefSample, 300)
     wp.input = make(chan interface{}, 300)
-    wp.histogramsOutput = make(chan []record.RefHistogram, 300)
+    wp.histogramsOutput = make(chan []record.RefHistogramSample, 300)
 }
 
 func (wp *walSubsetProcessor) closeAndDrain() {
@@ -480,7 +480,7 @@ func (wp *walSubsetProcessor) reuseBuf() []record.RefSample {
 }
 
 // If there is a buffer in the output chan, return it for reuse, otherwise return nil.
-func (wp *walSubsetProcessor) reuseHistogramBuf() []record.RefHistogram {
+func (wp *walSubsetProcessor) reuseHistogramBuf() []record.RefHistogramSample {
     select {
     case buf := <-wp.histogramsOutput:
         return buf[:0]
@@ -528,7 +528,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head) (unknownRefs, unknownHi
             }
         }
         wp.output <- samples
-    case []record.RefHistogram:
+    case []record.RefHistogramSample:
         for _, s := range samples {
             if s.T < minValidTime {
                 continue

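During WAL replay, decoded histogram-sample slices are cut into batches of at most 5000 samples before being fanned out to the per-shard processors, as the comment in the hunk above notes. A hedged, self-contained sketch of just that batching step; the shard assignment and channel plumbing are elided, and splitIntoBatches is an illustrative helper, not a function from the diff:

package main

import (
    "fmt"

    "github.com/prometheus/prometheus/tsdb/record"
)

// splitIntoBatches cuts a decoded []record.RefHistogramSample into chunks of
// at most max samples, so one huge record cannot monopolise memory among the
// O(300 * #cores) in-flight batches mentioned in the comment above.
func splitIntoBatches(samples []record.RefHistogramSample, max int) [][]record.RefHistogramSample {
    var batches [][]record.RefHistogramSample
    for len(samples) > 0 {
        m := max
        if len(samples) < m {
            m = len(samples)
        }
        batches = append(batches, samples[:m])
        samples = samples[m:]
    }
    return batches
}

func main() {
    samples := make([]record.RefHistogramSample, 12000)
    for _, b := range splitIntoBatches(samples, 5000) {
        fmt.Println(len(b)) // 5000, 5000, 2000
    }
}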

@@ -47,7 +47,7 @@ const (
     // Metadata is used to match WAL records of type Metadata.
     Metadata Type = 6
     // Histograms is used to match WAL records of type Histograms.
-    Histograms Type = 7
+    HistogramSamples Type = 7
 )
 
 func (rt Type) String() string {
@@ -60,8 +60,8 @@ func (rt Type) String() string {
         return "tombstones"
     case Exemplars:
         return "exemplars"
-    case Histograms:
-        return "histograms"
+    case HistogramSamples:
+        return "histogram_samples"
     case Metadata:
         return "metadata"
     default:
@@ -73,14 +73,14 @@ func (rt Type) String() string {
 type MetricType uint8
 
 const (
     UnknownMT      MetricType = 0
     Counter        MetricType = 1
     Gauge          MetricType = 2
-    Histogram      MetricType = 3
+    HistogramSample MetricType = 3
     GaugeHistogram MetricType = 4
     Summary        MetricType = 5
     Info           MetricType = 6
     Stateset       MetricType = 7
 )
 
 func GetMetricType(t textparse.MetricType) uint8 {
@@ -90,7 +90,7 @@ func GetMetricType(t textparse.MetricType) uint8 {
     case textparse.MetricTypeGauge:
         return uint8(Gauge)
     case textparse.MetricTypeHistogram:
-        return uint8(Histogram)
+        return uint8(HistogramSample)
     case textparse.MetricTypeGaugeHistogram:
         return uint8(GaugeHistogram)
     case textparse.MetricTypeSummary:
@@ -110,7 +110,7 @@ func ToTextparseMetricType(m uint8) textparse.MetricType {
         return textparse.MetricTypeCounter
     case uint8(Gauge):
         return textparse.MetricTypeGauge
-    case uint8(Histogram):
+    case uint8(HistogramSample):
         return textparse.MetricTypeHistogram
     case uint8(GaugeHistogram):
         return textparse.MetricTypeGaugeHistogram
@@ -140,7 +140,7 @@ type RefSeries struct {
 }
 
 // RefSample is a timestamp/value pair associated with a reference to a series.
-// TODO(beorn7): Perhaps make this "polymorphic", including histogram and float-histogram pointers? Then get rid of RefHistogram.
+// TODO(beorn7): Perhaps make this "polymorphic", including histogram and float-histogram pointers? Then get rid of RefHistogramSample.
 type RefSample struct {
     Ref chunks.HeadSeriesRef
     T   int64
@@ -163,8 +163,8 @@ type RefExemplar struct {
     Labels labels.Labels
 }
 
-// RefHistogram is a histogram.
-type RefHistogram struct {
+// RefHistogramSample is a histogram.
+type RefHistogramSample struct {
     Ref chunks.HeadSeriesRef
     T   int64
     H   *histogram.Histogram
@@ -181,7 +181,7 @@ func (d *Decoder) Type(rec []byte) Type {
         return Unknown
     }
     switch t := Type(rec[0]); t {
-    case Series, Samples, Tombstones, Exemplars, Histograms, Metadata:
+    case Series, Samples, Tombstones, Exemplars, HistogramSamples, Metadata:
         return t
     }
     return Unknown
@@ -367,10 +367,10 @@ func (d *Decoder) ExemplarsFromBuffer(dec *encoding.Decbuf, exemplars []RefExemp
     return exemplars, nil
 }
 
-func (d *Decoder) Histograms(rec []byte, histograms []RefHistogram) ([]RefHistogram, error) {
+func (d *Decoder) HistogramSamples(rec []byte, histograms []RefHistogramSample) ([]RefHistogramSample, error) {
     dec := encoding.Decbuf{B: rec}
     t := Type(dec.Byte())
-    if t != Histograms {
+    if t != HistogramSamples {
         return nil, errors.New("invalid record type")
     }
     if dec.Len() == 0 {
@@ -384,7 +384,7 @@ func (d *Decoder) Histograms(rec []byte, histograms []RefHistogram) ([]RefHistog
         dref := dec.Varint64()
         dtime := dec.Varint64()
 
-        rh := RefHistogram{
+        rh := RefHistogramSample{
             Ref: chunks.HeadSeriesRef(baseRef + uint64(dref)),
             T:   baseTime + dtime,
             H: &histogram.Histogram{
@@ -563,9 +563,9 @@ func (e *Encoder) EncodeExemplarsIntoBuffer(exemplars []RefExemplar, buf *encodi
     }
 }
 
-func (e *Encoder) Histograms(histograms []RefHistogram, b []byte) []byte {
+func (e *Encoder) HistogramSamples(histograms []RefHistogramSample, b []byte) []byte {
     buf := encoding.Encbuf{B: b}
-    buf.PutByte(byte(Histograms))
+    buf.PutByte(byte(HistogramSamples))
 
     if len(histograms) == 0 {
         return buf.Get()

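Both the WAL replay above and the remote-write watcher further down dispatch on Decoder.Type before picking a decode method, now including the renamed HistogramSamples constant. A hedged sketch of that dispatch; describe is a hypothetical helper, not part of the diff:

package main

import (
    "fmt"

    "github.com/prometheus/prometheus/tsdb/record"
)

// describe routes a raw WAL record to the matching decode call, the same
// shape as the switches in the WAL replay and the remote-write watcher.
func describe(dec record.Decoder, rec []byte) (string, int, error) {
    switch dec.Type(rec) {
    case record.Samples:
        samples, err := dec.Samples(rec, nil)
        return "float samples", len(samples), err
    case record.HistogramSamples:
        histograms, err := dec.HistogramSamples(rec, nil)
        return "histogram samples", len(histograms), err
    default:
        return "other", 0, nil
    }
}

func main() {
    var enc record.Encoder
    var dec record.Decoder

    rec := enc.Samples([]record.RefSample{{Ref: 1, T: 1, V: 2}}, nil)
    kind, n, err := describe(dec, rec)
    fmt.Println(kind, n, err) // float samples 1 <nil>
}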

@@ -110,7 +110,7 @@ func TestRecord_EncodeDecode(t *testing.T) {
     require.NoError(t, err)
     require.Equal(t, exemplars, decExemplars)
 
-    histograms := []RefHistogram{
+    histograms := []RefHistogramSample{
         {
             Ref: 56,
             T:   1234,
@@ -150,7 +150,7 @@ func TestRecord_EncodeDecode(t *testing.T) {
         },
     }
 
-    decHistograms, err := dec.Histograms(enc.Histograms(histograms, nil), nil)
+    decHistograms, err := dec.HistogramSamples(enc.HistogramSamples(histograms, nil), nil)
     require.NoError(t, err)
     require.Equal(t, histograms, decHistograms)
 }
@@ -218,7 +218,7 @@ func TestRecord_Corrupted(t *testing.T) {
     })
 
     t.Run("Test corrupted histogram record", func(t *testing.T) {
-        histograms := []RefHistogram{
+        histograms := []RefHistogramSample{
             {
                 Ref: 56,
                 T:   1234,
@@ -237,8 +237,8 @@ func TestRecord_Corrupted(t *testing.T) {
             },
         }
 
-        corrupted := enc.Histograms(histograms, nil)[:8]
-        _, err := dec.Histograms(corrupted, nil)
+        corrupted := enc.HistogramSamples(histograms, nil)[:8]
+        _, err := dec.HistogramSamples(corrupted, nil)
         require.Equal(t, errors.Cause(err), encoding.ErrInvalidSize)
     })
 }
@@ -263,7 +263,7 @@ func TestRecord_Type(t *testing.T) {
     recordType = dec.Type(enc.Metadata(metadata, nil))
     require.Equal(t, Metadata, recordType)
 
-    histograms := []RefHistogram{
+    histograms := []RefHistogramSample{
         {
             Ref: 56,
             T:   1234,
@@ -281,8 +281,8 @@ func TestRecord_Type(t *testing.T) {
             },
         },
     }
-    recordType = dec.Type(enc.Histograms(histograms, nil))
-    require.Equal(t, Histograms, recordType)
+    recordType = dec.Type(enc.HistogramSamples(histograms, nil))
+    require.Equal(t, HistogramSamples, recordType)
 
     recordType = dec.Type(nil)
     require.Equal(t, Unknown, recordType)


@@ -25,7 +25,7 @@ import (
 // BufferedSeriesIterator wraps an iterator with a look-back buffer.
 //
-// TODO(beorn7): BufferedSeriesIterator does not support Histograms or
+// TODO(beorn7): BufferedSeriesIterator does not support HistogramSamples or
 // FloatHistograms. Either add support or remove BufferedSeriesIterator
 // altogether (it seems unused).
 type BufferedSeriesIterator struct {


@@ -38,12 +38,12 @@ import (
 // CheckpointStats returns stats about a created checkpoint.
 type CheckpointStats struct {
     DroppedSeries     int
-    DroppedSamples    int
+    DroppedSamples    int // Includes histograms.
     DroppedTombstones int
     DroppedExemplars  int
     DroppedMetadata   int
     TotalSeries       int // Processed series including dropped ones.
-    TotalSamples      int // Processed samples including dropped ones.
+    TotalSamples      int // Processed float and histogram samples including dropped ones.
     TotalTombstones   int // Processed tombstones including dropped ones.
     TotalExemplars    int // Processed exemplars including dropped ones.
     TotalMetadata     int // Processed metadata including dropped ones.
@@ -148,20 +148,21 @@ func Checkpoint(logger log.Logger, w *WAL, from, to int, keep func(id chunks.Hea
     r := NewReader(sgmReader)
 
     var (
         series           []record.RefSeries
         samples          []record.RefSample
+        histogramSamples []record.RefHistogramSample
         tstones          []tombstones.Stone
         exemplars        []record.RefExemplar
         metadata         []record.RefMetadata
         dec              record.Decoder
         enc              record.Encoder
         buf              []byte
         recs             [][]byte
 
         latestMetadataMap = make(map[chunks.HeadSeriesRef]record.RefMetadata)
     )
     for r.Next() {
-        series, samples, tstones, exemplars, metadata = series[:0], samples[:0], tstones[:0], exemplars[:0], metadata[:0]
+        series, samples, histogramSamples, tstones, exemplars, metadata = series[:0], samples[:0], histogramSamples[:0], tstones[:0], exemplars[:0], metadata[:0]
 
         // We don't reset the buffer since we batch up multiple records
         // before writing them to the checkpoint.
@@ -206,6 +207,24 @@ func Checkpoint(logger log.Logger, w *WAL, from, to int, keep func(id chunks.Hea
             stats.TotalSamples += len(samples)
             stats.DroppedSamples += len(samples) - len(repl)
 
+        case record.HistogramSamples:
+            histogramSamples, err = dec.HistogramSamples(rec, histogramSamples)
+            if err != nil {
+                return nil, errors.Wrap(err, "decode histogram samples")
+            }
+            // Drop irrelevant histogramSamples in place.
+            repl := histogramSamples[:0]
+            for _, h := range histogramSamples {
+                if h.T >= mint {
+                    repl = append(repl, h)
+                }
+            }
+            if len(repl) > 0 {
+                buf = enc.HistogramSamples(repl, buf)
+            }
+            stats.TotalSamples += len(histogramSamples)
+            stats.DroppedSamples += len(histogramSamples) - len(repl)
+
         case record.Tombstones:
             tstones, err = dec.Tombstones(rec, tstones)
             if err != nil {

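The core of the new record.HistogramSamples case above is an in-place filter: samples older than the checkpoint's mint are dropped by rewriting the slice over its own backing array, and only a non-empty remainder is re-encoded. A hedged, standalone sketch; keepRecentHistograms is an illustrative helper and the sample values are made up:

package main

import (
    "fmt"

    "github.com/prometheus/prometheus/model/histogram"
    "github.com/prometheus/prometheus/tsdb/record"
)

// keepRecentHistograms mirrors the filtering in the new checkpoint case:
// samples before mint are dropped in place, reusing the slice's backing array.
func keepRecentHistograms(histogramSamples []record.RefHistogramSample, mint int64) []record.RefHistogramSample {
    repl := histogramSamples[:0]
    for _, h := range histogramSamples {
        if h.T >= mint {
            repl = append(repl, h)
        }
    }
    return repl
}

func main() {
    h := &histogram.Histogram{Count: 1, Sum: 1} // minimal payload for the sketch
    in := []record.RefHistogramSample{
        {Ref: 1, T: 50, H: h},  // older than mint, dropped
        {Ref: 1, T: 150, H: h}, // kept
    }

    repl := keepRecentHistograms(in, 100)
    fmt.Println(len(repl)) // 1

    // As in the diff, only a non-empty remainder is encoded into the
    // checkpoint's record buffer.
    var enc record.Encoder
    var buf []byte
    if len(repl) > 0 {
        buf = enc.HistogramSamples(repl, buf)
    }
    fmt.Println(len(buf) > 0) // true
}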

@@ -26,6 +26,7 @@ import (
     "github.com/pkg/errors"
     "github.com/stretchr/testify/require"
 
+    "github.com/prometheus/prometheus/model/histogram"
     "github.com/prometheus/prometheus/model/labels"
     "github.com/prometheus/prometheus/tsdb/chunks"
     "github.com/prometheus/prometheus/tsdb/record"
@@ -110,6 +111,21 @@ func TestDeleteCheckpoints(t *testing.T) {
 }
 
 func TestCheckpoint(t *testing.T) {
+    makeHistogram := func(i int) *histogram.Histogram {
+        return &histogram.Histogram{
+            Count:         5 + uint64(i*4),
+            ZeroCount:     2 + uint64(i),
+            ZeroThreshold: 0.001,
+            Sum:           18.4 * float64(i+1),
+            Schema:        1,
+            PositiveSpans: []histogram.Span{
+                {Offset: 0, Length: 2},
+                {Offset: 1, Length: 2},
+            },
+            PositiveBuckets: []int64{int64(i + 1), 1, -1, 0},
+        }
+    }
+
     for _, compress := range []bool{false, true} {
         t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
             dir := t.TempDir()
@@ -138,6 +154,7 @@ func TestCheckpoint(t *testing.T) {
             w, err = NewSize(nil, nil, dir, 64*1024, compress)
             require.NoError(t, err)
 
+            samplesInWAL, histogramsInWAL := 0, 0
             var last int64
             for i := 0; ; i++ {
                 _, n, err := Segments(w.Dir())
@@ -173,6 +190,16 @@ func TestCheckpoint(t *testing.T) {
                     {Ref: 3, T: last + 30000, V: float64(i)},
                 }, nil)
                 require.NoError(t, w.Log(b))
+                samplesInWAL += 4
+                h := makeHistogram(i)
+                b = enc.HistogramSamples([]record.RefHistogramSample{
+                    {Ref: 0, T: last, H: h},
+                    {Ref: 1, T: last + 10000, H: h},
+                    {Ref: 2, T: last + 20000, H: h},
+                    {Ref: 3, T: last + 30000, H: h},
+                }, nil)
+                require.NoError(t, w.Log(b))
+                histogramsInWAL += 4
 
                 b = enc.Exemplars([]record.RefExemplar{
                     {Ref: 1, T: last, V: float64(i), Labels: labels.FromStrings("traceID", fmt.Sprintf("trace-%d", i))},
@@ -215,6 +242,7 @@ func TestCheckpoint(t *testing.T) {
             var metadata []record.RefMetadata
             r := NewReader(sr)
 
+            samplesInCheckpoint, histogramsInCheckpoint := 0, 0
             for r.Next() {
                 rec := r.Record()
 
@@ -228,6 +256,14 @@ func TestCheckpoint(t *testing.T) {
                     for _, s := range samples {
                         require.GreaterOrEqual(t, s.T, last/2, "sample with wrong timestamp")
                     }
+                    samplesInCheckpoint += len(samples)
+                case record.HistogramSamples:
+                    histograms, err := dec.HistogramSamples(rec, nil)
+                    require.NoError(t, err)
+                    for _, h := range histograms {
+                        require.GreaterOrEqual(t, h.T, last/2, "histogram with wrong timestamp")
+                    }
+                    histogramsInCheckpoint += len(histograms)
                 case record.Exemplars:
                     exemplars, err := dec.Exemplars(rec, nil)
                     require.NoError(t, err)
@@ -240,6 +276,11 @@ func TestCheckpoint(t *testing.T) {
                 }
             }
             require.NoError(t, r.Err())
+            // Making sure we replayed some samples. We expect >50% samples to be still present.
+            require.Greater(t, float64(samplesInCheckpoint)/float64(samplesInWAL), 0.5)
+            require.Less(t, float64(samplesInCheckpoint)/float64(samplesInWAL), 0.8)
+            require.Greater(t, float64(histogramsInCheckpoint)/float64(histogramsInWAL), 0.5)
+            require.Less(t, float64(histogramsInCheckpoint)/float64(histogramsInWAL), 0.8)
 
             expectedRefSeries := []record.RefSeries{
                 {Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")},


@@ -49,7 +49,7 @@ type WriteTo interface {
     // Once returned, the WAL Watcher will not attempt to pass that data again.
     Append([]record.RefSample) bool
     AppendExemplars([]record.RefExemplar) bool
-    AppendHistograms([]record.RefHistogram) bool
+    AppendHistograms([]record.RefHistogramSample) bool
     StoreSeries([]record.RefSeries, int)
 
     // Next two methods are intended for garbage-collection: first we call
@@ -481,8 +481,8 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
         samples          []record.RefSample
         samplesToSend    []record.RefSample
         exemplars        []record.RefExemplar
-        histograms       []record.RefHistogram
-        histogramsToSend []record.RefHistogram
+        histograms       []record.RefHistogramSample
+        histogramsToSend []record.RefHistogramSample
     )
     for r.Next() && !isClosed(w.quit) {
         rec := r.Record()
@@ -540,7 +540,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
             }
             w.writer.AppendExemplars(exemplars)
 
-        case record.Histograms:
+        case record.HistogramSamples:
             // Skip if experimental "histograms over remote write" is not enabled.
             if !w.sendHistograms {
                 break
@@ -548,7 +548,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
             if !tail {
                 break
             }
-            histograms, err := dec.Histograms(rec, histograms[:0])
+            histograms, err := dec.HistogramSamples(rec, histograms[:0])
             if err != nil {
                 w.recordDecodeFailsMetric.Inc()
                 return err


@@ -69,7 +69,7 @@ func (wtm *writeToMock) AppendExemplars(e []record.RefExemplar) bool {
     return true
 }
 
-func (wtm *writeToMock) AppendHistograms(h []record.RefHistogram) bool {
+func (wtm *writeToMock) AppendHistograms(h []record.RefHistogramSample) bool {
     wtm.histogramsAppended += len(h)
     return true
 }
@@ -171,7 +171,7 @@ func TestTailSamples(t *testing.T) {
         for j := 0; j < histogramsCount; j++ {
             inner := rand.Intn(ref + 1)
-            histogram := enc.Histograms([]record.RefHistogram{{
+            histogram := enc.HistogramSamples([]record.RefHistogramSample{{
                 Ref: chunks.HeadSeriesRef(inner),
                 T:   now.UnixNano() + 1,
                 H: &histogram.Histogram{