mirror of https://github.com/prometheus/prometheus
Merge pull request #14525 from colega/merge-mmmaxtime-into-shardhash
Store `mmMaxTime` in same field as `seriesShard`pull/14538/head
commit
6816149852
40
tsdb/head.go
40
tsdb/head.go
|
@ -178,6 +178,7 @@ type HeadOptions struct {
|
||||||
WALReplayConcurrency int
|
WALReplayConcurrency int
|
||||||
|
|
||||||
// EnableSharding enables ShardedPostings() support in the Head.
|
// EnableSharding enables ShardedPostings() support in the Head.
|
||||||
|
// EnableSharding is temporarily disabled during Init().
|
||||||
EnableSharding bool
|
EnableSharding bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -609,7 +610,7 @@ const cardinalityCacheExpirationTime = time.Duration(30) * time.Second
|
||||||
// Init loads data from the write ahead log and prepares the head for writes.
|
// Init loads data from the write ahead log and prepares the head for writes.
|
||||||
// It should be called before using an appender so that it
|
// It should be called before using an appender so that it
|
||||||
// limits the ingested samples to the head min valid time.
|
// limits the ingested samples to the head min valid time.
|
||||||
func (h *Head) Init(minValidTime int64) error {
|
func (h *Head) Init(minValidTime int64) (err error) {
|
||||||
h.minValidTime.Store(minValidTime)
|
h.minValidTime.Store(minValidTime)
|
||||||
defer func() {
|
defer func() {
|
||||||
h.postings.EnsureOrder(h.opts.WALReplayConcurrency)
|
h.postings.EnsureOrder(h.opts.WALReplayConcurrency)
|
||||||
|
@ -623,6 +624,24 @@ func (h *Head) Init(minValidTime int64) error {
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
// If sharding is enabled, disable it while initializing, and calculate the shards later.
|
||||||
|
// We're going to use that field for other purposes during WAL replay,
|
||||||
|
// so we don't want to waste time on calculating the shard that we're going to lose anyway.
|
||||||
|
if h.opts.EnableSharding {
|
||||||
|
h.opts.EnableSharding = false
|
||||||
|
defer func() {
|
||||||
|
h.opts.EnableSharding = true
|
||||||
|
if err == nil {
|
||||||
|
// No locking is needed here as nobody should be writing while we're in Init.
|
||||||
|
for _, stripe := range h.series.series {
|
||||||
|
for _, s := range stripe {
|
||||||
|
s.shardHashOrMemoryMappedMaxTime = labels.StableHash(s.lset)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
level.Info(h.logger).Log("msg", "Replaying on-disk memory mappable chunks if any")
|
level.Info(h.logger).Log("msg", "Replaying on-disk memory mappable chunks if any")
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
|
@ -683,7 +702,6 @@ func (h *Head) Init(minValidTime int64) error {
|
||||||
mmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk
|
mmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk
|
||||||
oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk
|
oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk
|
||||||
lastMmapRef chunks.ChunkDiskMapperRef
|
lastMmapRef chunks.ChunkDiskMapperRef
|
||||||
err error
|
|
||||||
|
|
||||||
mmapChunkReplayDuration time.Duration
|
mmapChunkReplayDuration time.Duration
|
||||||
)
|
)
|
||||||
|
@ -2068,9 +2086,11 @@ type memSeries struct {
|
||||||
ref chunks.HeadSeriesRef
|
ref chunks.HeadSeriesRef
|
||||||
meta *metadata.Metadata
|
meta *metadata.Metadata
|
||||||
|
|
||||||
// Series labels hash to use for sharding purposes. The value is always 0 when sharding has not
|
// Series labels hash to use for sharding purposes.
|
||||||
// been explicitly enabled in TSDB.
|
// The value is always 0 when sharding has not been explicitly enabled in TSDB.
|
||||||
shardHash uint64
|
// While the WAL replay the value stored here is the max time of any mmapped chunk,
|
||||||
|
// and the shard hash is re-calculated after WAL replay is complete.
|
||||||
|
shardHashOrMemoryMappedMaxTime uint64
|
||||||
|
|
||||||
// Everything after here should only be accessed with the lock held.
|
// Everything after here should only be accessed with the lock held.
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
|
@ -2095,8 +2115,6 @@ type memSeries struct {
|
||||||
|
|
||||||
ooo *memSeriesOOOFields
|
ooo *memSeriesOOOFields
|
||||||
|
|
||||||
mmMaxTime int64 // Max time of any mmapped chunk, only used during WAL replay.
|
|
||||||
|
|
||||||
nextAt int64 // Timestamp at which to cut the next chunk.
|
nextAt int64 // Timestamp at which to cut the next chunk.
|
||||||
histogramChunkHasComputedEndTime bool // True if nextAt has been predicted for the current histograms chunk; false otherwise.
|
histogramChunkHasComputedEndTime bool // True if nextAt has been predicted for the current histograms chunk; false otherwise.
|
||||||
pendingCommit bool // Whether there are samples waiting to be committed to this series.
|
pendingCommit bool // Whether there are samples waiting to be committed to this series.
|
||||||
|
@ -2130,7 +2148,7 @@ func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, shardHash uint64,
|
||||||
lset: lset,
|
lset: lset,
|
||||||
ref: id,
|
ref: id,
|
||||||
nextAt: math.MinInt64,
|
nextAt: math.MinInt64,
|
||||||
shardHash: shardHash,
|
shardHashOrMemoryMappedMaxTime: shardHash,
|
||||||
}
|
}
|
||||||
if !isolationDisabled {
|
if !isolationDisabled {
|
||||||
s.txs = newTxRing(0)
|
s.txs = newTxRing(0)
|
||||||
|
@ -2218,6 +2236,12 @@ func (s *memSeries) truncateChunksBefore(mint int64, minOOOMmapRef chunks.ChunkD
|
||||||
return removedInOrder + removedOOO
|
return removedInOrder + removedOOO
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// shardHash returns the shard hash of the series, only available after WAL replay.
|
||||||
|
func (s *memSeries) shardHash() uint64 { return s.shardHashOrMemoryMappedMaxTime }
|
||||||
|
|
||||||
|
// mmMaxTime returns the max time of any mmapped chunk in the series, only available during WAL replay.
|
||||||
|
func (s *memSeries) mmMaxTime() int64 { return int64(s.shardHashOrMemoryMappedMaxTime) }
|
||||||
|
|
||||||
// cleanupAppendIDsBelow cleans up older appendIDs. Has to be called after
|
// cleanupAppendIDsBelow cleans up older appendIDs. Has to be called after
|
||||||
// acquiring lock.
|
// acquiring lock.
|
||||||
func (s *memSeries) cleanupAppendIDsBelow(bound uint64) {
|
func (s *memSeries) cleanupAppendIDsBelow(bound uint64) {
|
||||||
|
|
|
@ -170,7 +170,7 @@ func (h *headIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCou
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if the series belong to the shard.
|
// Check if the series belong to the shard.
|
||||||
if s.shardHash%shardCount != shardIndex {
|
if s.shardHash()%shardCount != shardIndex {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -23,6 +23,7 @@ import (
|
||||||
"path"
|
"path"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"runtime/pprof"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -89,6 +90,43 @@ func newTestHeadWithOptions(t testing.TB, compressWAL wlog.CompressionType, opts
|
||||||
return h, wal
|
return h, wal
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// BenchmarkLoadRealWLs will be skipped unless the BENCHMARK_LOAD_REAL_WLS_DIR environment variable is set.
|
||||||
|
// BENCHMARK_LOAD_REAL_WLS_DIR should be the folder where `wal` and `chunks_head` are located.
|
||||||
|
// Optionally, BENCHMARK_LOAD_REAL_WLS_PROFILE can be set to a file path to write a CPU profile.
|
||||||
|
func BenchmarkLoadRealWLs(b *testing.B) {
|
||||||
|
dir := os.Getenv("BENCHMARK_LOAD_REAL_WLS_DIR")
|
||||||
|
if dir == "" {
|
||||||
|
b.Skipped()
|
||||||
|
}
|
||||||
|
|
||||||
|
profileFile := os.Getenv("BENCHMARK_LOAD_REAL_WLS_PROFILE")
|
||||||
|
if profileFile != "" {
|
||||||
|
b.Logf("Will profile in %s", profileFile)
|
||||||
|
f, err := os.Create(profileFile)
|
||||||
|
require.NoError(b, err)
|
||||||
|
b.Cleanup(func() { f.Close() })
|
||||||
|
require.NoError(b, pprof.StartCPUProfile(f))
|
||||||
|
b.Cleanup(pprof.StopCPUProfile)
|
||||||
|
}
|
||||||
|
|
||||||
|
wal, err := wlog.New(nil, nil, filepath.Join(dir, "wal"), wlog.CompressionNone)
|
||||||
|
require.NoError(b, err)
|
||||||
|
b.Cleanup(func() { wal.Close() })
|
||||||
|
|
||||||
|
wbl, err := wlog.New(nil, nil, filepath.Join(dir, "wbl"), wlog.CompressionNone)
|
||||||
|
require.NoError(b, err)
|
||||||
|
b.Cleanup(func() { wbl.Close() })
|
||||||
|
|
||||||
|
// Load the WAL.
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
opts := DefaultHeadOptions()
|
||||||
|
opts.ChunkDirRoot = dir
|
||||||
|
h, err := NewHead(nil, nil, wal, wbl, opts, nil)
|
||||||
|
require.NoError(b, err)
|
||||||
|
h.Init(0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func BenchmarkCreateSeries(b *testing.B) {
|
func BenchmarkCreateSeries(b *testing.B) {
|
||||||
series := genSeries(b.N, 10, 0, 0)
|
series := genSeries(b.N, 10, 0, 0)
|
||||||
h, _ := newTestHead(b, 10000, wlog.CompressionNone, false)
|
h, _ := newTestHead(b, 10000, wlog.CompressionNone, false)
|
||||||
|
|
|
@ -435,6 +435,8 @@ Outer:
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func minInt64() int64 { return math.MinInt64 }
|
||||||
|
|
||||||
// resetSeriesWithMMappedChunks is only used during the WAL replay.
|
// resetSeriesWithMMappedChunks is only used during the WAL replay.
|
||||||
func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc, oooMmc []*mmappedChunk, walSeriesRef chunks.HeadSeriesRef) (overlapped bool) {
|
func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc, oooMmc []*mmappedChunk, walSeriesRef chunks.HeadSeriesRef) (overlapped bool) {
|
||||||
if mSeries.ref != walSeriesRef {
|
if mSeries.ref != walSeriesRef {
|
||||||
|
@ -481,10 +483,11 @@ func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc, oooMmc []*m
|
||||||
}
|
}
|
||||||
// Cache the last mmapped chunk time, so we can skip calling append() for samples it will reject.
|
// Cache the last mmapped chunk time, so we can skip calling append() for samples it will reject.
|
||||||
if len(mmc) == 0 {
|
if len(mmc) == 0 {
|
||||||
mSeries.mmMaxTime = math.MinInt64
|
mSeries.shardHashOrMemoryMappedMaxTime = uint64(minInt64())
|
||||||
} else {
|
} else {
|
||||||
mSeries.mmMaxTime = mmc[len(mmc)-1].maxTime
|
mmMaxTime := mmc[len(mmc)-1].maxTime
|
||||||
h.updateMinMaxTime(mmc[0].minTime, mSeries.mmMaxTime)
|
mSeries.shardHashOrMemoryMappedMaxTime = uint64(mmMaxTime)
|
||||||
|
h.updateMinMaxTime(mmc[0].minTime, mmMaxTime)
|
||||||
}
|
}
|
||||||
if len(oooMmc) != 0 {
|
if len(oooMmc) != 0 {
|
||||||
// Mint and maxt can be in any chunk, they are not sorted.
|
// Mint and maxt can be in any chunk, they are not sorted.
|
||||||
|
@ -585,7 +588,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
|
||||||
unknownRefs++
|
unknownRefs++
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if s.T <= ms.mmMaxTime {
|
if s.T <= ms.mmMaxTime() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if _, chunkCreated := ms.append(s.T, s.V, 0, appendChunkOpts); chunkCreated {
|
if _, chunkCreated := ms.append(s.T, s.V, 0, appendChunkOpts); chunkCreated {
|
||||||
|
@ -614,7 +617,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
|
||||||
unknownHistogramRefs++
|
unknownHistogramRefs++
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if s.t <= ms.mmMaxTime {
|
if s.t <= ms.mmMaxTime() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
var chunkCreated bool
|
var chunkCreated bool
|
||||||
|
|
Loading…
Reference in New Issue