diff --git a/storage/local/persistence.go b/storage/local/persistence.go index 393aebbef..689900f31 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -15,6 +15,7 @@ package local import ( "bufio" + "context" "encoding/binary" "fmt" "io" @@ -626,7 +627,9 @@ func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([ // NOTE: Above, varint encoding is used consistently although uvarint would have // made more sense in many cases. This was simply a glitch while designing the // format. -func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap, fpLocker *fingerprintLocker) (err error) { +func (p *persistence) checkpointSeriesMapAndHeads( + ctx context.Context, fingerprintToSeries *seriesMap, fpLocker *fingerprintLocker, +) (err error) { log.Info("Checkpointing in-memory metrics and chunks...") p.checkpointing.Set(1) defer p.checkpointing.Set(0) @@ -637,11 +640,16 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap } defer func() { - syncErr := f.Sync() - closeErr := f.Close() + defer os.Remove(p.headsTempFileName()) // Just in case it was left behind. + if err != nil { + // If we already had an error, do not bother to sync, + // just close, ignoring any further error. + f.Close() return } + syncErr := f.Sync() + closeErr := f.Close() err = syncErr if err != nil { return @@ -683,6 +691,11 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap var realNumberOfSeries uint64 for m := range iter { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } func() { // Wrapped in function to use defer for unlocking the fp. fpLocker.Lock(m.fp) defer fpLocker.Unlock(m.fp) diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index 33a8195a3..a468d7261 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -15,6 +15,7 @@ package local import ( "bufio" + "context" "errors" "os" "path/filepath" @@ -547,6 +548,27 @@ func TestPersistLoadDropChunksType1(t *testing.T) { testPersistLoadDropChunks(t, 1) } +func TestCancelCheckpoint(t *testing.T) { + p, closer := newTestPersistence(t, 2) + defer closer.Close() + + fpLocker := newFingerprintLocker(10) + sm := newSeriesMap() + s, _ := newMemorySeries(m1, nil, time.Time{}) + sm.put(m1.FastFingerprint(), s) + sm.put(m2.FastFingerprint(), s) + sm.put(m3.FastFingerprint(), s) + sm.put(m4.FastFingerprint(), s) + sm.put(m5.FastFingerprint(), s) + + ctx, cancel := context.WithCancel(context.Background()) + // Cancel right now to avoid races. + cancel() + if err := p.checkpointSeriesMapAndHeads(ctx, sm, fpLocker); err != context.Canceled { + t.Fatalf("expected error %v, got %v", context.Canceled, err) + } +} + func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunk.Encoding) { p, closer := newTestPersistence(t, encoding) defer closer.Close() @@ -584,7 +606,7 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunk.Encodin sm.put(m4.FastFingerprint(), s4) sm.put(m5.FastFingerprint(), s5) - if err := p.checkpointSeriesMapAndHeads(sm, fpLocker); err != nil { + if err := p.checkpointSeriesMapAndHeads(context.Background(), sm, fpLocker); err != nil { t.Fatal(err) } diff --git a/storage/local/storage.go b/storage/local/storage.go index d96185c08..6e799f010 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -462,7 +462,9 @@ func (s *MemorySeriesStorage) Stop() error { <-s.evictStopped // One final checkpoint of the series map and the head chunks. - if err := s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker); err != nil { + if err := s.persistence.checkpointSeriesMapAndHeads( + context.Background(), s.fpToSeries, s.fpLocker, + ); err != nil { return err } if err := s.mapper.checkpoint(); err != nil { @@ -1421,11 +1423,13 @@ func (s *MemorySeriesStorage) cycleThroughArchivedFingerprints() chan model.Fing func (s *MemorySeriesStorage) loop() { checkpointTimer := time.NewTimer(s.checkpointInterval) + checkpointMinTimer := time.NewTimer(0) var dirtySeriesCount int64 defer func() { checkpointTimer.Stop() + checkpointMinTimer.Stop() log.Info("Maintenance loop stopped.") close(s.loopStopped) }() @@ -1433,32 +1437,57 @@ func (s *MemorySeriesStorage) loop() { memoryFingerprints := s.cycleThroughMemoryFingerprints() archivedFingerprints := s.cycleThroughArchivedFingerprints() + checkpointCtx, checkpointCancel := context.WithCancel(context.Background()) + checkpointNow := make(chan struct{}, 1) + + doCheckpoint := func() time.Duration { + start := time.Now() + // We clear this before the checkpoint so that dirtySeriesCount + // is an upper bound. + atomic.StoreInt64(&dirtySeriesCount, 0) + s.dirtySeries.Set(0) + select { + case <-checkpointNow: + // Signal cleared. + default: + // No signal pending. + } + err := s.persistence.checkpointSeriesMapAndHeads( + checkpointCtx, s.fpToSeries, s.fpLocker, + ) + if err == context.Canceled { + log.Info("Checkpoint canceled.") + } else if err != nil { + s.persistErrors.Inc() + log.Errorln("Error while checkpointing:", err) + } + return time.Since(start) + } + // Checkpoints can happen concurrently with maintenance so even with heavy // checkpointing there will still be sufficient progress on maintenance. checkpointLoopStopped := make(chan struct{}) go func() { for { select { - case <-s.loopStopping: + case <-checkpointCtx.Done(): checkpointLoopStopped <- struct{}{} return - case <-checkpointTimer.C: - // We clear this before the checkpoint so that dirtySeriesCount - // is an upper bound. - atomic.StoreInt64(&dirtySeriesCount, 0) - s.dirtySeries.Set(0) - err := s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker) - if err != nil { - s.persistErrors.Inc() - log.Errorln("Error while checkpointing:", err) - } - // If a checkpoint takes longer than checkpointInterval, unluckily timed - // combination with the Reset(0) call below can lead to a case where a - // time is lurking in C leading to repeated checkpointing without break. + case <-checkpointMinTimer.C: + var took time.Duration select { - case <-checkpointTimer.C: // Get rid of the lurking time. - default: + case <-checkpointCtx.Done(): + checkpointLoopStopped <- struct{}{} + return + case <-checkpointTimer.C: + took = doCheckpoint() + case <-checkpointNow: + if !checkpointTimer.Stop() { + <-checkpointTimer.C + } + took = doCheckpoint() } + checkpointMinTimer.Reset(took) checkpointTimer.Reset(s.checkpointInterval) } } @@ -1468,6 +1497,7 @@ loop: for { select { case <-s.loopStopping: + checkpointCancel() break loop case fp := <-memoryFingerprints: if s.maintainMemorySeries(fp, model.Now().Add(-s.dropAfter)) { @@ -1478,10 +1508,15 @@ loop: // would be counterproductive, as it would slow down chunk persisting even more, // while in a situation like that, where we are clearly lacking speed of disk // maintenance, the best we can do for crash recovery is to persist chunks as - // quickly as possible. So only checkpoint if the urgency score is < 1. + // quickly as possible. So only checkpoint if we are not in rushed mode. if _, rushed := s.getPersistenceUrgencyScore(); !rushed && dirty >= int64(s.checkpointDirtySeriesLimit) { - checkpointTimer.Reset(0) + select { + case checkpointNow <- struct{}{}: + // Signal sent. + default: + // Signal already pending. + } } } case fp := <-archivedFingerprints: