From df6eed2358db69b21ca81b9e46dddc19d5bfe034 Mon Sep 17 00:00:00 2001 From: Julien Pivotto Date: Tue, 3 Nov 2020 11:04:59 +0100 Subject: [PATCH 1/4] Calculate head chunk size based on actual disk usage (#8139) Cherry-picks 8bc369bf9bf67cf8389d31fd8453be772cd8f788 Signed-off-by: Julien Pivotto --- tsdb/chunks/head_chunks.go | 14 ++------------ tsdb/db.go | 2 +- tsdb/db_test.go | 30 +++++++++++++++++++++++++----- 3 files changed, 28 insertions(+), 18 deletions(-) diff --git a/tsdb/chunks/head_chunks.go b/tsdb/chunks/head_chunks.go index 632682218..08c6f1b3c 100644 --- a/tsdb/chunks/head_chunks.go +++ b/tsdb/chunks/head_chunks.go @@ -104,10 +104,6 @@ type ChunkDiskMapper struct { // from which chunks are served till they are flushed and are ready for m-mapping. chunkBuffer *chunkBuffer - // The total size of bytes in the closed files. - // Needed to calculate the total size of all segments on disk. - size atomic.Int64 - // If 'true', it indicated that the maxt of all the on-disk files were set // after iterating through all the chunks in those files. fileMaxtSet bool @@ -178,8 +174,6 @@ func (cdm *ChunkDiskMapper) openMMapFiles() (returnErr error) { chkFileIndices = append(chkFileIndices, seq) } - cdm.size.Store(int64(0)) - // Check for gaps in the files. sort.Ints(chkFileIndices) if len(chkFileIndices) == 0 { @@ -206,8 +200,6 @@ func (cdm *ChunkDiskMapper) openMMapFiles() (returnErr error) { if v := int(b.byteSlice.Range(MagicChunksSize, MagicChunksSize+ChunksFormatVersionSize)[0]); v != chunksFormatV1 { return errors.Errorf("%s: invalid chunk format version %d", files[i], v) } - - cdm.size.Add(int64(b.byteSlice.Len())) } return nil @@ -340,7 +332,6 @@ func (cdm *ChunkDiskMapper) cut() (returnErr error) { } }() - cdm.size.Add(cdm.curFileSize()) cdm.curFileNumBytes.Store(int64(n)) if cdm.curFile != nil { @@ -696,7 +687,6 @@ func (cdm *ChunkDiskMapper) deleteFiles(removedFiles []int) error { cdm.readPathMtx.Unlock() return err } - cdm.size.Sub(int64(cdm.mmappedChunkFiles[seq].byteSlice.Len())) delete(cdm.mmappedChunkFiles, seq) delete(cdm.closers, seq) } @@ -735,8 +725,8 @@ func (cdm *ChunkDiskMapper) DeleteCorrupted(originalErr error) error { } // Size returns the size of the chunk files. -func (cdm *ChunkDiskMapper) Size() int64 { - return cdm.size.Load() + cdm.curFileSize() +func (cdm *ChunkDiskMapper) Size() (int64, error) { + return fileutil.DirSize(cdm.dir.Name()) } func (cdm *ChunkDiskMapper) curFileSize() int64 { diff --git a/tsdb/db.go b/tsdb/db.go index b045fa155..da1304356 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -1104,7 +1104,7 @@ func BeyondSizeRetention(db *DB, blocks []*Block) (deletable map[ulid.ULID]struc deletable = make(map[ulid.ULID]struct{}) walSize, _ := db.Head().wal.Size() - headChunksSize := db.Head().chunkDiskMapper.Size() + headChunksSize, _ := db.Head().chunkDiskMapper.Size() // Initializing size counter with WAL size and Head chunks // written to disk, as that is part of the retention strategy. blocksSize := walSize + headChunksSize diff --git a/tsdb/db_test.go b/tsdb/db_test.go index e1969c512..fd5ddbdf7 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -1235,7 +1235,7 @@ func TestSizeRetention(t *testing.T) { // Add some data to the WAL. 
headApp := db.Head().Appender(context.Background()) for _, m := range headBlocks { - series := genSeries(100, 10, m.MinTime, m.MaxTime) + series := genSeries(100, 10, m.MinTime, m.MaxTime+1) for _, s := range series { it := s.Iterator() for it.Next() { @@ -1254,8 +1254,12 @@ func TestSizeRetention(t *testing.T) { blockSize := int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) // Use the actual internal metrics. walSize, err := db.Head().wal.Size() testutil.Ok(t, err) - // Expected size should take into account block size + WAL size - expSize := blockSize + walSize + cdmSize, err := db.Head().chunkDiskMapper.Size() + testutil.Ok(t, err) + testutil.Assert(t, cdmSize > 0, "cdmSize is not greater than 0, value is %d", cdmSize) + // Expected size should take into account block size + WAL size + Head + // chunks size + expSize := blockSize + walSize + cdmSize actSize, err := fileutil.DirSize(db.Dir()) testutil.Ok(t, err) testutil.Equals(t, expSize, actSize, "registered size doesn't match actual disk size") @@ -1268,7 +1272,20 @@ func TestSizeRetention(t *testing.T) { blockSize = int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) // Use the actual internal metrics. walSize, err = db.Head().wal.Size() testutil.Ok(t, err) - expSize = blockSize + walSize + cdmSize, err = db.Head().chunkDiskMapper.Size() + testutil.Ok(t, err) + testutil.Assert(t, cdmSize > 0, "cdmSize is not greater than 0, value is %d", cdmSize) + expSize = blockSize + walSize + cdmSize + actSize, err = fileutil.DirSize(db.Dir()) + testutil.Ok(t, err) + testutil.Equals(t, expSize, actSize, "registered size doesn't match actual disk size") + + // Truncate Chunk Disk Mapper and compare sizes. + testutil.Ok(t, db.Head().chunkDiskMapper.Truncate(900)) + cdmSize, err = db.Head().chunkDiskMapper.Size() + testutil.Ok(t, err) + testutil.Assert(t, cdmSize > 0, "cdmSize is not greater than 0, value is %d", cdmSize) + expSize = blockSize + walSize + cdmSize actSize, err = fileutil.DirSize(db.Dir()) testutil.Ok(t, err) testutil.Equals(t, expSize, actSize, "registered size doesn't match actual disk size") @@ -1285,8 +1302,11 @@ func TestSizeRetention(t *testing.T) { blockSize = int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) walSize, err = db.Head().wal.Size() testutil.Ok(t, err) + cdmSize, err = db.Head().chunkDiskMapper.Size() + testutil.Ok(t, err) + testutil.Assert(t, cdmSize > 0, "cdmSize is not greater than 0, value is %d", cdmSize) // Expected size should take into account block size + WAL size - expSize = blockSize + walSize + expSize = blockSize + walSize + cdmSize actRetentionCount := int(prom_testutil.ToFloat64(db.metrics.sizeRetentionCount)) actSize, err = fileutil.DirSize(db.Dir()) testutil.Ok(t, err) From 601a3ef0bba6ef1fc08ac11ae4ace9cc9f42e8fd Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Date: Wed, 21 Oct 2020 18:27:13 +0530 Subject: [PATCH 2/4] Read repair empty last file in chunks_head (#8061) * Read repair empty file in chunks_head Signed-off-by: Ganesh Vernekar * Refactor and introduce repairLastChunkFile Signed-off-by: Ganesh Vernekar * Attempt windows test fix Signed-off-by: Ganesh Vernekar * Fix review comments Signed-off-by: Ganesh Vernekar * Fix review comments Signed-off-by: Ganesh Vernekar --- tsdb/chunks/head_chunks.go | 36 +++++++++++++++++ tsdb/chunks/head_chunks_test.go | 70 +++++++++++++++++++++++++++++++++ 2 files changed, 106 insertions(+) diff --git a/tsdb/chunks/head_chunks.go b/tsdb/chunks/head_chunks.go index 08c6f1b3c..fbccd28f3 100644 --- 
a/tsdb/chunks/head_chunks.go
+++ b/tsdb/chunks/head_chunks.go
@@ -163,6 +163,11 @@ func (cdm *ChunkDiskMapper) openMMapFiles() (returnErr error) {
 		return err
 	}
 
+	files, err = repairLastChunkFile(files)
+	if err != nil {
+		return err
+	}
+
 	chkFileIndices := make([]int, 0, len(files))
 	for seq, fn := range files {
 		f, err := fileutil.OpenMmapFile(fn)
@@ -218,9 +223,40 @@ func listChunkFiles(dir string) (map[int]string, error) {
 		}
 		res[int(seq)] = filepath.Join(dir, fi.Name())
 	}
+
 	return res, nil
 }
 
+// repairLastChunkFile deletes the last file if it's empty.
+// Because we don't fsync when creating these files, we could end
+// up with an empty file at the end during an abrupt shutdown.
+func repairLastChunkFile(files map[int]string) (_ map[int]string, returnErr error) {
+	lastFile := -1
+	for seq := range files {
+		if seq > lastFile {
+			lastFile = seq
+		}
+	}
+
+	if lastFile <= 0 {
+		return files, nil
+	}
+
+	info, err := os.Stat(files[lastFile])
+	if err != nil {
+		return files, errors.Wrap(err, "file stat during last head chunk file repair")
+	}
+	if info.Size() == 0 {
+		// Corrupt file, hence remove it.
+		if err := os.RemoveAll(files[lastFile]); err != nil {
+			return files, errors.Wrap(err, "delete corrupted, empty head chunk file during last file repair")
+		}
+		delete(files, lastFile)
+	}
+
+	return files, nil
+}
+
 // WriteChunk writes the chunk to the disk.
 // The returned chunk ref is the reference from where the chunk encoding starts for the chunk.
 func (cdm *ChunkDiskMapper) WriteChunk(seriesRef uint64, mint, maxt int64, chk chunkenc.Chunk) (chkRef uint64, err error) {
diff --git a/tsdb/chunks/head_chunks_test.go b/tsdb/chunks/head_chunks_test.go
index bfb4262ad..f8ed26cf1 100644
--- a/tsdb/chunks/head_chunks_test.go
+++ b/tsdb/chunks/head_chunks_test.go
@@ -19,6 +19,7 @@ import (
 	"io/ioutil"
 	"math/rand"
 	"os"
+	"strconv"
 	"testing"
 
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
@@ -363,6 +364,75 @@ func TestHeadReadWriter_TruncateAfterFailedIterateChunks(t *testing.T) {
 	testutil.Ok(t, hrw.Truncate(2000))
 }
 
+func TestHeadReadWriter_ReadRepairOnEmptyLastFile(t *testing.T) {
+	hrw := testHeadReadWriter(t)
+	defer func() {
+		testutil.Ok(t, hrw.Close())
+	}()
+
+	timeRange := 0
+	addChunk := func() {
+		step := 100
+		mint, maxt := timeRange+1, timeRange+step-1
+		_, err := hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t))
+		testutil.Ok(t, err)
+		timeRange += step
+	}
+	nonEmptyFile := func() {
+		testutil.Ok(t, hrw.CutNewFile())
+		addChunk()
+	}
+
+	addChunk()     // 1. Created with the first chunk.
+	nonEmptyFile() // 2.
+	nonEmptyFile() // 3.
+
+	testutil.Equals(t, 3, len(hrw.mmappedChunkFiles))
+	lastFile := 0
+	for idx := range hrw.mmappedChunkFiles {
+		if idx > lastFile {
+			lastFile = idx
+		}
+	}
+	testutil.Equals(t, 3, lastFile)
+	dir := hrw.dir.Name()
+	testutil.Ok(t, hrw.Close())
+
+	// Write an empty last file mimicking an abrupt shutdown on file creation.
+	emptyFileName := segmentFile(dir, lastFile+1)
+	f, err := os.OpenFile(emptyFileName, os.O_WRONLY|os.O_CREATE, 0666)
+	testutil.Ok(t, err)
+	testutil.Ok(t, f.Sync())
+	stat, err := f.Stat()
+	testutil.Ok(t, err)
+	testutil.Equals(t, int64(0), stat.Size())
+	testutil.Ok(t, f.Close())
+
+	// Open chunk disk mapper again, corrupt file should be removed.
+	hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool())
+	testutil.Ok(t, err)
+	testutil.Assert(t, !hrw.fileMaxtSet, "")
+	testutil.Ok(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil }))
+	testutil.Assert(t, hrw.fileMaxtSet, "")
+
+	// Removed from memory.
+	testutil.Equals(t, 3, len(hrw.mmappedChunkFiles))
+	for idx := range hrw.mmappedChunkFiles {
+		testutil.Assert(t, idx <= lastFile, "file index is bigger than previous last file")
+	}
+
+	// Removed even from disk.
+	files, err := ioutil.ReadDir(dir)
+	testutil.Ok(t, err)
+	testutil.Equals(t, 3, len(files))
+	for _, fi := range files {
+		seq, err := strconv.ParseUint(fi.Name(), 10, 64)
+		testutil.Ok(t, err)
+		testutil.Assert(t, seq <= uint64(lastFile), "file index on disk is bigger than previous last file")
+	}
+
+}
+
 func testHeadReadWriter(t *testing.T) *ChunkDiskMapper {
 	tmpdir, err := ioutil.TempDir("", "data")
 	testutil.Ok(t, err)
From e7c6feabe1bb94576015f6c62b67f868ba1399ad Mon Sep 17 00:00:00 2001
From: Brian Brazil
Date: Mon, 26 Oct 2020 14:46:20 +0000
Subject: [PATCH 3/4] More granular locking for scrapeLoop. (#8104)

Don't lock for all of Sync/stop/reload as that holds up /metrics and
the UI when they want a list of active/dropped targets. Instead take
advantage of the fact that Sync/stop/reload cannot be called
concurrently by the scrape Manager and lock just on the targets
themselves.

Signed-off-by: Brian Brazil
---
 scrape/scrape.go | 39 ++++++++++++++++++++++++---------------
 1 file changed, 24 insertions(+), 15 deletions(-)

diff --git a/scrape/scrape.go b/scrape/scrape.go
index d52b0ac7f..f6aa49810 100644
--- a/scrape/scrape.go
+++ b/scrape/scrape.go
@@ -192,7 +192,13 @@ type scrapePool struct {
 	appendable storage.Appendable
 	logger     log.Logger
 
-	mtx    sync.Mutex
+	// targetMtx protects activeTargets and droppedTargets from concurrent reads
+	// and writes. Only one of Sync/stop/reload may be called at once due to
+	// manager.mtxScrape so we only need to protect from concurrent reads from
+	// the ActiveTargets and DroppedTargets methods. This allows those two
+	// methods to always complete without having to wait on scrape loops to gracefully stop.
+ targetMtx sync.Mutex + config *config.ScrapeConfig client *http.Client // Targets and loops must always be synchronized to have the same @@ -273,8 +279,8 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed } func (sp *scrapePool) ActiveTargets() []*Target { - sp.mtx.Lock() - defer sp.mtx.Unlock() + sp.targetMtx.Lock() + defer sp.targetMtx.Unlock() var tActive []*Target for _, t := range sp.activeTargets { @@ -284,8 +290,8 @@ func (sp *scrapePool) ActiveTargets() []*Target { } func (sp *scrapePool) DroppedTargets() []*Target { - sp.mtx.Lock() - defer sp.mtx.Unlock() + sp.targetMtx.Lock() + defer sp.targetMtx.Unlock() return sp.droppedTargets } @@ -294,8 +300,7 @@ func (sp *scrapePool) stop() { sp.cancel() var wg sync.WaitGroup - sp.mtx.Lock() - defer sp.mtx.Unlock() + sp.targetMtx.Lock() for fp, l := range sp.loops { wg.Add(1) @@ -308,6 +313,9 @@ func (sp *scrapePool) stop() { delete(sp.loops, fp) delete(sp.activeTargets, fp) } + + sp.targetMtx.Unlock() + wg.Wait() sp.client.CloseIdleConnections() @@ -326,9 +334,6 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error { targetScrapePoolReloads.Inc() start := time.Now() - sp.mtx.Lock() - defer sp.mtx.Unlock() - client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName, false, false) if err != nil { targetScrapePoolReloadsFailed.Inc() @@ -352,6 +357,8 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error { mrc = sp.config.MetricRelabelConfigs ) + sp.targetMtx.Lock() + forcedErr := sp.refreshTargetLimitErr() for fp, oldLoop := range sp.loops { var cache *scrapeCache @@ -387,6 +394,8 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error { sp.loops[fp] = newLoop } + sp.targetMtx.Unlock() + wg.Wait() oldClient.CloseIdleConnections() targetReloadIntervalLength.WithLabelValues(interval.String()).Observe( @@ -398,11 +407,9 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error { // Sync converts target groups into actual scrape targets and synchronizes // the currently running scraper with the resulting set and returns all scraped and dropped targets. func (sp *scrapePool) Sync(tgs []*targetgroup.Group) { - sp.mtx.Lock() - defer sp.mtx.Unlock() - start := time.Now() + sp.targetMtx.Lock() var all []*Target sp.droppedTargets = []*Target{} for _, tg := range tgs { @@ -419,6 +426,7 @@ func (sp *scrapePool) Sync(tgs []*targetgroup.Group) { } } } + sp.targetMtx.Unlock() sp.sync(all) targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe( @@ -431,7 +439,6 @@ func (sp *scrapePool) Sync(tgs []*targetgroup.Group) { // scrape loops for new targets, and stops scrape loops for disappeared targets. // It returns after all stopped scrape loops terminated. func (sp *scrapePool) sync(targets []*Target) { - // This function expects that you have acquired the sp.mtx lock. 
var ( uniqueLoops = make(map[uint64]loop) interval = time.Duration(sp.config.ScrapeInterval) @@ -442,6 +449,7 @@ func (sp *scrapePool) sync(targets []*Target) { mrc = sp.config.MetricRelabelConfigs ) + sp.targetMtx.Lock() for _, t := range targets { hash := t.hash() @@ -487,6 +495,8 @@ func (sp *scrapePool) sync(targets []*Target) { } } + sp.targetMtx.Unlock() + targetScrapePoolTargetsAdded.WithLabelValues(sp.config.JobName).Set(float64(len(uniqueLoops))) forcedErr := sp.refreshTargetLimitErr() for _, l := range sp.loops { @@ -507,7 +517,6 @@ func (sp *scrapePool) sync(targets []*Target) { // refreshTargetLimitErr returns an error that can be passed to the scrape loops // if the number of targets exceeds the configured limit. func (sp *scrapePool) refreshTargetLimitErr() error { - // This function expects that you have acquired the sp.mtx lock. if sp.config == nil || sp.config.TargetLimit == 0 && !sp.targetLimitHit { return nil } From 8e0e326f37f1453558b1c88ac9ee0cb8ced29f98 Mon Sep 17 00:00:00 2001 From: Frederic Branczyk Date: Tue, 3 Nov 2020 09:47:08 +0100 Subject: [PATCH 4/4] *: Cut v2.22.1 Signed-off-by: Frederic Branczyk --- CHANGELOG.md | 6 ++++++ VERSION | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2451fb248..94f61d38c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## 2.22.1 / 2020-11-03 + +* [BUGFIX] Fix potential "mmap: invalid argument" errors in loading the head chunks, after an unclean shutdown, by performing read repairs. #8061 +* [BUGFIX] Fix serving metrics and API when reloading scrape config. #8104 +* [BUGFIX] Fix head chunk size calculation for size based retention. #8139 + ## 2.22.0 / 2020-10-07 As announced in the 2.21.0 release notes, the experimental gRPC API v2 has been diff --git a/VERSION b/VERSION index f48f82fa2..d93847fab 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.22.0 +2.22.1
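
A note on PATCH 1/4: the fix drops the incrementally maintained size counter and
instead asks the filesystem directly via fileutil.DirSize(cdm.dir.Name()), so
Size() always reflects actual disk usage and cannot drift out of sync with it.
The sketch below shows the idea with a hypothetical dirSize helper built on
filepath.Walk; it illustrates the technique only and is not the fileutil
implementation shipped in the release.

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// dirSize returns the total size of all regular files under dir --
// the same quantity that PATCH 1/4 reports for the head chunks
// directory. Hypothetical helper, for illustration only.
func dirSize(dir string) (int64, error) {
	var size int64
	err := filepath.Walk(dir, func(_ string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if info.Mode().IsRegular() {
			size += info.Size()
		}
		return nil
	})
	return size, err
}

func main() {
	// "chunks_head" is a placeholder path; point it at any directory.
	size, err := dirSize("chunks_head")
	if err != nil {
		fmt.Println("dirSize:", err)
		return
	}
	fmt.Printf("head chunks occupy %d bytes on disk\n", size)
}

The trade-off is a directory walk on each call instead of an O(1) counter read,
in exchange for a size that is correct by construction even after read repairs
or truncations that the counter bookkeeping previously had to track by hand.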
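A note on PATCH 3/4: the essence of the change is a narrower critical section.
targetMtx is held only while the target and loop maps are mutated or copied,
and it is released before wg.Wait(), while manager.mtxScrape (outside this
file) already keeps Sync/stop/reload from running concurrently. A toy sketch
of that pattern, using hypothetical pool/ActiveTargets/stop stand-ins rather
than the real scrapePool:

package main

import (
	"fmt"
	"sync"
	"time"
)

// pool is a toy stand-in for scrapePool: targetMtx guards only the target
// map, and stop releases it before waiting on loops, so readers such as
// ActiveTargets are never blocked behind a slow shutdown.
type pool struct {
	targetMtx sync.Mutex
	active    map[int]string
}

// ActiveTargets needs the mutex only long enough to copy the map.
func (p *pool) ActiveTargets() []string {
	p.targetMtx.Lock()
	defer p.targetMtx.Unlock()
	out := make([]string, 0, len(p.active))
	for _, t := range p.active {
		out = append(out, t)
	}
	return out
}

// stop mutates the map under the lock but waits for loops after unlocking.
func (p *pool) stop() {
	var wg sync.WaitGroup
	p.targetMtx.Lock()
	for id := range p.active {
		wg.Add(1)
		go func() {
			defer wg.Done()
			time.Sleep(100 * time.Millisecond) // stand-in for a graceful loop stop
		}()
		delete(p.active, id)
	}
	p.targetMtx.Unlock() // release before Wait: readers can proceed now
	wg.Wait()
}

func main() {
	p := &pool{active: map[int]string{1: "a", 2: "b"}}
	done := make(chan struct{})
	go func() { p.stop(); close(done) }()
	fmt.Println(p.ActiveTargets()) // returns promptly, even mid-stop
	<-done
}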