diff --git a/tsdb/head.go b/tsdb/head.go index fb7ff582d..32e85c599 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -1271,7 +1271,7 @@ func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, e func (h *Head) getOrCreateWithID(id chunks.HeadSeriesRef, hash uint64, lset labels.Labels) (*memSeries, bool, error) { s, created, err := h.series.getOrSet(hash, lset, func() *memSeries { - return newMemSeries(lset, id, h.chunkRange.Load(), &h.memChunkPool, h.opts.IsolationDisabled) + return newMemSeries(lset, id, h.chunkRange.Load(), h.opts.IsolationDisabled) }) if err != nil { return nil, false, err @@ -1545,21 +1545,18 @@ type memSeries struct { // (the first sample would create a headChunk, hence appender, but rollback skipped it while the Append() call would create a series). app chunkenc.Appender - memChunkPool *sync.Pool - // txs is nil if isolation is disabled. txs *txRing pendingCommit bool // Whether there are samples waiting to be committed to this series. } -func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, chunkRange int64, memChunkPool *sync.Pool, isolationDisabled bool) *memSeries { +func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, chunkRange int64, isolationDisabled bool) *memSeries { s := &memSeries{ - lset: lset, - ref: id, - chunkRange: chunkRange, - nextAt: math.MinInt64, - memChunkPool: memChunkPool, + lset: lset, + ref: id, + chunkRange: chunkRange, + nextAt: math.MinInt64, } if !isolationDisabled { s.txs = newTxRing(4) diff --git a/tsdb/head_read.go b/tsdb/head_read.go index cd19c0ef5..ca34b9bbd 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -17,6 +17,7 @@ import ( "context" "math" "sort" + "sync" "github.com/go-kit/log/level" "github.com/pkg/errors" @@ -267,7 +268,7 @@ func (h *headChunkReader) Chunk(ref chunks.ChunkRef) (chunkenc.Chunk, error) { } s.Lock() - c, garbageCollect, err := s.chunk(cid, h.head.chunkDiskMapper) + c, garbageCollect, err := s.chunk(cid, h.head.chunkDiskMapper, &h.head.memChunkPool) if err != nil { s.Unlock() return nil, err @@ -276,7 +277,7 @@ func (h *headChunkReader) Chunk(ref chunks.ChunkRef) (chunkenc.Chunk, error) { if garbageCollect { // Set this to nil so that Go GC can collect it after it has been used. c.chunk = nil - s.memChunkPool.Put(c) + h.head.memChunkPool.Put(c) } }() @@ -293,13 +294,14 @@ func (h *headChunkReader) Chunk(ref chunks.ChunkRef) (chunkenc.Chunk, error) { cid: cid, isoState: h.isoState, chunkDiskMapper: h.head.chunkDiskMapper, + memChunkPool: &h.head.memChunkPool, }, nil } // chunk returns the chunk for the HeadChunkID from memory or by m-mapping it from the disk. // If garbageCollect is true, it means that the returned *memChunk // (and not the chunkenc.Chunk inside it) can be garbage collected after its usage. -func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDiskMapper) (chunk *memChunk, garbageCollect bool, err error) { +func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDiskMapper, memChunkPool *sync.Pool) (chunk *memChunk, garbageCollect bool, err error) { // ix represents the index of chunk in the s.mmappedChunks slice. The chunk id's are // incremented by 1 when new chunk is created, hence (id - firstChunkID) gives the slice index. // The max index for the s.mmappedChunks slice can be len(s.mmappedChunks)-1, hence if the ix @@ -321,7 +323,7 @@ func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDi } return nil, false, err } - mc := s.memChunkPool.Get().(*memChunk) + mc := memChunkPool.Get().(*memChunk) mc.chunk = chk mc.minTime = s.mmappedChunks[ix].minTime mc.maxTime = s.mmappedChunks[ix].maxTime @@ -334,19 +336,20 @@ type safeChunk struct { cid chunks.HeadChunkID isoState *isolationState chunkDiskMapper *chunks.ChunkDiskMapper + memChunkPool *sync.Pool } func (c *safeChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator { c.s.Lock() - it := c.s.iterator(c.cid, c.isoState, c.chunkDiskMapper, reuseIter) + it := c.s.iterator(c.cid, c.isoState, c.chunkDiskMapper, c.memChunkPool, reuseIter) c.s.Unlock() return it } // iterator returns a chunk iterator for the requested chunkID, or a NopIterator if the requested ID is out of range. // It is unsafe to call this concurrently with s.append(...) without holding the series lock. -func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, chunkDiskMapper *chunks.ChunkDiskMapper, it chunkenc.Iterator) chunkenc.Iterator { - c, garbageCollect, err := s.chunk(id, chunkDiskMapper) +func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, chunkDiskMapper *chunks.ChunkDiskMapper, memChunkPool *sync.Pool, it chunkenc.Iterator) chunkenc.Iterator { + c, garbageCollect, err := s.chunk(id, chunkDiskMapper, memChunkPool) // TODO(fabxc): Work around! An error will be returns when a querier have retrieved a pointer to a // series's chunk, which got then garbage collected before it got // accessed. We must ensure to not garbage collect as long as any @@ -359,7 +362,7 @@ func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, ch // Set this to nil so that Go GC can collect it after it has been used. // This should be done always at the end. c.chunk = nil - s.memChunkPool.Put(c) + memChunkPool.Put(c) } }() diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 6c354e532..7c580406c 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -224,7 +224,7 @@ func BenchmarkLoadWAL(b *testing.B) { require.NoError(b, err) for k := 0; k < c.batches*c.seriesPerBatch; k++ { // Create one mmapped chunk per series, with one sample at the given time. - s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, c.mmappedChunkT, nil, defaultIsolationDisabled) + s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, c.mmappedChunkT, defaultIsolationDisabled) s.append(c.mmappedChunkT, 42, 0, chunkDiskMapper) s.mmapCurrentHeadChunk(chunkDiskMapper) } @@ -515,11 +515,11 @@ func TestHead_ReadWAL(t *testing.T) { require.NoError(t, c.Err()) return x } - require.Equal(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0, nil, head.chunkDiskMapper, nil))) - require.Equal(t, []sample{{101, 6}}, expandChunk(s50.iterator(0, nil, head.chunkDiskMapper, nil))) + require.Equal(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0, nil, head.chunkDiskMapper, nil, nil))) + require.Equal(t, []sample{{101, 6}}, expandChunk(s50.iterator(0, nil, head.chunkDiskMapper, nil, nil))) // The samples before the new series record should be discarded since a duplicate record // is only possible when old samples were compacted. - require.Equal(t, []sample{{101, 7}}, expandChunk(s100.iterator(0, nil, head.chunkDiskMapper, nil))) + require.Equal(t, []sample{{101, 7}}, expandChunk(s100.iterator(0, nil, head.chunkDiskMapper, nil, nil))) q, err := head.ExemplarQuerier(context.Background()) require.NoError(t, err) @@ -733,7 +733,7 @@ func TestMemSeries_truncateChunks(t *testing.T) { }, } - s := newMemSeries(labels.FromStrings("a", "b"), 1, 2000, &memChunkPool, defaultIsolationDisabled) + s := newMemSeries(labels.FromStrings("a", "b"), 1, 2000, defaultIsolationDisabled) for i := 0; i < 4000; i += 5 { ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper) @@ -744,31 +744,31 @@ func TestMemSeries_truncateChunks(t *testing.T) { // that the ID of the last chunk still gives us the same chunk afterwards. countBefore := len(s.mmappedChunks) + 1 // +1 for the head chunk. lastID := s.headChunkID(countBefore - 1) - lastChunk, _, err := s.chunk(lastID, chunkDiskMapper) + lastChunk, _, err := s.chunk(lastID, chunkDiskMapper, &memChunkPool) require.NoError(t, err) require.NotNil(t, lastChunk) - chk, _, err := s.chunk(0, chunkDiskMapper) + chk, _, err := s.chunk(0, chunkDiskMapper, &memChunkPool) require.NotNil(t, chk) require.NoError(t, err) s.truncateChunksBefore(2000) require.Equal(t, int64(2000), s.mmappedChunks[0].minTime) - _, _, err = s.chunk(0, chunkDiskMapper) + _, _, err = s.chunk(0, chunkDiskMapper, &memChunkPool) require.Equal(t, storage.ErrNotFound, err, "first chunks not gone") require.Equal(t, countBefore/2, len(s.mmappedChunks)+1) // +1 for the head chunk. - chk, _, err = s.chunk(lastID, chunkDiskMapper) + chk, _, err = s.chunk(lastID, chunkDiskMapper, &memChunkPool) require.NoError(t, err) require.Equal(t, lastChunk, chk) // Validate that the series' sample buffer is applied correctly to the last chunk // after truncation. - it1 := s.iterator(s.headChunkID(len(s.mmappedChunks)), nil, chunkDiskMapper, nil) + it1 := s.iterator(s.headChunkID(len(s.mmappedChunks)), nil, chunkDiskMapper, &memChunkPool, nil) _, ok := it1.(*memSafeIterator) require.True(t, ok) - it2 := s.iterator(s.headChunkID(len(s.mmappedChunks)-1), nil, chunkDiskMapper, nil) + it2 := s.iterator(s.headChunkID(len(s.mmappedChunks)-1), nil, chunkDiskMapper, &memChunkPool, nil) _, ok = it2.(*memSafeIterator) require.False(t, ok, "non-last chunk incorrectly wrapped with sample buffer") } @@ -1271,7 +1271,7 @@ func TestMemSeries_append(t *testing.T) { require.NoError(t, chunkDiskMapper.Close()) }() - s := newMemSeries(labels.Labels{}, 1, 500, nil, defaultIsolationDisabled) + s := newMemSeries(labels.Labels{}, 1, 500, defaultIsolationDisabled) // Add first two samples at the very end of a chunk range and the next two // on and after it. @@ -1325,7 +1325,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) { require.NoError(t, chunkDiskMapper.Close()) }) - s := newMemSeries(labels.Labels{}, 1, DefaultBlockDuration, nil, defaultIsolationDisabled) + s := newMemSeries(labels.Labels{}, 1, DefaultBlockDuration, defaultIsolationDisabled) // At this slow rate, we will fill the chunk in two block durations. slowRate := (DefaultBlockDuration * 2) / samplesPerChunk @@ -2483,14 +2483,14 @@ func TestMemSafeIteratorSeekIntoBuffer(t *testing.T) { require.NoError(t, chunkDiskMapper.Close()) }() - s := newMemSeries(labels.Labels{}, 1, 500, nil, defaultIsolationDisabled) + s := newMemSeries(labels.Labels{}, 1, 500, defaultIsolationDisabled) for i := 0; i < 7; i++ { ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper) require.True(t, ok, "sample append failed") } - it := s.iterator(s.headChunkID(len(s.mmappedChunks)), nil, chunkDiskMapper, nil) + it := s.iterator(s.headChunkID(len(s.mmappedChunks)), nil, chunkDiskMapper, nil, nil) _, ok := it.(*memSafeIterator) require.True(t, ok)