Do not re-encode head chunk in ChunkQuerier

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
pull/11992/head
Ganesh Vernekar 2 years ago
parent 66da1d51fd
commit 0c0c2af7f5
No known key found for this signature in database
GPG Key ID: F056451B52F1DC34

tsdb/db_test.go
@@ -6334,3 +6334,76 @@ func compareSeries(t require.TestingT, expected, actual map[string][]tsdbutil.Sample) {
 		}
 	}
 }
+
+// TestChunkQuerierReadWriteRace looks for any possible race between appending
+// samples and reading chunks because the head chunk that is being appended to
+// can be read in parallel and we should be able to make a copy of the chunk without
+// worrying about the parallel write.
+func TestChunkQuerierReadWriteRace(t *testing.T) {
+	db := openTestDB(t, nil, nil)
+	defer func() {
+		require.NoError(t, db.Close())
+	}()
+
+	lbls := labels.FromStrings("foo", "bar")
+
+	writer := func() error {
+		<-time.After(5 * time.Millisecond) // Initial pause while readers start.
+		ts := 0
+		for i := 0; i < 500; i++ {
+			app := db.Appender(context.Background())
+			for j := 0; j < 10; j++ {
+				ts++
+				_, err := app.Append(0, lbls, int64(ts), float64(ts*100))
+				if err != nil {
+					return err
+				}
+			}
+			err := app.Commit()
+			if err != nil {
+				return err
+			}
+			<-time.After(time.Millisecond)
+		}
+		return nil
+	}
+
+	reader := func() {
+		querier, err := db.ChunkQuerier(context.Background(), math.MinInt64, math.MaxInt64)
+		require.NoError(t, err)
+		defer func(q storage.ChunkQuerier) {
+			require.NoError(t, q.Close())
+		}(querier)
+		ss := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
+		for ss.Next() {
+			cs := ss.At()
+			it := cs.Iterator(nil)
+			for it.Next() {
+				m := it.At()
+				b := m.Chunk.Bytes()
+				bb := make([]byte, len(b))
+				copy(bb, b) // This copying of chunk bytes detects any race.
+			}
+		}
+		require.NoError(t, ss.Err())
+	}
+
+	ch := make(chan struct{})
+	var writerErr error
+	go func() {
+		defer close(ch)
+		writerErr = writer()
+	}()
+
+Outer:
+	for {
+		reader()
+		select {
+		case <-ch:
+			break Outer
+		default:
+		}
+	}
+
+	require.NoError(t, writerErr)
+}

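Aside, not part of the commit: the race this test exercises comes down to reading a chunk's backing byte slice while an appender is still writing to it, and the test only fails when the race detector is enabled (e.g. go test -race -run TestChunkQuerierReadWriteRace ./tsdb/). Below is a minimal sketch of the same hazard using only the chunkenc package; the program structure is illustrative, while the chunkenc calls are the real API:

	package main

	import (
		"sync"

		"github.com/prometheus/prometheus/tsdb/chunkenc"
	)

	func main() {
		c := chunkenc.NewXORChunk()
		app, _ := c.Appender()

		var wg sync.WaitGroup
		wg.Add(1)
		go func() {
			defer wg.Done()
			for ts := int64(0); ts < 1000; ts++ {
				app.Append(ts, float64(ts)) // Writer mutates/grows the chunk's backing bytes.
			}
		}()

		// Bytes() on an open chunk returns the live backing slice, so this copy
		// races with the appends above; this is exactly what -race flags.
		b := c.Bytes()
		bb := make([]byte, len(b))
		copy(bb, b)
		_ = bb

		wg.Wait()
	}
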
tsdb/head_read.go
@@ -274,22 +274,36 @@ func (h *headChunkReader) Close() error {
 // Chunk returns the chunk for the reference number.
 func (h *headChunkReader) Chunk(meta chunks.Meta) (chunkenc.Chunk, error) {
+	chk, _, err := h.chunk(meta, false)
+	return chk, err
+}
+
+// ChunkWithCopy returns the chunk for the reference number.
+// If the chunk is the in-memory chunk, then it makes a copy and returns the copied chunk.
+func (h *headChunkReader) ChunkWithCopy(meta chunks.Meta) (chunkenc.Chunk, int64, error) {
+	return h.chunk(meta, true)
+}
+
+// chunk returns the chunk for the reference number.
+// If copyLastChunk is true, then a copy is made when the requested chunk is the head chunk.
+// Also returns max time of the chunk.
+func (h *headChunkReader) chunk(meta chunks.Meta, copyLastChunk bool) (chunkenc.Chunk, int64, error) {
 	sid, cid := chunks.HeadChunkRef(meta.Ref).Unpack()
 
 	s := h.head.series.getByID(sid)
 	// This means that the series has been garbage collected.
 	if s == nil {
-		return nil, storage.ErrNotFound
+		return nil, 0, storage.ErrNotFound
 	}
 
 	s.Lock()
-	c, garbageCollect, err := s.chunk(cid, h.head.chunkDiskMapper, &h.head.memChunkPool)
+	c, headChunk, err := s.chunk(cid, h.head.chunkDiskMapper, &h.head.memChunkPool)
 	if err != nil {
 		s.Unlock()
-		return nil, err
+		return nil, 0, err
 	}
 	defer func() {
-		if garbageCollect {
+		if !headChunk {
 			// Set this to nil so that Go GC can collect it after it has been used.
 			c.chunk = nil
 			h.head.memChunkPool.Put(c)
@@ -299,22 +313,36 @@ func (h *headChunkReader) Chunk(meta chunks.Meta) (chunkenc.Chunk, error) {
 	// This means that the chunk is outside the specified range.
 	if !c.OverlapsClosedInterval(h.mint, h.maxt) {
 		s.Unlock()
-		return nil, storage.ErrNotFound
+		return nil, 0, storage.ErrNotFound
 	}
 
+	chk, maxTime := c.chunk, c.maxTime
+	if headChunk && copyLastChunk {
+		// The caller may ask to copy the head chunk in order to take the
+		// bytes of the chunk without causing the race between read and append.
+		b := s.headChunk.chunk.Bytes()
+		newB := make([]byte, len(b))
+		copy(newB, b) // TODO(codesome): Use bytes.Clone() when we upgrade to Go 1.20.
+		// TODO(codesome): Put back in the pool (non-trivial).
+		chk, err = h.head.opts.ChunkPool.Get(s.headChunk.chunk.Encoding(), newB)
+		if err != nil {
+			return nil, 0, err
+		}
+	}
 	s.Unlock()
 
 	return &safeChunk{
-		Chunk:    c.chunk,
+		Chunk:    chk,
 		s:        s,
 		cid:      cid,
 		isoState: h.isoState,
-	}, nil
+	}, maxTime, nil
 }
 
 // chunk returns the chunk for the HeadChunkID from memory or by m-mapping it from the disk.
-// If garbageCollect is true, it means that the returned *memChunk
+// If headChunk is false, it means that the returned *memChunk
 // (and not the chunkenc.Chunk inside it) can be garbage collected after its usage.
-func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDiskMapper, memChunkPool *sync.Pool) (chunk *memChunk, garbageCollect bool, err error) {
+func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDiskMapper, memChunkPool *sync.Pool) (chunk *memChunk, headChunk bool, err error) {
 	// ix represents the index of chunk in the s.mmappedChunks slice. The chunk id's are
 	// incremented by 1 when new chunk is created, hence (id - firstChunkID) gives the slice index.
 	// The max index for the s.mmappedChunks slice can be len(s.mmappedChunks)-1, hence if the ix
@@ -323,11 +351,12 @@ func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDiskMapper, memChunkPool *sync.Pool) (chunk *memChunk, garbageCollect bool, err error) {
 	if ix < 0 || ix > len(s.mmappedChunks) {
 		return nil, false, storage.ErrNotFound
 	}
 
 	if ix == len(s.mmappedChunks) {
 		if s.headChunk == nil {
 			return nil, false, errors.New("invalid head chunk")
 		}
-		return s.headChunk, false, nil
+		return s.headChunk, true, nil
 	}
 
 	chk, err := chunkDiskMapper.Chunk(s.mmappedChunks[ix].ref)
 	if err != nil {
@@ -340,7 +369,7 @@ func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDiskMapper, memChunkPool *sync.Pool) (chunk *memChunk, garbageCollect bool, err error) {
 	mc.chunk = chk
 	mc.minTime = s.mmappedChunks[ix].minTime
 	mc.maxTime = s.mmappedChunks[ix].maxTime
-	return mc, true, nil
+	return mc, false, nil
 }
 
 // oooMergedChunk returns the requested chunk based on the given chunks.Meta

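Aside: the copy performed in chunk() above, shown standalone. This hedged sketch assumes only the chunkenc package; chunkenc.FromData is the pool-free way to rebuild a chunk over cloned bytes (the commit instead reuses h.head.opts.ChunkPool.Get, and deepCopyChunk is a hypothetical helper, not part of the change):

	import "github.com/prometheus/prometheus/tsdb/chunkenc"

	// deepCopyChunk clones the chunk's bytes and rebuilds a chunk of the same
	// encoding over the clone, so the result shares no memory with the
	// (possibly still appended-to) head chunk.
	func deepCopyChunk(c chunkenc.Chunk) (chunkenc.Chunk, error) {
		b := c.Bytes()
		newB := make([]byte, len(b))
		copy(newB, b) // bytes.Clone(b) once on Go 1.20+, per the TODO above.
		return chunkenc.FromData(c.Encoding(), newB)
	}
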
tsdb/querier.go
@@ -584,7 +584,11 @@ func (p *populateWithDelGenericSeriesIterator) reset(blockID ulid.ULID, cr ChunkReader, chks []chunks.Meta, intervals tombstones.Intervals) {
 	p.currChkMeta = chunks.Meta{}
 }
 
-func (p *populateWithDelGenericSeriesIterator) next() bool {
+// If copyHeadChunk is true, then the head chunk (i.e. the in-memory chunk of the TSDB)
+// is deep copied to avoid races between reads and copying chunk bytes.
+// However, if the deletion intervals overlap with the head chunk, then the head chunk is
+// not copied irrespective of copyHeadChunk because it will be re-encoded later anyway.
+func (p *populateWithDelGenericSeriesIterator) next(copyHeadChunk bool) bool {
 	if p.err != nil || p.i >= len(p.chks)-1 {
 		return false
 	}
@@ -592,12 +596,6 @@ func (p *populateWithDelGenericSeriesIterator) next() bool {
 	p.i++
 	p.currChkMeta = p.chks[p.i]
 
-	p.currChkMeta.Chunk, p.err = p.chunks.Chunk(p.currChkMeta)
-	if p.err != nil {
-		p.err = errors.Wrapf(p.err, "cannot populate chunk %d from block %s", p.currChkMeta.Ref, p.blockID.String())
-		return false
-	}
-
 	p.bufIter.Intervals = p.bufIter.Intervals[:0]
 	for _, interval := range p.intervals {
 		if p.currChkMeta.OverlapsClosedInterval(interval.Mint, interval.Maxt) {
@@ -605,22 +603,28 @@ func (p *populateWithDelGenericSeriesIterator) next() bool {
 		}
 	}
 
-	// Re-encode head chunks that are still open (being appended to) or
-	// outside the compacted MaxTime range.
-	// The chunk.Bytes() method is not safe for open chunks hence the re-encoding.
-	// This happens when snapshotting the head block or just fetching chunks from TSDB.
-	//
-	// TODO(codesome): think how to avoid the typecasting to verify when it is head block.
-	_, isSafeChunk := p.currChkMeta.Chunk.(*safeChunk)
-	if len(p.bufIter.Intervals) == 0 && !(isSafeChunk && p.currChkMeta.MaxTime == math.MaxInt64) {
-		// If there is no overlap with deletion intervals AND it's NOT
-		// an "open" head chunk, we can take chunk as it is.
+	hcr, ok := p.chunks.(*headChunkReader)
+	if ok && copyHeadChunk && len(p.bufIter.Intervals) == 0 {
+		// ChunkWithCopy will copy the head chunk.
+		var maxt int64
+		p.currChkMeta.Chunk, maxt, p.err = hcr.ChunkWithCopy(p.currChkMeta)
+		// For the in-memory head chunk the index reader sets maxt as MaxInt64. We fix it here.
+		p.currChkMeta.MaxTime = maxt
+	} else {
+		p.currChkMeta.Chunk, p.err = p.chunks.Chunk(p.currChkMeta)
+	}
+	if p.err != nil {
+		p.err = errors.Wrapf(p.err, "cannot populate chunk %d from block %s", p.currChkMeta.Ref, p.blockID.String())
+		return false
+	}
+
+	if len(p.bufIter.Intervals) == 0 {
+		// If there is no overlap with deletion intervals, we can take chunk as it is.
 		p.currDelIter = nil
 		return true
 	}
 
-	// We don't want the full chunk, or it's potentially still opened, take
-	// just a part of it.
+	// We don't want the full chunk, take just a part of it.
 	p.bufIter.Iter = p.currChkMeta.Chunk.Iterator(p.bufIter.Iter)
 	p.currDelIter = &p.bufIter
 	return true
@@ -677,7 +681,7 @@ func (p *populateWithDelSeriesIterator) Next() chunkenc.ValueType {
 		}
 	}
 
-	for p.next() {
+	for p.next(false) {
 		if p.currDelIter != nil {
 			p.curr = p.currDelIter
 		} else {
@@ -742,7 +746,7 @@ func (p *populateWithDelChunkSeriesIterator) reset(blockID ulid.ULID, cr ChunkReader, chks []chunks.Meta, intervals tombstones.Intervals) {
 }
 
 func (p *populateWithDelChunkSeriesIterator) Next() bool {
-	if !p.next() {
+	if !p.next(true) {
 		return false
 	}
 	p.curr = p.currChkMeta

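Aside on the two call sites above: populateWithDelSeriesIterator only decodes samples through an iterator and never exposes raw chunk bytes, so it passes next(false); populateWithDelChunkSeriesIterator hands whole chunks.Meta values to callers, so it passes next(true) and receives a stable copy of the head chunk. A hedged caller-side sketch of what the copy makes safe (it mirrors the test's reader; readRawChunks and its error handling are illustrative, not part of the commit):

	// readRawChunks is a hypothetical caller; db is an open *tsdb.DB.
	func readRawChunks(db *tsdb.DB) error {
		querier, err := db.ChunkQuerier(context.Background(), math.MinInt64, math.MaxInt64)
		if err != nil {
			return err
		}
		defer querier.Close()

		ss := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
		for ss.Next() {
			it := ss.At().Iterator(nil)
			for it.Next() {
				meta := it.At()
				// meta.Chunk is an immutable m-mapped chunk or a deep copy of the
				// open head chunk, so holding on to its bytes is now safe.
				_ = meta.Chunk.Bytes()
			}
		}
		return ss.Err()
	}
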
@@ -0,0 +1 @@
+make: Nothing to be done for `test'.