From 26c0a433f53777dcc8faa62b66f5ec4a1bee477a Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Date: Fri, 26 Nov 2021 17:43:27 +0530 Subject: [PATCH] Support appending different sample types to the same series (#9705) * Support appending different sample types to the same series Signed-off-by: Ganesh Vernekar * Fix comments Signed-off-by: Ganesh Vernekar * Fix build Signed-off-by: Ganesh Vernekar --- storage/interface.go | 3 + tsdb/head.go | 7 +-- tsdb/head_append.go | 14 ++++- tsdb/head_read.go | 22 ++++---- tsdb/head_test.go | 128 +++++++++++++++++++++++++++++++++++++++++++ tsdb/head_wal.go | 2 +- 6 files changed, 157 insertions(+), 19 deletions(-) diff --git a/storage/interface.go b/storage/interface.go index bb3db1085..ec47edbf5 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -172,6 +172,9 @@ func (f QueryableFunc) Querier(ctx context.Context, mint, maxt int64) (Querier, // It must be completed with a call to Commit or Rollback and must not be reused afterwards. // // Operations on the Appender interface are not goroutine-safe. +// +// The type of samples (float64, histogram, etc) appended for a given series must remain same within an Appender. +// The behaviour is undefined if samples of different types are appended to the same series in a single Commit(). type Appender interface { // Append adds a sample pair for the given series. // An optional series reference can be provided to accelerate calls. diff --git a/tsdb/head.go b/tsdb/head.go index 033a7ce26..0ad171807 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -614,7 +614,7 @@ func (h *Head) Init(minValidTime int64) error { sparseHistogramSeries := 0 for _, m := range h.series.series { for _, ms := range m { - if ms.histogramSeries { + if ms.isHistogramSeries { sparseHistogramSeries++ } } @@ -1404,7 +1404,7 @@ func (s *stripeSeries) gc(mint int64) (map[storage.SeriesRef]struct{}, int, int6 s.locks[j].Lock() } - if series.histogramSeries { + if series.isHistogramSeries { sparseHistogramSeriesDeleted++ } deleted[storage.SeriesRef(series.ref)] = struct{}{} @@ -1554,8 +1554,7 @@ type memSeries struct { txs *txRing - // Temporary variable for sparsehistogram experiment. - histogramSeries bool + isHistogramSeries bool } func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, chunkRange int64, memChunkPool *sync.Pool) *memSeries { diff --git a/tsdb/head_append.go b/tsdb/head_append.go index d8834c09f..91f0dd358 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -279,7 +279,7 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64 } } - if value.IsStaleNaN(v) && s.histogramSeries { + if value.IsStaleNaN(v) && s.isHistogramSeries { return a.AppendHistogram(ref, lset, t, &histogram.Histogram{Sum: v}) } @@ -414,7 +414,7 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels if err != nil { return 0, err } - s.histogramSeries = true + s.isHistogramSeries = true if created { a.head.metrics.histogramSeries.Inc() a.series = append(a.series, record.RefSeries{ @@ -609,6 +609,7 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper } s.app.Append(t, v) + s.isHistogramSeries = false c.maxTime = t @@ -678,7 +679,7 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui } s.app.AppendHistogram(t, h) - s.histogramSeries = true + s.isHistogramSeries = true c.maxTime = t @@ -720,6 +721,13 @@ func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, chunkDiskMa return c, false, chunkCreated } + if c.chunk.Encoding() != e { + // The chunk encoding expected by this append is different than the head chunk's + // encoding. So we cut a new chunk with the expected encoding. + c = s.cutNewHeadChunk(t, e, chunkDiskMapper) + chunkCreated = true + } + numSamples := c.chunk.NumSamples() if numSamples == 0 { // It could be the new chunk created after reading the chunk snapshot, diff --git a/tsdb/head_read.go b/tsdb/head_read.go index 340a49ddf..7ec49d3db 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -429,7 +429,7 @@ func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, ch msIter.stopAfter = stopAfter msIter.buf = s.sampleBuf msIter.histogramBuf = s.histogramBuf - msIter.histogramSeries = s.histogramSeries + msIter.isHistogramSeries = s.isHistogramSeries return msIter } return &memSafeIterator{ @@ -438,10 +438,10 @@ func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, ch i: -1, stopAfter: stopAfter, }, - total: numSamples, - buf: s.sampleBuf, - histogramBuf: s.histogramBuf, - histogramSeries: s.histogramSeries, + total: numSamples, + buf: s.sampleBuf, + histogramBuf: s.histogramBuf, + isHistogramSeries: s.isHistogramSeries, } } @@ -450,10 +450,10 @@ func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, ch type memSafeIterator struct { stopIterator - histogramSeries bool - total int - buf [4]sample - histogramBuf [4]histogramSample + isHistogramSeries bool + total int + buf [4]sample + histogramBuf [4]histogramSample } func (it *memSafeIterator) Seek(t int64) bool { @@ -462,13 +462,13 @@ func (it *memSafeIterator) Seek(t int64) bool { } var ts int64 - if it.histogramSeries { + if it.isHistogramSeries { ts, _ = it.AtHistogram() } else { ts, _ = it.At() } - if it.histogramSeries { + if it.isHistogramSeries { for t > ts || it.i == -1 { if !it.Next() { return false diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 2ef846556..ac1a8725e 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -3159,3 +3159,131 @@ func TestHistogramCounterResetHeader(t *testing.T) { appendHistogram(h) checkExpCounterResetHeader(chunkenc.CounterReset) } + +func TestAppendingDifferentEncodingToSameSeries(t *testing.T) { + dir := t.TempDir() + db, err := Open(dir, nil, nil, DefaultOptions(), nil) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, db.Close()) + }) + db.DisableCompactions() + + hists := GenerateTestHistograms(10) + lbls := labels.Labels{{Name: "a", Value: "b"}} + + type result struct { + t int64 + v float64 + h *histogram.Histogram + enc chunkenc.Encoding + } + expResult := []result{} + ref := storage.SeriesRef(0) + addFloat64Sample := func(app storage.Appender, ts int64, v float64) { + ref, err = app.Append(ref, lbls, ts, v) + require.NoError(t, err) + expResult = append(expResult, result{ + t: ts, + v: v, + enc: chunkenc.EncXOR, + }) + } + addHistogramSample := func(app storage.Appender, ts int64, h *histogram.Histogram) { + ref, err = app.AppendHistogram(ref, lbls, ts, h) + require.NoError(t, err) + expResult = append(expResult, result{ + t: ts, + h: h, + enc: chunkenc.EncHistogram, + }) + } + checkExpChunks := func(count int) { + ms, created, err := db.Head().getOrCreate(lbls.Hash(), lbls) + require.NoError(t, err) + require.False(t, created) + require.NotNil(t, ms) + require.Len(t, ms.mmappedChunks, count-1) // One will be the head chunk. + } + + // Only histograms in first commit. + app := db.Appender(context.Background()) + addHistogramSample(app, 1, hists[1]) + require.NoError(t, app.Commit()) + checkExpChunks(1) + + // Only float64 in second commit, a new chunk should be cut. + app = db.Appender(context.Background()) + addFloat64Sample(app, 2, 2) + require.NoError(t, app.Commit()) + checkExpChunks(2) + + // Out of order histogram is shown correctly for a float64 chunk. No new chunk. + app = db.Appender(context.Background()) + _, err = app.AppendHistogram(ref, lbls, 1, hists[2]) + require.Equal(t, storage.ErrOutOfOrderSample, err) + require.NoError(t, app.Commit()) + + // Only histograms in third commit to check float64 -> histogram transition. + app = db.Appender(context.Background()) + addHistogramSample(app, 3, hists[3]) + require.NoError(t, app.Commit()) + checkExpChunks(3) + + // Out of order float64 is shown correctly for a histogram chunk. No new chunk. + app = db.Appender(context.Background()) + _, err = app.Append(ref, lbls, 1, 2) + require.Equal(t, storage.ErrOutOfOrderSample, err) + require.NoError(t, app.Commit()) + + // Combination of histograms and float64 in the same commit. The behaviour is undefined, but we want to also + // verify how TSDB would behave. Here the histogram is appended at the end, hence will be considered as out of order. + app = db.Appender(context.Background()) + addFloat64Sample(app, 4, 4) + // This won't be committed. + addHistogramSample(app, 5, hists[5]) + expResult = expResult[0 : len(expResult)-1] + addFloat64Sample(app, 6, 6) + require.NoError(t, app.Commit()) + checkExpChunks(4) // Only 1 new chunk for float64. + + // Here the histogram is appended at the end, hence the first histogram is out of order. + app = db.Appender(context.Background()) + // Out of order w.r.t. the next float64 sample that is appended first. + addHistogramSample(app, 7, hists[7]) + expResult = expResult[0 : len(expResult)-1] + addFloat64Sample(app, 8, 9) + addHistogramSample(app, 9, hists[9]) + require.NoError(t, app.Commit()) + checkExpChunks(5) // float64 added to old chunk, only 1 new for histograms. + + // Query back and expect same order of samples. + q, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, q.Close()) + }) + + ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + require.True(t, ss.Next()) + s := ss.At() + it := s.Iterator() + expIdx := 0 + for it.Next() { + require.Equal(t, expResult[expIdx].enc, it.ChunkEncoding()) + if it.ChunkEncoding() == chunkenc.EncHistogram { + ts, h := it.AtHistogram() + require.Equal(t, expResult[expIdx].t, ts) + require.Equal(t, expResult[expIdx].h, h) + } else { + ts, v := it.At() + require.Equal(t, expResult[expIdx].t, ts) + require.Equal(t, expResult[expIdx].v, v) + } + expIdx++ + } + require.NoError(t, it.Err()) + require.NoError(t, ss.Err()) + require.Equal(t, len(expResult), expIdx) + require.False(t, ss.Next()) // Only 1 series. +} diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index c2b0308a5..f9bac1108 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -160,7 +160,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H if ms.head() == nil { // First histogram for the series. Count this in metrics. - ms.histogramSeries = true + ms.isHistogramSeries = true } if rh.T < h.minValidTime.Load() {