From f540c1dbd3682f22bfe5c8de9cb6c9ae3e3f1ac5 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Date: Mon, 29 Aug 2022 17:38:36 +0530 Subject: [PATCH] Add support for histograms in WAL checkpointing (#11210) * Add support for histograms in WAL checkpointing Signed-off-by: Ganesh Vernekar * Fix review comments Signed-off-by: Ganesh Vernekar * Fix tests Signed-off-by: Ganesh Vernekar Signed-off-by: Ganesh Vernekar --- storage/remote/queue_manager.go | 2 +- storage/remote/queue_manager_test.go | 10 +++---- storage/remote/write.go | 2 +- tsdb/head_append.go | 28 +++++++++--------- tsdb/head_wal.go | 22 +++++++------- tsdb/record/record.go | 44 ++++++++++++++-------------- tsdb/record/record_test.go | 16 +++++----- tsdb/tsdbutil/buffer.go | 2 +- tsdb/wal/checkpoint.go | 43 +++++++++++++++++++-------- tsdb/wal/checkpoint_test.go | 41 ++++++++++++++++++++++++++ tsdb/wal/watcher.go | 10 +++---- tsdb/wal/watcher_test.go | 4 +-- 12 files changed, 142 insertions(+), 82 deletions(-) diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index bd5fe9398..9e0c643da 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -663,7 +663,7 @@ outer: return true } -func (t *QueueManager) AppendHistograms(histograms []record.RefHistogram) bool { +func (t *QueueManager) AppendHistograms(histograms []record.RefHistogramSample) bool { if !t.sendNativeHistograms { return true } diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 764e688b3..bd421556c 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -104,7 +104,7 @@ func TestSampleDelivery(t *testing.T) { series []record.RefSeries samples []record.RefSample exemplars []record.RefExemplar - histograms []record.RefHistogram + histograms []record.RefHistogramSample ) // Generates same series in both cases. @@ -583,13 +583,13 @@ func createExemplars(numExemplars, numSeries int) ([]record.RefExemplar, []recor return exemplars, series } -func createHistograms(numSamples, numSeries int) ([]record.RefHistogram, []record.RefSeries) { - histograms := make([]record.RefHistogram, 0, numSamples) +func createHistograms(numSamples, numSeries int) ([]record.RefHistogramSample, []record.RefSeries) { + histograms := make([]record.RefHistogramSample, 0, numSamples) series := make([]record.RefSeries, 0, numSeries) for i := 0; i < numSeries; i++ { name := fmt.Sprintf("test_metric_%d", i) for j := 0; j < numSamples; j++ { - h := record.RefHistogram{ + h := record.RefHistogramSample{ Ref: chunks.HeadSeriesRef(i), T: int64(j), H: &histogram.Histogram{ @@ -689,7 +689,7 @@ func (c *TestWriteClient) expectExemplars(ss []record.RefExemplar, series []reco c.wg.Add(len(ss)) } -func (c *TestWriteClient) expectHistograms(hh []record.RefHistogram, series []record.RefSeries) { +func (c *TestWriteClient) expectHistograms(hh []record.RefHistogramSample, series []record.RefSeries) { if !c.withWaitGroup { return } diff --git a/storage/remote/write.go b/storage/remote/write.go index 80eef358f..6d8240e91 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -50,7 +50,7 @@ var ( Namespace: namespace, Subsystem: subsystem, Name: "histograms_in_total", - Help: "Histograms in to remote storage, compare to histograms out for queue managers.", + Help: "HistogramSamples in to remote storage, compare to histograms out for queue managers.", }) ) diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 706f2cd42..f3ec2173c 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -220,15 +220,15 @@ func (h *Head) putExemplarBuffer(b []exemplarWithSeriesRef) { h.exemplarsPool.Put(b[:0]) } -func (h *Head) getHistogramBuffer() []record.RefHistogram { +func (h *Head) getHistogramBuffer() []record.RefHistogramSample { b := h.histogramsPool.Get() if b == nil { - return make([]record.RefHistogram, 0, 512) + return make([]record.RefHistogramSample, 0, 512) } - return b.([]record.RefHistogram) + return b.([]record.RefHistogramSample) } -func (h *Head) putHistogramBuffer(b []record.RefHistogram) { +func (h *Head) putHistogramBuffer(b []record.RefHistogramSample) { //nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty. h.histogramsPool.Put(b[:0]) } @@ -282,14 +282,14 @@ type headAppender struct { minValidTime int64 // No samples below this timestamp are allowed. mint, maxt int64 - series []record.RefSeries // New series held by this appender. - samples []record.RefSample // New float samples held by this appender. - exemplars []exemplarWithSeriesRef // New exemplars held by this appender. - sampleSeries []*memSeries // Float series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once). - histograms []record.RefHistogram // New histogram samples held by this appender. - histogramSeries []*memSeries // Histogram series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once). - metadata []record.RefMetadata // New metadata held by this appender. - metadataSeries []*memSeries // Series corresponding to the metadata held by this appender. + series []record.RefSeries // New series held by this appender. + samples []record.RefSample // New float samples held by this appender. + exemplars []exemplarWithSeriesRef // New exemplars held by this appender. + sampleSeries []*memSeries // Float series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once). + histograms []record.RefHistogramSample // New histogram samples held by this appender. + histogramSeries []*memSeries // HistogramSamples series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once). + metadata []record.RefMetadata // New metadata held by this appender. + metadataSeries []*memSeries // Series corresponding to the metadata held by this appender. appendID, cleanupAppendIDsBelow uint64 closed bool @@ -493,7 +493,7 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels a.maxt = t } - a.histograms = append(a.histograms, record.RefHistogram{ + a.histograms = append(a.histograms, record.RefHistogramSample{ Ref: s.ref, T: t, H: h, @@ -659,7 +659,7 @@ func (a *headAppender) log() error { } } if len(a.histograms) > 0 { - rec = enc.Histograms(a.histograms, buf) + rec = enc.HistogramSamples(a.histograms, buf) buf = rec[:0] if err := a.head.wal.Log(rec); err != nil { return errors.Wrap(err, "log histograms") diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index e942d7da2..77e267e4c 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -59,7 +59,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H dec record.Decoder shards = make([][]record.RefSample, n) - histogramShards = make([][]record.RefHistogram, n) + histogramShards = make([][]record.RefHistogramSample, n) decoded = make(chan interface{}, 10) decodeErr, seriesCreationErr error @@ -85,7 +85,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H } histogramsPool = sync.Pool{ New: func() interface{} { - return []record.RefHistogram{} + return []record.RefHistogramSample{} }, } metadataPool = sync.Pool{ @@ -197,9 +197,9 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H return } decoded <- exemplars - case record.Histograms: - hists := histogramsPool.Get().([]record.RefHistogram)[:0] - hists, err = dec.Histograms(rec, hists) + case record.HistogramSamples: + hists := histogramsPool.Get().([]record.RefHistogramSample)[:0] + hists, err = dec.HistogramSamples(rec, hists) if err != nil { decodeErr = &wal.CorruptionErr{ Err: errors.Wrap(err, "decode histograms"), @@ -344,7 +344,7 @@ Outer: } //nolint:staticcheck // Ignore SA6002 relax staticcheck verification. exemplarsPool.Put(v) - case []record.RefHistogram: + case []record.RefHistogramSample: samples := v // We split up the samples into chunks of 5000 samples or less. // With O(300 * #cores) in-flight sample batches, large scrapes could otherwise @@ -450,15 +450,15 @@ func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc []*mmappedCh type walSubsetProcessor struct { mx sync.Mutex // Take this lock while modifying series in the subset. - input chan interface{} // Either []record.RefSample or []record.RefHistogram. + input chan interface{} // Either []record.RefSample or []record.RefHistogramSample. output chan []record.RefSample - histogramsOutput chan []record.RefHistogram + histogramsOutput chan []record.RefHistogramSample } func (wp *walSubsetProcessor) setup() { wp.output = make(chan []record.RefSample, 300) wp.input = make(chan interface{}, 300) - wp.histogramsOutput = make(chan []record.RefHistogram, 300) + wp.histogramsOutput = make(chan []record.RefHistogramSample, 300) } func (wp *walSubsetProcessor) closeAndDrain() { @@ -480,7 +480,7 @@ func (wp *walSubsetProcessor) reuseBuf() []record.RefSample { } // If there is a buffer in the output chan, return it for reuse, otherwise return nil. -func (wp *walSubsetProcessor) reuseHistogramBuf() []record.RefHistogram { +func (wp *walSubsetProcessor) reuseHistogramBuf() []record.RefHistogramSample { select { case buf := <-wp.histogramsOutput: return buf[:0] @@ -528,7 +528,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head) (unknownRefs, unknownHi } } wp.output <- samples - case []record.RefHistogram: + case []record.RefHistogramSample: for _, s := range samples { if s.T < minValidTime { continue diff --git a/tsdb/record/record.go b/tsdb/record/record.go index c6d8781e1..646647ea9 100644 --- a/tsdb/record/record.go +++ b/tsdb/record/record.go @@ -47,7 +47,7 @@ const ( // Metadata is used to match WAL records of type Metadata. Metadata Type = 6 // Histograms is used to match WAL records of type Histograms. - Histograms Type = 7 + HistogramSamples Type = 7 ) func (rt Type) String() string { @@ -60,8 +60,8 @@ func (rt Type) String() string { return "tombstones" case Exemplars: return "exemplars" - case Histograms: - return "histograms" + case HistogramSamples: + return "histogram_samples" case Metadata: return "metadata" default: @@ -73,14 +73,14 @@ func (rt Type) String() string { type MetricType uint8 const ( - UnknownMT MetricType = 0 - Counter MetricType = 1 - Gauge MetricType = 2 - Histogram MetricType = 3 - GaugeHistogram MetricType = 4 - Summary MetricType = 5 - Info MetricType = 6 - Stateset MetricType = 7 + UnknownMT MetricType = 0 + Counter MetricType = 1 + Gauge MetricType = 2 + HistogramSample MetricType = 3 + GaugeHistogram MetricType = 4 + Summary MetricType = 5 + Info MetricType = 6 + Stateset MetricType = 7 ) func GetMetricType(t textparse.MetricType) uint8 { @@ -90,7 +90,7 @@ func GetMetricType(t textparse.MetricType) uint8 { case textparse.MetricTypeGauge: return uint8(Gauge) case textparse.MetricTypeHistogram: - return uint8(Histogram) + return uint8(HistogramSample) case textparse.MetricTypeGaugeHistogram: return uint8(GaugeHistogram) case textparse.MetricTypeSummary: @@ -110,7 +110,7 @@ func ToTextparseMetricType(m uint8) textparse.MetricType { return textparse.MetricTypeCounter case uint8(Gauge): return textparse.MetricTypeGauge - case uint8(Histogram): + case uint8(HistogramSample): return textparse.MetricTypeHistogram case uint8(GaugeHistogram): return textparse.MetricTypeGaugeHistogram @@ -140,7 +140,7 @@ type RefSeries struct { } // RefSample is a timestamp/value pair associated with a reference to a series. -// TODO(beorn7): Perhaps make this "polymorphic", including histogram and float-histogram pointers? Then get rid of RefHistogram. +// TODO(beorn7): Perhaps make this "polymorphic", including histogram and float-histogram pointers? Then get rid of RefHistogramSample. type RefSample struct { Ref chunks.HeadSeriesRef T int64 @@ -163,8 +163,8 @@ type RefExemplar struct { Labels labels.Labels } -// RefHistogram is a histogram. -type RefHistogram struct { +// RefHistogramSample is a histogram. +type RefHistogramSample struct { Ref chunks.HeadSeriesRef T int64 H *histogram.Histogram @@ -181,7 +181,7 @@ func (d *Decoder) Type(rec []byte) Type { return Unknown } switch t := Type(rec[0]); t { - case Series, Samples, Tombstones, Exemplars, Histograms, Metadata: + case Series, Samples, Tombstones, Exemplars, HistogramSamples, Metadata: return t } return Unknown @@ -367,10 +367,10 @@ func (d *Decoder) ExemplarsFromBuffer(dec *encoding.Decbuf, exemplars []RefExemp return exemplars, nil } -func (d *Decoder) Histograms(rec []byte, histograms []RefHistogram) ([]RefHistogram, error) { +func (d *Decoder) HistogramSamples(rec []byte, histograms []RefHistogramSample) ([]RefHistogramSample, error) { dec := encoding.Decbuf{B: rec} t := Type(dec.Byte()) - if t != Histograms { + if t != HistogramSamples { return nil, errors.New("invalid record type") } if dec.Len() == 0 { @@ -384,7 +384,7 @@ func (d *Decoder) Histograms(rec []byte, histograms []RefHistogram) ([]RefHistog dref := dec.Varint64() dtime := dec.Varint64() - rh := RefHistogram{ + rh := RefHistogramSample{ Ref: chunks.HeadSeriesRef(baseRef + uint64(dref)), T: baseTime + dtime, H: &histogram.Histogram{ @@ -563,9 +563,9 @@ func (e *Encoder) EncodeExemplarsIntoBuffer(exemplars []RefExemplar, buf *encodi } } -func (e *Encoder) Histograms(histograms []RefHistogram, b []byte) []byte { +func (e *Encoder) HistogramSamples(histograms []RefHistogramSample, b []byte) []byte { buf := encoding.Encbuf{B: b} - buf.PutByte(byte(Histograms)) + buf.PutByte(byte(HistogramSamples)) if len(histograms) == 0 { return buf.Get() diff --git a/tsdb/record/record_test.go b/tsdb/record/record_test.go index 2f48a0f2f..fe5b0bac5 100644 --- a/tsdb/record/record_test.go +++ b/tsdb/record/record_test.go @@ -110,7 +110,7 @@ func TestRecord_EncodeDecode(t *testing.T) { require.NoError(t, err) require.Equal(t, exemplars, decExemplars) - histograms := []RefHistogram{ + histograms := []RefHistogramSample{ { Ref: 56, T: 1234, @@ -150,7 +150,7 @@ func TestRecord_EncodeDecode(t *testing.T) { }, } - decHistograms, err := dec.Histograms(enc.Histograms(histograms, nil), nil) + decHistograms, err := dec.HistogramSamples(enc.HistogramSamples(histograms, nil), nil) require.NoError(t, err) require.Equal(t, histograms, decHistograms) } @@ -218,7 +218,7 @@ func TestRecord_Corrupted(t *testing.T) { }) t.Run("Test corrupted histogram record", func(t *testing.T) { - histograms := []RefHistogram{ + histograms := []RefHistogramSample{ { Ref: 56, T: 1234, @@ -237,8 +237,8 @@ func TestRecord_Corrupted(t *testing.T) { }, } - corrupted := enc.Histograms(histograms, nil)[:8] - _, err := dec.Histograms(corrupted, nil) + corrupted := enc.HistogramSamples(histograms, nil)[:8] + _, err := dec.HistogramSamples(corrupted, nil) require.Equal(t, errors.Cause(err), encoding.ErrInvalidSize) }) } @@ -263,7 +263,7 @@ func TestRecord_Type(t *testing.T) { recordType = dec.Type(enc.Metadata(metadata, nil)) require.Equal(t, Metadata, recordType) - histograms := []RefHistogram{ + histograms := []RefHistogramSample{ { Ref: 56, T: 1234, @@ -281,8 +281,8 @@ func TestRecord_Type(t *testing.T) { }, }, } - recordType = dec.Type(enc.Histograms(histograms, nil)) - require.Equal(t, Histograms, recordType) + recordType = dec.Type(enc.HistogramSamples(histograms, nil)) + require.Equal(t, HistogramSamples, recordType) recordType = dec.Type(nil) require.Equal(t, Unknown, recordType) diff --git a/tsdb/tsdbutil/buffer.go b/tsdb/tsdbutil/buffer.go index 5139ca033..d7293c82a 100644 --- a/tsdb/tsdbutil/buffer.go +++ b/tsdb/tsdbutil/buffer.go @@ -25,7 +25,7 @@ import ( // BufferedSeriesIterator wraps an iterator with a look-back buffer. // -// TODO(beorn7): BufferedSeriesIterator does not support Histograms or +// TODO(beorn7): BufferedSeriesIterator does not support HistogramSamples or // FloatHistograms. Either add support or remove BufferedSeriesIterator // altogether (it seems unused). type BufferedSeriesIterator struct { diff --git a/tsdb/wal/checkpoint.go b/tsdb/wal/checkpoint.go index f8f9f387d..d892d720a 100644 --- a/tsdb/wal/checkpoint.go +++ b/tsdb/wal/checkpoint.go @@ -38,12 +38,12 @@ import ( // CheckpointStats returns stats about a created checkpoint. type CheckpointStats struct { DroppedSeries int - DroppedSamples int + DroppedSamples int // Includes histograms. DroppedTombstones int DroppedExemplars int DroppedMetadata int TotalSeries int // Processed series including dropped ones. - TotalSamples int // Processed samples including dropped ones. + TotalSamples int // Processed float and histogram samples including dropped ones. TotalTombstones int // Processed tombstones including dropped ones. TotalExemplars int // Processed exemplars including dropped ones. TotalMetadata int // Processed metadata including dropped ones. @@ -148,20 +148,21 @@ func Checkpoint(logger log.Logger, w *WAL, from, to int, keep func(id chunks.Hea r := NewReader(sgmReader) var ( - series []record.RefSeries - samples []record.RefSample - tstones []tombstones.Stone - exemplars []record.RefExemplar - metadata []record.RefMetadata - dec record.Decoder - enc record.Encoder - buf []byte - recs [][]byte + series []record.RefSeries + samples []record.RefSample + histogramSamples []record.RefHistogramSample + tstones []tombstones.Stone + exemplars []record.RefExemplar + metadata []record.RefMetadata + dec record.Decoder + enc record.Encoder + buf []byte + recs [][]byte latestMetadataMap = make(map[chunks.HeadSeriesRef]record.RefMetadata) ) for r.Next() { - series, samples, tstones, exemplars, metadata = series[:0], samples[:0], tstones[:0], exemplars[:0], metadata[:0] + series, samples, histogramSamples, tstones, exemplars, metadata = series[:0], samples[:0], histogramSamples[:0], tstones[:0], exemplars[:0], metadata[:0] // We don't reset the buffer since we batch up multiple records // before writing them to the checkpoint. @@ -206,6 +207,24 @@ func Checkpoint(logger log.Logger, w *WAL, from, to int, keep func(id chunks.Hea stats.TotalSamples += len(samples) stats.DroppedSamples += len(samples) - len(repl) + case record.HistogramSamples: + histogramSamples, err = dec.HistogramSamples(rec, histogramSamples) + if err != nil { + return nil, errors.Wrap(err, "decode histogram samples") + } + // Drop irrelevant histogramSamples in place. + repl := histogramSamples[:0] + for _, h := range histogramSamples { + if h.T >= mint { + repl = append(repl, h) + } + } + if len(repl) > 0 { + buf = enc.HistogramSamples(repl, buf) + } + stats.TotalSamples += len(samples) + stats.DroppedSamples += len(samples) - len(repl) + case record.Tombstones: tstones, err = dec.Tombstones(rec, tstones) if err != nil { diff --git a/tsdb/wal/checkpoint_test.go b/tsdb/wal/checkpoint_test.go index 8222f8ad3..920c45af3 100644 --- a/tsdb/wal/checkpoint_test.go +++ b/tsdb/wal/checkpoint_test.go @@ -26,6 +26,7 @@ import ( "github.com/pkg/errors" "github.com/stretchr/testify/require" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/record" @@ -110,6 +111,21 @@ func TestDeleteCheckpoints(t *testing.T) { } func TestCheckpoint(t *testing.T) { + makeHistogram := func(i int) *histogram.Histogram { + return &histogram.Histogram{ + Count: 5 + uint64(i*4), + ZeroCount: 2 + uint64(i), + ZeroThreshold: 0.001, + Sum: 18.4 * float64(i+1), + Schema: 1, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + PositiveBuckets: []int64{int64(i + 1), 1, -1, 0}, + } + } + for _, compress := range []bool{false, true} { t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { dir := t.TempDir() @@ -138,6 +154,7 @@ func TestCheckpoint(t *testing.T) { w, err = NewSize(nil, nil, dir, 64*1024, compress) require.NoError(t, err) + samplesInWAL, histogramsInWAL := 0, 0 var last int64 for i := 0; ; i++ { _, n, err := Segments(w.Dir()) @@ -173,6 +190,16 @@ func TestCheckpoint(t *testing.T) { {Ref: 3, T: last + 30000, V: float64(i)}, }, nil) require.NoError(t, w.Log(b)) + samplesInWAL += 4 + h := makeHistogram(i) + b = enc.HistogramSamples([]record.RefHistogramSample{ + {Ref: 0, T: last, H: h}, + {Ref: 1, T: last + 10000, H: h}, + {Ref: 2, T: last + 20000, H: h}, + {Ref: 3, T: last + 30000, H: h}, + }, nil) + require.NoError(t, w.Log(b)) + histogramsInWAL += 4 b = enc.Exemplars([]record.RefExemplar{ {Ref: 1, T: last, V: float64(i), Labels: labels.FromStrings("traceID", fmt.Sprintf("trace-%d", i))}, @@ -215,6 +242,7 @@ func TestCheckpoint(t *testing.T) { var metadata []record.RefMetadata r := NewReader(sr) + samplesInCheckpoint, histogramsInCheckpoint := 0, 0 for r.Next() { rec := r.Record() @@ -228,6 +256,14 @@ func TestCheckpoint(t *testing.T) { for _, s := range samples { require.GreaterOrEqual(t, s.T, last/2, "sample with wrong timestamp") } + samplesInCheckpoint += len(samples) + case record.HistogramSamples: + histograms, err := dec.HistogramSamples(rec, nil) + require.NoError(t, err) + for _, h := range histograms { + require.GreaterOrEqual(t, h.T, last/2, "histogram with wrong timestamp") + } + histogramsInCheckpoint += len(histograms) case record.Exemplars: exemplars, err := dec.Exemplars(rec, nil) require.NoError(t, err) @@ -240,6 +276,11 @@ func TestCheckpoint(t *testing.T) { } } require.NoError(t, r.Err()) + // Making sure we replayed some samples. We expect >50% samples to be still present. + require.Greater(t, float64(samplesInCheckpoint)/float64(samplesInWAL), 0.5) + require.Less(t, float64(samplesInCheckpoint)/float64(samplesInWAL), 0.8) + require.Greater(t, float64(histogramsInCheckpoint)/float64(histogramsInWAL), 0.5) + require.Less(t, float64(histogramsInCheckpoint)/float64(histogramsInWAL), 0.8) expectedRefSeries := []record.RefSeries{ {Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")}, diff --git a/tsdb/wal/watcher.go b/tsdb/wal/watcher.go index 9b16d7516..dddf2eec4 100644 --- a/tsdb/wal/watcher.go +++ b/tsdb/wal/watcher.go @@ -49,7 +49,7 @@ type WriteTo interface { // Once returned, the WAL Watcher will not attempt to pass that data again. Append([]record.RefSample) bool AppendExemplars([]record.RefExemplar) bool - AppendHistograms([]record.RefHistogram) bool + AppendHistograms([]record.RefHistogramSample) bool StoreSeries([]record.RefSeries, int) // Next two methods are intended for garbage-collection: first we call @@ -481,8 +481,8 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { samples []record.RefSample samplesToSend []record.RefSample exemplars []record.RefExemplar - histograms []record.RefHistogram - histogramsToSend []record.RefHistogram + histograms []record.RefHistogramSample + histogramsToSend []record.RefHistogramSample ) for r.Next() && !isClosed(w.quit) { rec := r.Record() @@ -540,7 +540,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { } w.writer.AppendExemplars(exemplars) - case record.Histograms: + case record.HistogramSamples: // Skip if experimental "histograms over remote write" is not enabled. if !w.sendHistograms { break @@ -548,7 +548,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { if !tail { break } - histograms, err := dec.Histograms(rec, histograms[:0]) + histograms, err := dec.HistogramSamples(rec, histograms[:0]) if err != nil { w.recordDecodeFailsMetric.Inc() return err diff --git a/tsdb/wal/watcher_test.go b/tsdb/wal/watcher_test.go index 829ae1741..92a028b2e 100644 --- a/tsdb/wal/watcher_test.go +++ b/tsdb/wal/watcher_test.go @@ -69,7 +69,7 @@ func (wtm *writeToMock) AppendExemplars(e []record.RefExemplar) bool { return true } -func (wtm *writeToMock) AppendHistograms(h []record.RefHistogram) bool { +func (wtm *writeToMock) AppendHistograms(h []record.RefHistogramSample) bool { wtm.histogramsAppended += len(h) return true } @@ -171,7 +171,7 @@ func TestTailSamples(t *testing.T) { for j := 0; j < histogramsCount; j++ { inner := rand.Intn(ref + 1) - histogram := enc.Histograms([]record.RefHistogram{{ + histogram := enc.HistogramSamples([]record.RefHistogramSample{{ Ref: chunks.HeadSeriesRef(inner), T: now.UnixNano() + 1, H: &histogram.Histogram{