Browse Source

tsdb: remove chunk pool from memSeries (#11280)

The chunk pool belongs to the head not to the series. Pass it down where
required, and remove the copy of the pointer that `memSeries` was
holding.

`safeChunk` also needs to hold it, because in scenarios where it is used
we don't have a reference to the head. However it was already holding
`chunkDiskMapper` for the same reason, so no big change.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
pull/11313/head
Bryan Boreham 2 years ago committed by GitHub
parent
commit
d2701be53a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 15
      tsdb/head.go
  2. 19
      tsdb/head_read.go
  3. 30
      tsdb/head_test.go

15
tsdb/head.go

@ -1271,7 +1271,7 @@ func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, e
func (h *Head) getOrCreateWithID(id chunks.HeadSeriesRef, hash uint64, lset labels.Labels) (*memSeries, bool, error) {
s, created, err := h.series.getOrSet(hash, lset, func() *memSeries {
return newMemSeries(lset, id, h.chunkRange.Load(), &h.memChunkPool, h.opts.IsolationDisabled)
return newMemSeries(lset, id, h.chunkRange.Load(), h.opts.IsolationDisabled)
})
if err != nil {
return nil, false, err
@ -1545,21 +1545,18 @@ type memSeries struct {
// (the first sample would create a headChunk, hence appender, but rollback skipped it while the Append() call would create a series).
app chunkenc.Appender
memChunkPool *sync.Pool
// txs is nil if isolation is disabled.
txs *txRing
pendingCommit bool // Whether there are samples waiting to be committed to this series.
}
func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, chunkRange int64, memChunkPool *sync.Pool, isolationDisabled bool) *memSeries {
func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, chunkRange int64, isolationDisabled bool) *memSeries {
s := &memSeries{
lset: lset,
ref: id,
chunkRange: chunkRange,
nextAt: math.MinInt64,
memChunkPool: memChunkPool,
lset: lset,
ref: id,
chunkRange: chunkRange,
nextAt: math.MinInt64,
}
if !isolationDisabled {
s.txs = newTxRing(4)

19
tsdb/head_read.go

@ -17,6 +17,7 @@ import (
"context"
"math"
"sort"
"sync"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
@ -267,7 +268,7 @@ func (h *headChunkReader) Chunk(ref chunks.ChunkRef) (chunkenc.Chunk, error) {
}
s.Lock()
c, garbageCollect, err := s.chunk(cid, h.head.chunkDiskMapper)
c, garbageCollect, err := s.chunk(cid, h.head.chunkDiskMapper, &h.head.memChunkPool)
if err != nil {
s.Unlock()
return nil, err
@ -276,7 +277,7 @@ func (h *headChunkReader) Chunk(ref chunks.ChunkRef) (chunkenc.Chunk, error) {
if garbageCollect {
// Set this to nil so that Go GC can collect it after it has been used.
c.chunk = nil
s.memChunkPool.Put(c)
h.head.memChunkPool.Put(c)
}
}()
@ -293,13 +294,14 @@ func (h *headChunkReader) Chunk(ref chunks.ChunkRef) (chunkenc.Chunk, error) {
cid: cid,
isoState: h.isoState,
chunkDiskMapper: h.head.chunkDiskMapper,
memChunkPool: &h.head.memChunkPool,
}, nil
}
// chunk returns the chunk for the HeadChunkID from memory or by m-mapping it from the disk.
// If garbageCollect is true, it means that the returned *memChunk
// (and not the chunkenc.Chunk inside it) can be garbage collected after its usage.
func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDiskMapper) (chunk *memChunk, garbageCollect bool, err error) {
func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDiskMapper, memChunkPool *sync.Pool) (chunk *memChunk, garbageCollect bool, err error) {
// ix represents the index of chunk in the s.mmappedChunks slice. The chunk id's are
// incremented by 1 when new chunk is created, hence (id - firstChunkID) gives the slice index.
// The max index for the s.mmappedChunks slice can be len(s.mmappedChunks)-1, hence if the ix
@ -321,7 +323,7 @@ func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDi
}
return nil, false, err
}
mc := s.memChunkPool.Get().(*memChunk)
mc := memChunkPool.Get().(*memChunk)
mc.chunk = chk
mc.minTime = s.mmappedChunks[ix].minTime
mc.maxTime = s.mmappedChunks[ix].maxTime
@ -334,19 +336,20 @@ type safeChunk struct {
cid chunks.HeadChunkID
isoState *isolationState
chunkDiskMapper *chunks.ChunkDiskMapper
memChunkPool *sync.Pool
}
func (c *safeChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator {
c.s.Lock()
it := c.s.iterator(c.cid, c.isoState, c.chunkDiskMapper, reuseIter)
it := c.s.iterator(c.cid, c.isoState, c.chunkDiskMapper, c.memChunkPool, reuseIter)
c.s.Unlock()
return it
}
// iterator returns a chunk iterator for the requested chunkID, or a NopIterator if the requested ID is out of range.
// It is unsafe to call this concurrently with s.append(...) without holding the series lock.
func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, chunkDiskMapper *chunks.ChunkDiskMapper, it chunkenc.Iterator) chunkenc.Iterator {
c, garbageCollect, err := s.chunk(id, chunkDiskMapper)
func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, chunkDiskMapper *chunks.ChunkDiskMapper, memChunkPool *sync.Pool, it chunkenc.Iterator) chunkenc.Iterator {
c, garbageCollect, err := s.chunk(id, chunkDiskMapper, memChunkPool)
// TODO(fabxc): Work around! An error will be returns when a querier have retrieved a pointer to a
// series's chunk, which got then garbage collected before it got
// accessed. We must ensure to not garbage collect as long as any
@ -359,7 +362,7 @@ func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, ch
// Set this to nil so that Go GC can collect it after it has been used.
// This should be done always at the end.
c.chunk = nil
s.memChunkPool.Put(c)
memChunkPool.Put(c)
}
}()

30
tsdb/head_test.go

@ -224,7 +224,7 @@ func BenchmarkLoadWAL(b *testing.B) {
require.NoError(b, err)
for k := 0; k < c.batches*c.seriesPerBatch; k++ {
// Create one mmapped chunk per series, with one sample at the given time.
s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, c.mmappedChunkT, nil, defaultIsolationDisabled)
s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, c.mmappedChunkT, defaultIsolationDisabled)
s.append(c.mmappedChunkT, 42, 0, chunkDiskMapper)
s.mmapCurrentHeadChunk(chunkDiskMapper)
}
@ -515,11 +515,11 @@ func TestHead_ReadWAL(t *testing.T) {
require.NoError(t, c.Err())
return x
}
require.Equal(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0, nil, head.chunkDiskMapper, nil)))
require.Equal(t, []sample{{101, 6}}, expandChunk(s50.iterator(0, nil, head.chunkDiskMapper, nil)))
require.Equal(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0, nil, head.chunkDiskMapper, nil, nil)))
require.Equal(t, []sample{{101, 6}}, expandChunk(s50.iterator(0, nil, head.chunkDiskMapper, nil, nil)))
// The samples before the new series record should be discarded since a duplicate record
// is only possible when old samples were compacted.
require.Equal(t, []sample{{101, 7}}, expandChunk(s100.iterator(0, nil, head.chunkDiskMapper, nil)))
require.Equal(t, []sample{{101, 7}}, expandChunk(s100.iterator(0, nil, head.chunkDiskMapper, nil, nil)))
q, err := head.ExemplarQuerier(context.Background())
require.NoError(t, err)
@ -733,7 +733,7 @@ func TestMemSeries_truncateChunks(t *testing.T) {
},
}
s := newMemSeries(labels.FromStrings("a", "b"), 1, 2000, &memChunkPool, defaultIsolationDisabled)
s := newMemSeries(labels.FromStrings("a", "b"), 1, 2000, defaultIsolationDisabled)
for i := 0; i < 4000; i += 5 {
ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper)
@ -744,31 +744,31 @@ func TestMemSeries_truncateChunks(t *testing.T) {
// that the ID of the last chunk still gives us the same chunk afterwards.
countBefore := len(s.mmappedChunks) + 1 // +1 for the head chunk.
lastID := s.headChunkID(countBefore - 1)
lastChunk, _, err := s.chunk(lastID, chunkDiskMapper)
lastChunk, _, err := s.chunk(lastID, chunkDiskMapper, &memChunkPool)
require.NoError(t, err)
require.NotNil(t, lastChunk)
chk, _, err := s.chunk(0, chunkDiskMapper)
chk, _, err := s.chunk(0, chunkDiskMapper, &memChunkPool)
require.NotNil(t, chk)
require.NoError(t, err)
s.truncateChunksBefore(2000)
require.Equal(t, int64(2000), s.mmappedChunks[0].minTime)
_, _, err = s.chunk(0, chunkDiskMapper)
_, _, err = s.chunk(0, chunkDiskMapper, &memChunkPool)
require.Equal(t, storage.ErrNotFound, err, "first chunks not gone")
require.Equal(t, countBefore/2, len(s.mmappedChunks)+1) // +1 for the head chunk.
chk, _, err = s.chunk(lastID, chunkDiskMapper)
chk, _, err = s.chunk(lastID, chunkDiskMapper, &memChunkPool)
require.NoError(t, err)
require.Equal(t, lastChunk, chk)
// Validate that the series' sample buffer is applied correctly to the last chunk
// after truncation.
it1 := s.iterator(s.headChunkID(len(s.mmappedChunks)), nil, chunkDiskMapper, nil)
it1 := s.iterator(s.headChunkID(len(s.mmappedChunks)), nil, chunkDiskMapper, &memChunkPool, nil)
_, ok := it1.(*memSafeIterator)
require.True(t, ok)
it2 := s.iterator(s.headChunkID(len(s.mmappedChunks)-1), nil, chunkDiskMapper, nil)
it2 := s.iterator(s.headChunkID(len(s.mmappedChunks)-1), nil, chunkDiskMapper, &memChunkPool, nil)
_, ok = it2.(*memSafeIterator)
require.False(t, ok, "non-last chunk incorrectly wrapped with sample buffer")
}
@ -1271,7 +1271,7 @@ func TestMemSeries_append(t *testing.T) {
require.NoError(t, chunkDiskMapper.Close())
}()
s := newMemSeries(labels.Labels{}, 1, 500, nil, defaultIsolationDisabled)
s := newMemSeries(labels.Labels{}, 1, 500, defaultIsolationDisabled)
// Add first two samples at the very end of a chunk range and the next two
// on and after it.
@ -1325,7 +1325,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) {
require.NoError(t, chunkDiskMapper.Close())
})
s := newMemSeries(labels.Labels{}, 1, DefaultBlockDuration, nil, defaultIsolationDisabled)
s := newMemSeries(labels.Labels{}, 1, DefaultBlockDuration, defaultIsolationDisabled)
// At this slow rate, we will fill the chunk in two block durations.
slowRate := (DefaultBlockDuration * 2) / samplesPerChunk
@ -2483,14 +2483,14 @@ func TestMemSafeIteratorSeekIntoBuffer(t *testing.T) {
require.NoError(t, chunkDiskMapper.Close())
}()
s := newMemSeries(labels.Labels{}, 1, 500, nil, defaultIsolationDisabled)
s := newMemSeries(labels.Labels{}, 1, 500, defaultIsolationDisabled)
for i := 0; i < 7; i++ {
ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper)
require.True(t, ok, "sample append failed")
}
it := s.iterator(s.headChunkID(len(s.mmappedChunks)), nil, chunkDiskMapper, nil)
it := s.iterator(s.headChunkID(len(s.mmappedChunks)), nil, chunkDiskMapper, nil, nil)
_, ok := it.(*memSafeIterator)
require.True(t, ok)

Loading…
Cancel
Save