|
|
|
@ -1666,26 +1666,34 @@ func (h *Head) mmapHeadChunks() {
|
|
|
|
|
var count int |
|
|
|
|
for i := 0; i < h.series.size; i++ { |
|
|
|
|
h.series.locks[i].RLock() |
|
|
|
|
for _, all := range h.series.hashes[i] { |
|
|
|
|
for _, series := range all { |
|
|
|
|
for _, series := range h.series.series[i] { |
|
|
|
|
series.Lock() |
|
|
|
|
count += series.mmapChunks(h.chunkDiskMapper) |
|
|
|
|
series.Unlock() |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
h.series.locks[i].RUnlock() |
|
|
|
|
} |
|
|
|
|
h.metrics.mmapChunksTotal.Add(float64(count)) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// seriesHashmap is a simple hashmap for memSeries by their label set. It is built
|
|
|
|
|
// on top of a regular hashmap and holds a slice of series to resolve hash collisions.
|
|
|
|
|
// seriesHashmap lets TSDB find a memSeries by its label set, via a 64-bit hash.
|
|
|
|
|
// There is one map for the common case where the hash value is unique, and a
|
|
|
|
|
// second map for the case that two series have the same hash value.
|
|
|
|
|
// Each series is in only one of the maps.
|
|
|
|
|
// Its methods require the hash to be submitted with it to avoid re-computations throughout
|
|
|
|
|
// the code.
|
|
|
|
|
type seriesHashmap map[uint64][]*memSeries |
|
|
|
|
type seriesHashmap struct { |
|
|
|
|
unique map[uint64]*memSeries |
|
|
|
|
conflicts map[uint64][]*memSeries |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (m seriesHashmap) get(hash uint64, lset labels.Labels) *memSeries { |
|
|
|
|
for _, s := range m[hash] { |
|
|
|
|
func (m *seriesHashmap) get(hash uint64, lset labels.Labels) *memSeries { |
|
|
|
|
if s, found := m.unique[hash]; found { |
|
|
|
|
if labels.Equal(s.lset, lset) { |
|
|
|
|
return s |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
for _, s := range m.conflicts[hash] { |
|
|
|
|
if labels.Equal(s.lset, lset) { |
|
|
|
|
return s |
|
|
|
|
} |
|
|
|
@ -1694,27 +1702,49 @@ func (m seriesHashmap) get(hash uint64, lset labels.Labels) *memSeries {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (m seriesHashmap) set(hash uint64, s *memSeries) { |
|
|
|
|
l := m[hash] |
|
|
|
|
if existing, found := m.unique[hash]; !found || labels.Equal(existing.lset, s.lset) { |
|
|
|
|
m.unique[hash] = s |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
if m.conflicts == nil { |
|
|
|
|
m.conflicts = make(map[uint64][]*memSeries) |
|
|
|
|
} |
|
|
|
|
l := m.conflicts[hash] |
|
|
|
|
for i, prev := range l { |
|
|
|
|
if labels.Equal(prev.lset, s.lset) { |
|
|
|
|
l[i] = s |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
m[hash] = append(l, s) |
|
|
|
|
m.conflicts[hash] = append(l, s) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (m seriesHashmap) del(hash uint64, lset labels.Labels) { |
|
|
|
|
var rem []*memSeries |
|
|
|
|
for _, s := range m[hash] { |
|
|
|
|
unique, found := m.unique[hash] |
|
|
|
|
switch { |
|
|
|
|
case !found: |
|
|
|
|
return |
|
|
|
|
case labels.Equal(unique.lset, lset): |
|
|
|
|
conflicts := m.conflicts[hash] |
|
|
|
|
if len(conflicts) == 0 { |
|
|
|
|
delete(m.unique, hash) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
rem = conflicts |
|
|
|
|
default: |
|
|
|
|
rem = append(rem, unique) |
|
|
|
|
for _, s := range m.conflicts[hash] { |
|
|
|
|
if !labels.Equal(s.lset, lset) { |
|
|
|
|
rem = append(rem, s) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if len(rem) == 0 { |
|
|
|
|
delete(m, hash) |
|
|
|
|
} |
|
|
|
|
m.unique[hash] = rem[0] |
|
|
|
|
if len(rem) == 1 { |
|
|
|
|
delete(m.conflicts, hash) |
|
|
|
|
} else { |
|
|
|
|
m[hash] = rem |
|
|
|
|
m.conflicts[hash] = rem[1:] |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -1756,7 +1786,10 @@ func newStripeSeries(stripeSize int, seriesCallback SeriesLifecycleCallback) *st
|
|
|
|
|
s.series[i] = map[chunks.HeadSeriesRef]*memSeries{} |
|
|
|
|
} |
|
|
|
|
for i := range s.hashes { |
|
|
|
|
s.hashes[i] = seriesHashmap{} |
|
|
|
|
s.hashes[i] = seriesHashmap{ |
|
|
|
|
unique: map[uint64]*memSeries{}, |
|
|
|
|
conflicts: nil, // Initialized on demand in set().
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return s |
|
|
|
|
} |
|
|
|
@ -1776,15 +1809,12 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
|
|
|
|
|
deletedFromPrevStripe = 0 |
|
|
|
|
) |
|
|
|
|
minMmapFile = math.MaxInt32 |
|
|
|
|
// Run through all series and truncate old chunks. Mark those with no
|
|
|
|
|
// chunks left as deleted and store their ID.
|
|
|
|
|
for i := 0; i < s.size; i++ { |
|
|
|
|
deletedForCallback := make(map[chunks.HeadSeriesRef]labels.Labels, deletedFromPrevStripe) |
|
|
|
|
s.locks[i].Lock() |
|
|
|
|
|
|
|
|
|
for hash, all := range s.hashes[i] { |
|
|
|
|
for _, series := range all { |
|
|
|
|
// For one series, truncate old chunks and check if any chunks left. If not, mark as deleted and collect the ID.
|
|
|
|
|
check := func(hashShard int, hash uint64, series *memSeries, deletedForCallback map[chunks.HeadSeriesRef]labels.Labels) { |
|
|
|
|
series.Lock() |
|
|
|
|
defer series.Unlock() |
|
|
|
|
|
|
|
|
|
rmChunks += series.truncateChunksBefore(mint, minOOOMmapRef) |
|
|
|
|
|
|
|
|
|
if len(series.mmappedChunks) > 0 { |
|
|
|
@ -1815,31 +1845,36 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
|
|
|
|
|
if seriesMint < actualMint { |
|
|
|
|
actualMint = seriesMint |
|
|
|
|
} |
|
|
|
|
series.Unlock() |
|
|
|
|
continue |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// The series is gone entirely. We need to keep the series lock
|
|
|
|
|
// and make sure we have acquired the stripe locks for hash and ID of the
|
|
|
|
|
// series alike.
|
|
|
|
|
// If we don't hold them all, there's a very small chance that a series receives
|
|
|
|
|
// samples again while we are half-way into deleting it.
|
|
|
|
|
j := int(series.ref) & (s.size - 1) |
|
|
|
|
|
|
|
|
|
if i != j { |
|
|
|
|
s.locks[j].Lock() |
|
|
|
|
refShard := int(series.ref) & (s.size - 1) |
|
|
|
|
if hashShard != refShard { |
|
|
|
|
s.locks[refShard].Lock() |
|
|
|
|
defer s.locks[refShard].Unlock() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
deleted[storage.SeriesRef(series.ref)] = struct{}{} |
|
|
|
|
s.hashes[i].del(hash, series.lset) |
|
|
|
|
delete(s.series[j], series.ref) |
|
|
|
|
s.hashes[hashShard].del(hash, series.lset) |
|
|
|
|
delete(s.series[refShard], series.ref) |
|
|
|
|
deletedForCallback[series.ref] = series.lset |
|
|
|
|
|
|
|
|
|
if i != j { |
|
|
|
|
s.locks[j].Unlock() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
series.Unlock() |
|
|
|
|
// Run through all series shard by shard, checking which should be deleted.
|
|
|
|
|
for i := 0; i < s.size; i++ { |
|
|
|
|
deletedForCallback := make(map[chunks.HeadSeriesRef]labels.Labels, deletedFromPrevStripe) |
|
|
|
|
s.locks[i].Lock() |
|
|
|
|
|
|
|
|
|
for hash, series := range s.hashes[i].unique { |
|
|
|
|
check(i, hash, series, deletedForCallback) |
|
|
|
|
} |
|
|
|
|
for hash, all := range s.hashes[i].conflicts { |
|
|
|
|
for _, series := range all { |
|
|
|
|
check(i, hash, series, deletedForCallback) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|