From d166da7b59f7c89a5c7c197c1e8c5add2e70a938 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Tue, 27 Sep 2022 15:02:05 +0100 Subject: [PATCH] tsdb: stop saving a copy of last 4 samples in memSeries (#11296) * TSDB chunks: remove race between writing and reading Because the data is stored as a bit-stream, the last byte in the stream could change if the stream is appended to after an Iterator is obtained. Copy the last byte when the Iterator is created, so we don't have to read it later. Clarify in comments that concurrent Iterator and Appender are allowed, but the chunk must not be modified while an Iterator is created. (This was already the case, in order to copy the bstream slice header.) * TSDB: stop saving last 4 samples in memSeries This extra copy of the last 4 samples was introduced to avoid a race condition between reading the last byte of the chunk and writing to it. But now we have fixed that by having `bstreamReader` copy the last byte, we don't need to copy the last 4 samples. This change saves 56 bytes per series, which is very worthwhile when you have millions or tens of millions of series. * TSDB: tidy up stopIterator re-use Previous changes have left this code duplicating some lines; pull them out to a separate function and tidy up. * TSDB head_test: stop checking when iterators are wrapped The behaviour has changed so chunk iterators are only wrapped when transaction isolation requires them to stop short of the end. This makes tests fail which are checking the type. Tests should check the observable behaviour, not the type. Signed-off-by: Bryan Boreham Signed-off-by: Ganesh Vernekar Co-authored-by: Ganesh Vernekar --- tsdb/chunkenc/bstream.go | 24 ++++++++--- tsdb/chunkenc/xor.go | 8 ++-- tsdb/head.go | 5 +-- tsdb/head_append.go | 8 +--- tsdb/head_read.go | 90 +++++++--------------------------------- tsdb/head_test.go | 16 +------ tsdb/head_wal.go | 26 ++++++------ 7 files changed, 58 insertions(+), 119 deletions(-) diff --git a/tsdb/chunkenc/bstream.go b/tsdb/chunkenc/bstream.go index 833c9794b..60531023b 100644 --- a/tsdb/chunkenc/bstream.go +++ b/tsdb/chunkenc/bstream.go @@ -119,11 +119,18 @@ type bstreamReader struct { buffer uint64 // The current buffer, filled from the stream, containing up to 8 bytes from which read bits. valid uint8 // The number of right-most bits valid to read (from left) in the current 8 byte buffer. + last byte // A copy of the last byte of the stream. } func newBReader(b []byte) bstreamReader { + // The last byte of the stream can be updated later, so we take a copy. + var last byte + if len(b) > 0 { + last = b[len(b)-1] + } return bstreamReader{ stream: b, + last: last, } } @@ -223,17 +230,24 @@ func (b *bstreamReader) loadNextBuffer(nbits uint8) bool { return true } - // We're here if the are 8 or less bytes left in the stream. Since this reader needs - // to handle race conditions with concurrent writes happening on the very last byte - // we make sure to never over more than the minimum requested bits (rounded up to - // the next byte). The following code is slower but called less frequently. + // We're here if there are 8 or less bytes left in the stream. + // The following code is slower but called less frequently. nbytes := int((nbits / 8) + 1) if b.streamOffset+nbytes > len(b.stream) { nbytes = len(b.stream) - b.streamOffset } buffer := uint64(0) - for i := 0; i < nbytes; i++ { + skip := 0 + if b.streamOffset+nbytes == len(b.stream) { + // There can be concurrent writes happening on the very last byte + // of the stream, so use the copy we took at initialization time. + buffer = buffer | uint64(b.last) + // Read up to the byte before + skip = 1 + } + + for i := 0; i < nbytes-skip; i++ { buffer = buffer | (uint64(b.stream[b.streamOffset+i]) << uint(8*(nbytes-i-1))) } diff --git a/tsdb/chunkenc/xor.go b/tsdb/chunkenc/xor.go index 716f0698f..3429c2c60 100644 --- a/tsdb/chunkenc/xor.go +++ b/tsdb/chunkenc/xor.go @@ -88,6 +88,8 @@ func (c *XORChunk) Compact() { } // Appender implements the Chunk interface. +// It is not valid to call Appender() multiple times concurrently or to use multiple +// Appenders on the same chunk. func (c *XORChunk) Appender() (Appender, error) { it := c.iterator(nil) @@ -115,9 +117,6 @@ func (c *XORChunk) Appender() (Appender, error) { } func (c *XORChunk) iterator(it Iterator) *xorIterator { - // Should iterators guarantee to act on a copy of the data so it doesn't lock append? - // When using striped locks to guard access to chunks, probably yes. - // Could only copy data if the chunk is not completed yet. if xorIter, ok := it.(*xorIterator); ok { xorIter.Reset(c.b.bytes()) return xorIter @@ -132,6 +131,9 @@ func (c *XORChunk) iterator(it Iterator) *xorIterator { } // Iterator implements the Chunk interface. +// Iterator() must not be called concurrently with any modifications to the chunk, +// but after it returns you can use an Iterator concurrently with an Appender or +// other Iterators. func (c *XORChunk) Iterator(it Iterator) Iterator { return c.iterator(it) } diff --git a/tsdb/head.go b/tsdb/head.go index 90cfacf79..8dd151163 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -1803,9 +1803,8 @@ type memSeries struct { nextAt int64 // Timestamp at which to cut the next chunk. - // We keep the last 4 samples here (in addition to appending them to the chunk) so we don't need coordination between appender and querier. - // Even the most compact encoding of a sample takes 2 bits, so the last byte is not contended. - sampleBuf [4]sample + // We keep the last value here (in addition to appending it to the chunk) so we can check for duplicates. + lastValue float64 // Current appender for the head chunk. Set when a new head chunk is cut. // It is nil only if headChunk is nil. E.g. if there was an appender that created a new series, but rolled back the commit diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 067281cc4..f843aa1ec 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -362,7 +362,7 @@ func (s *memSeries) appendable(t int64, v float64, headMaxt, minValidTime, oooTi // like federation and erroring out at that time would be extremely noisy. // This only checks against the latest in-order sample. // The OOO headchunk has its own method to detect these duplicates. - if math.Float64bits(s.sampleBuf[3].v) != math.Float64bits(v) { + if math.Float64bits(s.lastValue) != math.Float64bits(v) { return false, 0, storage.ErrDuplicateSampleForTimestamp } // Sample is identical (ts + value) with most current (highest ts) sample in sampleBuf. @@ -800,11 +800,7 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper s.app.Append(t, v) c.maxTime = t - - s.sampleBuf[0] = s.sampleBuf[1] - s.sampleBuf[1] = s.sampleBuf[2] - s.sampleBuf[2] = s.sampleBuf[3] - s.sampleBuf[3] = sample{t: t, v: v} + s.lastValue = v if appendID > 0 && s.txs != nil { s.txs.add(appendID) diff --git a/tsdb/head_read.go b/tsdb/head_read.go index 0fe24792a..a97537a1f 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -677,87 +677,25 @@ func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, ch if stopAfter == 0 { return chunkenc.NewNopIterator() } - - if int(id)-int(s.firstChunkID) < len(s.mmappedChunks) { - if stopAfter == numSamples { - return c.chunk.Iterator(it) - } - if msIter, ok := it.(*stopIterator); ok { - msIter.Iterator = c.chunk.Iterator(msIter.Iterator) - msIter.i = -1 - msIter.stopAfter = stopAfter - return msIter - } - return &stopIterator{ - Iterator: c.chunk.Iterator(it), - i: -1, - stopAfter: stopAfter, - } - } - // Serve the last 4 samples for the last chunk from the sample buffer - // as their compressed bytes may be mutated by added samples. - if msIter, ok := it.(*memSafeIterator); ok { - msIter.Iterator = c.chunk.Iterator(msIter.Iterator) - msIter.i = -1 - msIter.total = numSamples - msIter.stopAfter = stopAfter - msIter.buf = s.sampleBuf - return msIter - } - return &memSafeIterator{ - stopIterator: stopIterator{ - Iterator: c.chunk.Iterator(it), - i: -1, - stopAfter: stopAfter, - }, - total: numSamples, - buf: s.sampleBuf, + if stopAfter == numSamples { + return c.chunk.Iterator(it) } + return makeStopIterator(c.chunk, it, stopAfter) } -// memSafeIterator returns values from the wrapped stopIterator -// except the last 4, which come from buf. -type memSafeIterator struct { - stopIterator - - total int - buf [4]sample -} - -func (it *memSafeIterator) Seek(t int64) bool { - if it.Err() != nil { - return false +func makeStopIterator(c chunkenc.Chunk, it chunkenc.Iterator, stopAfter int) chunkenc.Iterator { + // Re-use the Iterator object if it is a stopIterator. + if stopIter, ok := it.(*stopIterator); ok { + stopIter.Iterator = c.Iterator(stopIter.Iterator) + stopIter.i = -1 + stopIter.stopAfter = stopAfter + return stopIter } - - ts, _ := it.At() - - for t > ts || it.i == -1 { - if !it.Next() { - return false - } - ts, _ = it.At() - } - - return true -} - -func (it *memSafeIterator) Next() bool { - if it.i+1 >= it.stopAfter { - return false - } - it.i++ - if it.total-it.i > 4 { - return it.Iterator.Next() - } - return true -} - -func (it *memSafeIterator) At() (int64, float64) { - if it.total-it.i > 4 { - return it.Iterator.At() + return &stopIterator{ + Iterator: c.Iterator(it), + i: -1, + stopAfter: stopAfter, } - s := it.buf[4-(it.total-it.i)] - return s.t, s.v } // stopIterator wraps an Iterator, but only returns the first diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 3a6266e22..cbea0a7b9 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -767,16 +767,6 @@ func TestMemSeries_truncateChunks(t *testing.T) { chk, _, err = s.chunk(lastID, chunkDiskMapper, &memChunkPool) require.NoError(t, err) require.Equal(t, lastChunk, chk) - - // Validate that the series' sample buffer is applied correctly to the last chunk - // after truncation. - it1 := s.iterator(s.headChunkID(len(s.mmappedChunks)), nil, chunkDiskMapper, &memChunkPool, nil) - _, ok := it1.(*memSafeIterator) - require.True(t, ok) - - it2 := s.iterator(s.headChunkID(len(s.mmappedChunks)-1), nil, chunkDiskMapper, &memChunkPool, nil) - _, ok = it2.(*memSafeIterator) - require.False(t, ok, "non-last chunk incorrectly wrapped with sample buffer") } func TestHeadDeleteSeriesWithoutSamples(t *testing.T) { @@ -2486,7 +2476,7 @@ func BenchmarkHeadLabelValuesWithMatchers(b *testing.B) { } } -func TestMemSafeIteratorSeekIntoBuffer(t *testing.T) { +func TestIteratorSeekIntoBuffer(t *testing.T) { dir := t.TempDir() // This is usually taken from the Head, but passing manually here. chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize) @@ -2504,11 +2494,9 @@ func TestMemSafeIteratorSeekIntoBuffer(t *testing.T) { } it := s.iterator(s.headChunkID(len(s.mmappedChunks)), nil, chunkDiskMapper, nil, nil) - _, ok := it.(*memSafeIterator) - require.True(t, ok) // First point. - ok = it.Seek(0) + ok := it.Seek(0) require.True(t, ok) ts, val := it.At() require.Equal(t, int64(0), ts) diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index a69cd8019..0a5ee0599 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -778,7 +778,7 @@ type chunkSnapshotRecord struct { ref chunks.HeadSeriesRef lset labels.Labels mc *memChunk - sampleBuf [4]sample + lastValue float64 } func (s *memSeries) encodeToSnapshotRecord(b []byte) []byte { @@ -798,11 +798,13 @@ func (s *memSeries) encodeToSnapshotRecord(b []byte) []byte { buf.PutBE64int64(s.headChunk.maxTime) buf.PutByte(byte(s.headChunk.chunk.Encoding())) buf.PutUvarintBytes(s.headChunk.chunk.Bytes()) - // Put the sample buf. - for _, smpl := range s.sampleBuf { - buf.PutBE64int64(smpl.t) - buf.PutBEFloat64(smpl.v) + // Backwards compatibility for old sampleBuf which had last 4 samples. + for i := 0; i < 3; i++ { + buf.PutBE64int64(0) + buf.PutBEFloat64(0) } + buf.PutBE64int64(0) + buf.PutBEFloat64(s.lastValue) } s.Unlock() @@ -842,10 +844,13 @@ func decodeSeriesFromChunkSnapshot(d *record.Decoder, b []byte) (csr chunkSnapsh } csr.mc.chunk = chk - for i := range csr.sampleBuf { - csr.sampleBuf[i].t = dec.Be64int64() - csr.sampleBuf[i].v = dec.Be64Float64() + // Backwards-compatibility for old sampleBuf which had last 4 samples. + for i := 0; i < 3; i++ { + _ = dec.Be64int64() + _ = dec.Be64Float64() } + _ = dec.Be64int64() + csr.lastValue = dec.Be64Float64() err = dec.Err() if err != nil && len(dec.B) > 0 { @@ -1222,10 +1227,7 @@ func (h *Head) loadChunkSnapshot() (int, int, map[chunks.HeadSeriesRef]*memSerie } series.nextAt = csr.mc.maxTime // This will create a new chunk on append. series.headChunk = csr.mc - for i := range series.sampleBuf { - series.sampleBuf[i].t = csr.sampleBuf[i].t - series.sampleBuf[i].v = csr.sampleBuf[i].v - } + series.lastValue = csr.lastValue app, err := series.headChunk.chunk.Appender() if err != nil {