Support appending different sample types to the same series (#9705)

* Support appending different sample types to the same series

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Fix comments

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Fix build

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
pull/9877/head
Ganesh Vernekar 3 years ago committed by GitHub
parent ae1ca39324
commit 26c0a433f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -172,6 +172,9 @@ func (f QueryableFunc) Querier(ctx context.Context, mint, maxt int64) (Querier,
// It must be completed with a call to Commit or Rollback and must not be reused afterwards. // It must be completed with a call to Commit or Rollback and must not be reused afterwards.
// //
// Operations on the Appender interface are not goroutine-safe. // Operations on the Appender interface are not goroutine-safe.
//
// The type of samples (float64, histogram, etc) appended for a given series must remain same within an Appender.
// The behaviour is undefined if samples of different types are appended to the same series in a single Commit().
type Appender interface { type Appender interface {
// Append adds a sample pair for the given series. // Append adds a sample pair for the given series.
// An optional series reference can be provided to accelerate calls. // An optional series reference can be provided to accelerate calls.

@ -614,7 +614,7 @@ func (h *Head) Init(minValidTime int64) error {
sparseHistogramSeries := 0 sparseHistogramSeries := 0
for _, m := range h.series.series { for _, m := range h.series.series {
for _, ms := range m { for _, ms := range m {
if ms.histogramSeries { if ms.isHistogramSeries {
sparseHistogramSeries++ sparseHistogramSeries++
} }
} }
@ -1404,7 +1404,7 @@ func (s *stripeSeries) gc(mint int64) (map[storage.SeriesRef]struct{}, int, int6
s.locks[j].Lock() s.locks[j].Lock()
} }
if series.histogramSeries { if series.isHistogramSeries {
sparseHistogramSeriesDeleted++ sparseHistogramSeriesDeleted++
} }
deleted[storage.SeriesRef(series.ref)] = struct{}{} deleted[storage.SeriesRef(series.ref)] = struct{}{}
@ -1554,8 +1554,7 @@ type memSeries struct {
txs *txRing txs *txRing
// Temporary variable for sparsehistogram experiment. isHistogramSeries bool
histogramSeries bool
} }
func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, chunkRange int64, memChunkPool *sync.Pool) *memSeries { func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, chunkRange int64, memChunkPool *sync.Pool) *memSeries {

@ -279,7 +279,7 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
} }
} }
if value.IsStaleNaN(v) && s.histogramSeries { if value.IsStaleNaN(v) && s.isHistogramSeries {
return a.AppendHistogram(ref, lset, t, &histogram.Histogram{Sum: v}) return a.AppendHistogram(ref, lset, t, &histogram.Histogram{Sum: v})
} }
@ -414,7 +414,7 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
if err != nil { if err != nil {
return 0, err return 0, err
} }
s.histogramSeries = true s.isHistogramSeries = true
if created { if created {
a.head.metrics.histogramSeries.Inc() a.head.metrics.histogramSeries.Inc()
a.series = append(a.series, record.RefSeries{ a.series = append(a.series, record.RefSeries{
@ -609,6 +609,7 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper
} }
s.app.Append(t, v) s.app.Append(t, v)
s.isHistogramSeries = false
c.maxTime = t c.maxTime = t
@ -678,7 +679,7 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui
} }
s.app.AppendHistogram(t, h) s.app.AppendHistogram(t, h)
s.histogramSeries = true s.isHistogramSeries = true
c.maxTime = t c.maxTime = t
@ -720,6 +721,13 @@ func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, chunkDiskMa
return c, false, chunkCreated return c, false, chunkCreated
} }
if c.chunk.Encoding() != e {
// The chunk encoding expected by this append is different than the head chunk's
// encoding. So we cut a new chunk with the expected encoding.
c = s.cutNewHeadChunk(t, e, chunkDiskMapper)
chunkCreated = true
}
numSamples := c.chunk.NumSamples() numSamples := c.chunk.NumSamples()
if numSamples == 0 { if numSamples == 0 {
// It could be the new chunk created after reading the chunk snapshot, // It could be the new chunk created after reading the chunk snapshot,

@ -429,7 +429,7 @@ func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, ch
msIter.stopAfter = stopAfter msIter.stopAfter = stopAfter
msIter.buf = s.sampleBuf msIter.buf = s.sampleBuf
msIter.histogramBuf = s.histogramBuf msIter.histogramBuf = s.histogramBuf
msIter.histogramSeries = s.histogramSeries msIter.isHistogramSeries = s.isHistogramSeries
return msIter return msIter
} }
return &memSafeIterator{ return &memSafeIterator{
@ -438,10 +438,10 @@ func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, ch
i: -1, i: -1,
stopAfter: stopAfter, stopAfter: stopAfter,
}, },
total: numSamples, total: numSamples,
buf: s.sampleBuf, buf: s.sampleBuf,
histogramBuf: s.histogramBuf, histogramBuf: s.histogramBuf,
histogramSeries: s.histogramSeries, isHistogramSeries: s.isHistogramSeries,
} }
} }
@ -450,10 +450,10 @@ func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, ch
type memSafeIterator struct { type memSafeIterator struct {
stopIterator stopIterator
histogramSeries bool isHistogramSeries bool
total int total int
buf [4]sample buf [4]sample
histogramBuf [4]histogramSample histogramBuf [4]histogramSample
} }
func (it *memSafeIterator) Seek(t int64) bool { func (it *memSafeIterator) Seek(t int64) bool {
@ -462,13 +462,13 @@ func (it *memSafeIterator) Seek(t int64) bool {
} }
var ts int64 var ts int64
if it.histogramSeries { if it.isHistogramSeries {
ts, _ = it.AtHistogram() ts, _ = it.AtHistogram()
} else { } else {
ts, _ = it.At() ts, _ = it.At()
} }
if it.histogramSeries { if it.isHistogramSeries {
for t > ts || it.i == -1 { for t > ts || it.i == -1 {
if !it.Next() { if !it.Next() {
return false return false

@ -3159,3 +3159,131 @@ func TestHistogramCounterResetHeader(t *testing.T) {
appendHistogram(h) appendHistogram(h)
checkExpCounterResetHeader(chunkenc.CounterReset) checkExpCounterResetHeader(chunkenc.CounterReset)
} }
func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
dir := t.TempDir()
db, err := Open(dir, nil, nil, DefaultOptions(), nil)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, db.Close())
})
db.DisableCompactions()
hists := GenerateTestHistograms(10)
lbls := labels.Labels{{Name: "a", Value: "b"}}
type result struct {
t int64
v float64
h *histogram.Histogram
enc chunkenc.Encoding
}
expResult := []result{}
ref := storage.SeriesRef(0)
addFloat64Sample := func(app storage.Appender, ts int64, v float64) {
ref, err = app.Append(ref, lbls, ts, v)
require.NoError(t, err)
expResult = append(expResult, result{
t: ts,
v: v,
enc: chunkenc.EncXOR,
})
}
addHistogramSample := func(app storage.Appender, ts int64, h *histogram.Histogram) {
ref, err = app.AppendHistogram(ref, lbls, ts, h)
require.NoError(t, err)
expResult = append(expResult, result{
t: ts,
h: h,
enc: chunkenc.EncHistogram,
})
}
checkExpChunks := func(count int) {
ms, created, err := db.Head().getOrCreate(lbls.Hash(), lbls)
require.NoError(t, err)
require.False(t, created)
require.NotNil(t, ms)
require.Len(t, ms.mmappedChunks, count-1) // One will be the head chunk.
}
// Only histograms in first commit.
app := db.Appender(context.Background())
addHistogramSample(app, 1, hists[1])
require.NoError(t, app.Commit())
checkExpChunks(1)
// Only float64 in second commit, a new chunk should be cut.
app = db.Appender(context.Background())
addFloat64Sample(app, 2, 2)
require.NoError(t, app.Commit())
checkExpChunks(2)
// Out of order histogram is shown correctly for a float64 chunk. No new chunk.
app = db.Appender(context.Background())
_, err = app.AppendHistogram(ref, lbls, 1, hists[2])
require.Equal(t, storage.ErrOutOfOrderSample, err)
require.NoError(t, app.Commit())
// Only histograms in third commit to check float64 -> histogram transition.
app = db.Appender(context.Background())
addHistogramSample(app, 3, hists[3])
require.NoError(t, app.Commit())
checkExpChunks(3)
// Out of order float64 is shown correctly for a histogram chunk. No new chunk.
app = db.Appender(context.Background())
_, err = app.Append(ref, lbls, 1, 2)
require.Equal(t, storage.ErrOutOfOrderSample, err)
require.NoError(t, app.Commit())
// Combination of histograms and float64 in the same commit. The behaviour is undefined, but we want to also
// verify how TSDB would behave. Here the histogram is appended at the end, hence will be considered as out of order.
app = db.Appender(context.Background())
addFloat64Sample(app, 4, 4)
// This won't be committed.
addHistogramSample(app, 5, hists[5])
expResult = expResult[0 : len(expResult)-1]
addFloat64Sample(app, 6, 6)
require.NoError(t, app.Commit())
checkExpChunks(4) // Only 1 new chunk for float64.
// Here the histogram is appended at the end, hence the first histogram is out of order.
app = db.Appender(context.Background())
// Out of order w.r.t. the next float64 sample that is appended first.
addHistogramSample(app, 7, hists[7])
expResult = expResult[0 : len(expResult)-1]
addFloat64Sample(app, 8, 9)
addHistogramSample(app, 9, hists[9])
require.NoError(t, app.Commit())
checkExpChunks(5) // float64 added to old chunk, only 1 new for histograms.
// Query back and expect same order of samples.
q, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, q.Close())
})
ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
require.True(t, ss.Next())
s := ss.At()
it := s.Iterator()
expIdx := 0
for it.Next() {
require.Equal(t, expResult[expIdx].enc, it.ChunkEncoding())
if it.ChunkEncoding() == chunkenc.EncHistogram {
ts, h := it.AtHistogram()
require.Equal(t, expResult[expIdx].t, ts)
require.Equal(t, expResult[expIdx].h, h)
} else {
ts, v := it.At()
require.Equal(t, expResult[expIdx].t, ts)
require.Equal(t, expResult[expIdx].v, v)
}
expIdx++
}
require.NoError(t, it.Err())
require.NoError(t, ss.Err())
require.Equal(t, len(expResult), expIdx)
require.False(t, ss.Next()) // Only 1 series.
}

@ -160,7 +160,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
if ms.head() == nil { if ms.head() == nil {
// First histogram for the series. Count this in metrics. // First histogram for the series. Count this in metrics.
ms.histogramSeries = true ms.isHistogramSeries = true
} }
if rh.T < h.minValidTime.Load() { if rh.T < h.minValidTime.Load() {

Loading…
Cancel
Save