From 071d5732afdb68fce428cb985ea13e176b9a876e Mon Sep 17 00:00:00 2001
From: Bryan Boreham
Date: Fri, 27 Oct 2023 21:41:04 +0100
Subject: [PATCH 1/4] TSDB: refactor cleanup of chunks and series

Extract the middle of the loop into a function, so it will be easier
to modify the `seriesHashmap` data structure.

Signed-off-by: Bryan Boreham
---
 tsdb/head.go | 117 ++++++++++++++++++++++++++++-----------------------------
 1 file changed, 60 insertions(+), 57 deletions(-)

diff --git a/tsdb/head.go b/tsdb/head.go
index ee0ffcb8d..c46ffe061 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -1777,70 +1777,73 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
 		deletedFromPrevStripe = 0
 	)
 	minMmapFile = math.MaxInt32
-	// Run through all series and truncate old chunks. Mark those with no
-	// chunks left as deleted and store their ID.
-	for i := 0; i < s.size; i++ {
-		deletedForCallback := make(map[chunks.HeadSeriesRef]labels.Labels, deletedFromPrevStripe)
-		s.locks[i].Lock()
-
-		for hash, all := range s.hashes[i] {
-			for _, series := range all {
-				series.Lock()
-				rmChunks += series.truncateChunksBefore(mint, minOOOMmapRef)
-				if len(series.mmappedChunks) > 0 {
-					seq, _ := series.mmappedChunks[0].ref.Unpack()
-					if seq < minMmapFile {
-						minMmapFile = seq
-					}
-				}
-				if series.ooo != nil && len(series.ooo.oooMmappedChunks) > 0 {
-					seq, _ := series.ooo.oooMmappedChunks[0].ref.Unpack()
-					if seq < minMmapFile {
-						minMmapFile = seq
-					}
-					for _, ch := range series.ooo.oooMmappedChunks {
-						if ch.minTime < minOOOTime {
-							minOOOTime = ch.minTime
-						}
-					}
-				}
-				if series.ooo != nil && series.ooo.oooHeadChunk != nil {
-					if series.ooo.oooHeadChunk.minTime < minOOOTime {
-						minOOOTime = series.ooo.oooHeadChunk.minTime
-					}
-				}
-				if len(series.mmappedChunks) > 0 || series.headChunks != nil || series.pendingCommit ||
-					(series.ooo != nil && (len(series.ooo.oooMmappedChunks) > 0 || series.ooo.oooHeadChunk != nil)) {
-					seriesMint := series.minTime()
-					if seriesMint < actualMint {
-						actualMint = seriesMint
-					}
-					series.Unlock()
-					continue
-				}
+	// For one series, truncate old chunks and check if any chunks left. If not, mark as deleted and collect the ID.
+	check := func(i int, hash uint64, series *memSeries, deletedForCallback map[chunks.HeadSeriesRef]labels.Labels) {
+		series.Lock()
+		defer series.Unlock()
 
-				// The series is gone entirely. We need to keep the series lock
-				// and make sure we have acquired the stripe locks for hash and ID of the
-				// series alike.
-				// If we don't hold them all, there's a very small chance that a series receives
-				// samples again while we are half-way into deleting it.
-				j := int(series.ref) & (s.size - 1)
+		rmChunks += series.truncateChunksBefore(mint, minOOOMmapRef)
 
-				if i != j {
-					s.locks[j].Lock()
+		if len(series.mmappedChunks) > 0 {
+			seq, _ := series.mmappedChunks[0].ref.Unpack()
+			if seq < minMmapFile {
+				minMmapFile = seq
+			}
+		}
+		if series.ooo != nil && len(series.ooo.oooMmappedChunks) > 0 {
+			seq, _ := series.ooo.oooMmappedChunks[0].ref.Unpack()
+			if seq < minMmapFile {
+				minMmapFile = seq
+			}
+			for _, ch := range series.ooo.oooMmappedChunks {
+				if ch.minTime < minOOOTime {
+					minOOOTime = ch.minTime
 				}
+			}
+		}
+		if series.ooo != nil && series.ooo.oooHeadChunk != nil {
+			if series.ooo.oooHeadChunk.minTime < minOOOTime {
+				minOOOTime = series.ooo.oooHeadChunk.minTime
+			}
+		}
+		if len(series.mmappedChunks) > 0 || series.headChunks != nil || series.pendingCommit ||
+			(series.ooo != nil && (len(series.ooo.oooMmappedChunks) > 0 || series.ooo.oooHeadChunk != nil)) {
+			seriesMint := series.minTime()
+			if seriesMint < actualMint {
+				actualMint = seriesMint
+			}
+			return
+		}
+		// The series is gone entirely. We need to keep the series lock
+		// and make sure we have acquired the stripe locks for hash and ID of the
+		// series alike.
+		// If we don't hold them all, there's a very small chance that a series receives
+		// samples again while we are half-way into deleting it.
+		j := int(series.ref) & (s.size - 1)
+
+		if i != j {
+			s.locks[j].Lock()
+		}
 
-				deleted[storage.SeriesRef(series.ref)] = struct{}{}
-				s.hashes[i].del(hash, series.lset)
-				delete(s.series[j], series.ref)
-				deletedForCallback[series.ref] = series.lset
+		deleted[storage.SeriesRef(series.ref)] = struct{}{}
+		s.hashes[i].del(hash, series.lset)
+		delete(s.series[j], series.ref)
+		deletedForCallback[series.ref] = series.lset
 
-				if i != j {
-					s.locks[j].Unlock()
-				}
+		if i != j {
+			s.locks[j].Unlock()
+		}
+	}
 
-				series.Unlock()
+	// Run through all series shard by shard, checking which should be deleted.
+	for i := 0; i < s.size; i++ {
+		deletedForCallback := make(map[chunks.HeadSeriesRef]labels.Labels, deletedFromPrevStripe)
+		s.locks[i].Lock()
+
+		for hash, all := range s.hashes[i] {
+			for _, series := range all {
+				check(i, hash, series, deletedForCallback)
 			}
 		}

From ce4e757704866ad16bd456e1140cdfe45b380601 Mon Sep 17 00:00:00 2001
From: Bryan Boreham
Date: Fri, 27 Oct 2023 22:02:34 +0100
Subject: [PATCH 2/4] TSDB: refine variable naming in chunk gc

Slight further refactor.

Signed-off-by: Bryan Boreham
---
 tsdb/head.go | 18 +++++++-----------
 1 file changed, 7 insertions(+), 11 deletions(-)

diff --git a/tsdb/head.go b/tsdb/head.go
index c46ffe061..575177a7a 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -1779,7 +1779,7 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
 	minMmapFile = math.MaxInt32
 	// For one series, truncate old chunks and check if any chunks left. If not, mark as deleted and collect the ID.
-	check := func(i int, hash uint64, series *memSeries, deletedForCallback map[chunks.HeadSeriesRef]labels.Labels) {
+	check := func(hashShard int, hash uint64, series *memSeries, deletedForCallback map[chunks.HeadSeriesRef]labels.Labels) {
 		series.Lock()
 		defer series.Unlock()
 
@@ -1820,20 +1820,16 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
 		// series alike.
 		// If we don't hold them all, there's a very small chance that a series receives
 		// samples again while we are half-way into deleting it.
-		j := int(series.ref) & (s.size - 1)
-
-		if i != j {
-			s.locks[j].Lock()
+		refShard := int(series.ref) & (s.size - 1)
+		if hashShard != refShard {
+			s.locks[refShard].Lock()
+			defer s.locks[refShard].Unlock()
 		}
 
 		deleted[storage.SeriesRef(series.ref)] = struct{}{}
-		s.hashes[i].del(hash, series.lset)
-		delete(s.series[j], series.ref)
+		s.hashes[hashShard].del(hash, series.lset)
+		delete(s.series[refShard], series.ref)
 		deletedForCallback[series.ref] = series.lset
-
-		if i != j {
-			s.locks[j].Unlock()
-		}
 	}
 
 	// Run through all series shard by shard, checking which should be deleted.

From e6c0f69f98f1125fdb9f4db8fc6f5d13629fad28 Mon Sep 17 00:00:00 2001
From: Bryan Boreham
Date: Fri, 27 Oct 2023 22:05:30 +0100
Subject: [PATCH 3/4] TSDB: Only pay for hash collisions when they happen

Instead of a map of slices of `*memSeries`, ready for any of them to
hold series where hash values collide, split into a map of `*memSeries`
and a map of slices which is usually empty, since hash collisions are
a one-in-a-billion thing.

The `del` method gets more complicated, to maintain the invariant that
a series is only in one of the two maps.

Signed-off-by: Bryan Boreham
---
 tsdb/head.go | 75 ++++++++++++++++++++++++++++++++++++++++--------------
 1 file changed, 54 insertions(+), 21 deletions(-)

diff --git a/tsdb/head.go b/tsdb/head.go
index 575177a7a..410a226d8 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -1667,26 +1667,34 @@ func (h *Head) mmapHeadChunks() {
 	var count int
 	for i := 0; i < h.series.size; i++ {
 		h.series.locks[i].RLock()
-		for _, all := range h.series.hashes[i] {
-			for _, series := range all {
-				series.Lock()
-				count += series.mmapChunks(h.chunkDiskMapper)
-				series.Unlock()
-			}
+		for _, series := range h.series.series[i] {
+			series.Lock()
+			count += series.mmapChunks(h.chunkDiskMapper)
+			series.Unlock()
 		}
 		h.series.locks[i].RUnlock()
 	}
 	h.metrics.mmapChunksTotal.Add(float64(count))
 }
 
-// seriesHashmap is a simple hashmap for memSeries by their label set. It is built
-// on top of a regular hashmap and holds a slice of series to resolve hash collisions.
+// seriesHashmap lets TSDB find a memSeries by its label set, via a 64-bit hash.
+// There is one map for the common case where the hash value is unique, and a
+// second map for the case that two series have the same hash value.
+// Each series is in only one of the maps.
 // Its methods require the hash to be submitted with it to avoid re-computations throughout
 // the code.
-type seriesHashmap map[uint64][]*memSeries
+type seriesHashmap struct {
+	unique    map[uint64]*memSeries
+	conflicts map[uint64][]*memSeries
+}
 
-func (m seriesHashmap) get(hash uint64, lset labels.Labels) *memSeries {
-	for _, s := range m[hash] {
+func (m *seriesHashmap) get(hash uint64, lset labels.Labels) *memSeries {
+	if s, found := m.unique[hash]; found {
+		if labels.Equal(s.lset, lset) {
+			return s
+		}
+	}
+	for _, s := range m.conflicts[hash] {
 		if labels.Equal(s.lset, lset) {
 			return s
 		}
@@ -1695,27 +1703,46 @@ func (m seriesHashmap) get(hash uint64, lset labels.Labels) *memSeries {
 }
 
 func (m seriesHashmap) set(hash uint64, s *memSeries) {
-	l := m[hash]
+	if existing, found := m.unique[hash]; !found || labels.Equal(existing.lset, s.lset) {
+		m.unique[hash] = s
+		return
+	}
+	l := m.conflicts[hash]
 	for i, prev := range l {
 		if labels.Equal(prev.lset, s.lset) {
 			l[i] = s
 			return
 		}
 	}
-	m[hash] = append(l, s)
+	m.conflicts[hash] = append(l, s)
 }
 
 func (m seriesHashmap) del(hash uint64, lset labels.Labels) {
 	var rem []*memSeries
-	for _, s := range m[hash] {
-		if !labels.Equal(s.lset, lset) {
-			rem = append(rem, s)
+	unique, found := m.unique[hash]
+	switch {
+	case !found:
+		return
+	case labels.Equal(unique.lset, lset):
+		conflicts := m.conflicts[hash]
+		if len(conflicts) == 0 {
+			delete(m.unique, hash)
+			return
+		}
+		rem = conflicts
+	default:
+		rem = append(rem, unique)
+		for _, s := range m.conflicts[hash] {
+			if !labels.Equal(s.lset, lset) {
+				rem = append(rem, s)
+			}
 		}
 	}
-	if len(rem) == 0 {
-		delete(m, hash)
+	m.unique[hash] = rem[0]
+	if len(rem) == 1 {
+		delete(m.conflicts, hash)
 	} else {
-		m[hash] = rem
+		m.conflicts[hash] = rem[1:]
 	}
 }
 
@@ -1757,7 +1784,10 @@ func newStripeSeries(stripeSize int, seriesCallback SeriesLifecycleCallback) *st
 		s.series[i] = map[chunks.HeadSeriesRef]*memSeries{}
 	}
 	for i := range s.hashes {
-		s.hashes[i] = seriesHashmap{}
+		s.hashes[i] = seriesHashmap{
+			unique:    map[uint64]*memSeries{},
+			conflicts: map[uint64][]*memSeries{},
+		}
 	}
 	return s
 }
 
@@ -1837,7 +1867,10 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
 		deletedForCallback := make(map[chunks.HeadSeriesRef]labels.Labels, deletedFromPrevStripe)
 		s.locks[i].Lock()
 
-		for hash, all := range s.hashes[i] {
+		for hash, series := range s.hashes[i].unique {
+			check(i, hash, series, deletedForCallback)
+		}
+		for hash, all := range s.hashes[i].conflicts {
 			for _, series := range all {
 				check(i, hash, series, deletedForCallback)
 			}

From 65a443e6e370c9996506fb3152db678966e7154d Mon Sep 17 00:00:00 2001
From: Bryan Boreham
Date: Mon, 13 Nov 2023 16:20:04 +0000
Subject: [PATCH 4/4] TSDB: initialize conflicts map only when we need it.

Suggested by @songjiayang.

Signed-off-by: Bryan Boreham
---
 tsdb/head.go | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/tsdb/head.go b/tsdb/head.go
index 410a226d8..fb90c9fa0 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -1707,6 +1707,9 @@ func (m seriesHashmap) set(hash uint64, s *memSeries) {
 		m.unique[hash] = s
 		return
 	}
+	if m.conflicts == nil {
+		m.conflicts = make(map[uint64][]*memSeries)
+	}
 	l := m.conflicts[hash]
 	for i, prev := range l {
 		if labels.Equal(prev.lset, s.lset) {
@@ -1786,7 +1789,7 @@ func newStripeSeries(stripeSize int, seriesCallback SeriesLifecycleCallback) *st
 	for i := range s.hashes {
 		s.hashes[i] = seriesHashmap{
 			unique:    map[uint64]*memSeries{},
-			conflicts: map[uint64][]*memSeries{},
+			conflicts: nil, // Initialized on demand in set().
 		}
 	}
 	return s
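
The sketch below is not part of the patches and is not Prometheus code; it is a
minimal, self-contained illustration of the unique/conflicts technique that
PATCH 3 introduces and PATCH 4 tweaks. The hypothetical "hashmap" and "entry"
types stand in for seriesHashmap and *memSeries, and string keys stand in for
label sets, so the example compiles on its own. The get/set/del logic mirrors
the invariant stated in the PATCH 3 commit message: each entry lives in exactly
one of the two maps, and the conflicts map is only touched (and, after PATCH 4,
only allocated) when two keys share a hash.

// collision_sketch.go — illustrative only, assumed simplification of the patches above.
package main

import "fmt"

type entry struct {
	key string // stands in for the label set
	val string // stands in for *memSeries
}

type hashmap struct {
	unique    map[uint64]entry   // common case: one entry per hash
	conflicts map[uint64][]entry // rare case: extra entries whose hash collides
}

func (m *hashmap) get(hash uint64, key string) (string, bool) {
	if e, ok := m.unique[hash]; ok && e.key == key {
		return e.val, true
	}
	for _, e := range m.conflicts[hash] {
		if e.key == key {
			return e.val, true
		}
	}
	return "", false
}

func (m *hashmap) set(hash uint64, key, val string) {
	// New hash, or same key as the unique entry: stay on the fast path.
	if existing, ok := m.unique[hash]; !ok || existing.key == key {
		m.unique[hash] = entry{key, val}
		return
	}
	if m.conflicts == nil { // allocated lazily, as in PATCH 4
		m.conflicts = make(map[uint64][]entry)
	}
	l := m.conflicts[hash]
	for i, prev := range l {
		if prev.key == key {
			l[i] = entry{key, val}
			return
		}
	}
	m.conflicts[hash] = append(l, entry{key, val})
}

func (m *hashmap) del(hash uint64, key string) {
	var rem []entry
	unique, ok := m.unique[hash]
	switch {
	case !ok:
		return
	case unique.key == key:
		conflicts := m.conflicts[hash]
		if len(conflicts) == 0 {
			delete(m.unique, hash)
			return
		}
		rem = conflicts // a colliding entry will be promoted into unique
	default:
		rem = append(rem, unique)
		for _, e := range m.conflicts[hash] {
			if e.key != key {
				rem = append(rem, e)
			}
		}
	}
	// Maintain the invariant: first survivor goes to unique, the rest (if any) stay in conflicts.
	m.unique[hash] = rem[0]
	if len(rem) == 1 {
		delete(m.conflicts, hash)
	} else {
		m.conflicts[hash] = rem[1:]
	}
}

func main() {
	m := hashmap{unique: map[uint64]entry{}}
	m.set(42, "a", "series-a")
	m.set(42, "b", "series-b") // same hash, different key: lands in conflicts
	m.del(42, "a")             // "b" is promoted back into unique
	v, ok := m.get(42, "b")
	fmt.Println(v, ok, len(m.conflicts)) // series-b true 0
}

As the main function shows, the conflicts map stays empty until two keys
actually share a hash, which is the "only pay for hash collisions when they
happen" idea behind PATCH 3.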