Browse Source

Fixed race in Chunks method. (#6515)

Added regression test.

Fixes #6512

Before (not deterministic result due to concurrency):
```
=== RUN   TestChunkReader_ConcurrentRead
--- FAIL: TestChunkReader_ConcurrentRead (0.01s)
    db_test.go:2702: unexpected error: checksum mismatch expected:597ad276, actual:597ad276
    db_test.go:2702: unexpected error: checksum mismatch expected:dd0cdbc2, actual:dd0cdbc2
FAIL
```

After succuess on multiple runs.


Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
pull/6518/head
Bartlomiej Plotka 5 years ago
parent
commit
c2e083c30f
  1. 21
      tsdb/chunks/chunks.go
  2. 60
      tsdb/db_test.go

21
tsdb/chunks/chunks.go

@ -456,16 +456,14 @@ func (b realByteSlice) Sub(start, end int) ByteSlice {
type Reader struct {
// The underlying bytes holding the encoded series data.
// Each slice holds the data for a different segment.
bs []ByteSlice
cs []io.Closer // Closers for resources behind the byte slices.
size int64 // The total size of bytes in the reader.
pool chunkenc.Pool
crc32 hash.Hash
buf [binary.MaxVarintLen32]byte
bs []ByteSlice
cs []io.Closer // Closers for resources behind the byte slices.
size int64 // The total size of bytes in the reader.
pool chunkenc.Pool
}
func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, error) {
cr := Reader{pool: pool, bs: bs, cs: cs, crc32: newCRC32()}
cr := Reader{pool: pool, bs: bs, cs: cs}
var totalSize int64
for i, b := range cr.bs {
@ -541,6 +539,7 @@ func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) {
// Get the lower 4 bytes.
// These contain the segment offset where the data for this chunk starts.
chkStart = int((ref << 32) >> 32)
chkCRC32 = newCRC32()
)
if sgmIndex >= len(s.bs) {
@ -569,12 +568,12 @@ func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) {
return nil, errors.Errorf("segment doesn't include enough bytes to read the chunk - required:%v, available:%v", chkEnd, sgmBytes.Len())
}
sum := sgmBytes.Range(chkEnd-crc32.Size, chkEnd)
s.crc32.Reset()
if _, err := s.crc32.Write(sgmBytes.Range(chkEncStart, chkDataEnd)); err != nil {
sum := sgmBytes.Range(chkDataEnd, chkEnd)
if _, err := chkCRC32.Write(sgmBytes.Range(chkEncStart, chkDataEnd)); err != nil {
return nil, err
}
if act := s.crc32.Sum(s.buf[:0]); !bytes.Equal(act, sum) {
if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) {
return nil, errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act)
}

60
tsdb/db_test.go

@ -25,6 +25,7 @@ import (
"path/filepath"
"sort"
"strconv"
"sync"
"testing"
"time"
@ -2485,9 +2486,9 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
testutil.Equals(t, 1000.0, sum)
}
// TestChunkWriter ensures that chunk segment are cut at the set segment size and
// TestChunkWriter_ReadAfterWrite ensures that chunk segment are cut at the set segment size and
// that the resulted segments includes the expected chunks data.
func TestChunkWriter(t *testing.T) {
func TestChunkWriter_ReadAfterWrite(t *testing.T) {
chk1 := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 1}})
chk2 := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 2}})
chk3 := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 3}})
@ -2611,22 +2612,19 @@ func TestChunkWriter(t *testing.T) {
for i, test := range tests {
t.Run(strconv.Itoa(i), func(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "test_chunk_witer")
tempDir, err := ioutil.TempDir("", "test_chunk_writer")
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, os.RemoveAll(tmpdir))
}()
defer func() { testutil.Ok(t, os.RemoveAll(tempDir)) }()
chunkw, err := chunks.NewWriterWithSegSize(tmpdir, chunks.SegmentHeaderSize+int64(test.segmentSize))
chunkw, err := chunks.NewWriterWithSegSize(tempDir, chunks.SegmentHeaderSize+int64(test.segmentSize))
testutil.Ok(t, err)
for _, chks := range test.chks {
chunkw.WriteChunks(chks...)
testutil.Ok(t, chunkw.WriteChunks(chks...))
}
testutil.Ok(t, chunkw.Close())
files, err := ioutil.ReadDir(tmpdir)
files, err := ioutil.ReadDir(tempDir)
testutil.Ok(t, err)
testutil.Equals(t, test.expSegmentsCount, len(files), "expected segments count mismatch")
@ -2655,7 +2653,7 @@ func TestChunkWriter(t *testing.T) {
testutil.Equals(t, sizeExp, sizeAct)
// Check the content of the chunks.
r, err := chunks.NewDirReader(tmpdir, nil)
r, err := chunks.NewDirReader(tempDir, nil)
testutil.Ok(t, err)
for _, chks := range test.chks {
@ -2668,3 +2666,43 @@ func TestChunkWriter(t *testing.T) {
})
}
}
// TestChunkReader_ConcurrentReads checks that the chunk result can be read concurrently.
// Regression test for https://github.com/prometheus/prometheus/pull/6514.
func TestChunkReader_ConcurrentReads(t *testing.T) {
chks := []chunks.Meta{
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 1}}),
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 2}}),
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 3}}),
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 4}}),
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 5}}),
}
tempDir, err := ioutil.TempDir("", "test_chunk_writer")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(tempDir)) }()
chunkw, err := chunks.NewWriter(tempDir)
testutil.Ok(t, err)
testutil.Ok(t, chunkw.WriteChunks(chks...))
testutil.Ok(t, chunkw.Close())
r, err := chunks.NewDirReader(tempDir, nil)
testutil.Ok(t, err)
var wg sync.WaitGroup
for _, chk := range chks {
for i := 0; i < 100; i++ {
wg.Add(1)
go func(chunk chunks.Meta) {
defer wg.Done()
chkAct, err := r.Chunk(chunk.Ref)
testutil.Ok(t, err)
testutil.Equals(t, chunk.Chunk.Bytes(), chkAct.Bytes())
}(chk)
}
wg.Wait()
}
}

Loading…
Cancel
Save