storage: Several optimizations of checkpointing

- checkpointSeriesMapAndHeads now accepts a context to allow
  cancellation.

- If a shutdown is initiated, cancel the ongoing checkpoint. (We will
  create a final checkpoint anyway.)

- Always wait for at least as long as the last checkpoint took before
  starting the next checkpoint (to cap the time spent checkpointing at
  50%; see the sketch after this list).

- If an error has occurred during checkpointing, don't bother to sync
  the write.

- Make sure the temporary checkpoint file is deleted, even if an error
  has occurred.

- Clean up the checkpoint loop a bit. (The concurrent Timer.Reset(0)
  call might have caused a race.)
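
For illustration, here is a minimal, self-contained sketch of the two-timer pattern the checkpoint loop below uses to enforce the 50% cap: the next checkpoint may only start after a "minimum wait" timer, which is reset to however long the previous checkpoint took. The interval, the simulated 30ms of checkpoint work, and the printed output are invented for this sketch; only the structure mirrors the patch.

package main

import (
	"fmt"
	"time"
)

func main() {
	checkpointInterval := 50 * time.Millisecond
	checkpointTimer := time.NewTimer(checkpointInterval)
	checkpointMinTimer := time.NewTimer(0) // Fires immediately the first time.
	stop := time.After(400 * time.Millisecond)

	doCheckpoint := func() time.Duration {
		start := time.Now()
		time.Sleep(30 * time.Millisecond) // Stand-in for the real checkpoint work.
		return time.Since(start)
	}

	for {
		select {
		case <-stop:
			return
		case <-checkpointMinTimer.C:
			// Only now are we allowed to checkpoint again.
			var took time.Duration
			select {
			case <-stop:
				return
			case <-checkpointTimer.C:
				took = doCheckpoint()
			}
			// Wait at least as long as the checkpoint took before starting
			// the next one, so checkpointing never exceeds ~50% of wall-clock time.
			checkpointMinTimer.Reset(took)
			checkpointTimer.Reset(checkpointInterval)
			fmt.Println("checkpoint took", took)
		}
	}
}
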
pull/2591/head
beorn7 8 years ago
parent 934d86b936
commit f338d791d2

@@ -15,6 +15,7 @@ package local
 import (
 	"bufio"
+	"context"
 	"encoding/binary"
 	"fmt"
 	"io"
@@ -626,7 +627,9 @@ func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([
 // NOTE: Above, varint encoding is used consistently although uvarint would have
 // made more sense in many cases. This was simply a glitch while designing the
 // format.
-func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap, fpLocker *fingerprintLocker) (err error) {
+func (p *persistence) checkpointSeriesMapAndHeads(
+	ctx context.Context, fingerprintToSeries *seriesMap, fpLocker *fingerprintLocker,
+) (err error) {
 	log.Info("Checkpointing in-memory metrics and chunks...")
 	p.checkpointing.Set(1)
 	defer p.checkpointing.Set(0)
@@ -637,11 +640,16 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
 	}
 	defer func() {
-		syncErr := f.Sync()
-		closeErr := f.Close()
+		defer os.Remove(p.headsTempFileName()) // Just in case it was left behind.
+
 		if err != nil {
+			// If we already had an error, do not bother to sync,
+			// just close, ignoring any further error.
+			f.Close()
 			return
 		}
+		syncErr := f.Sync()
+		closeErr := f.Close()
 		err = syncErr
 		if err != nil {
 			return
@@ -683,6 +691,11 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
 	var realNumberOfSeries uint64
 	for m := range iter {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+		}
 		func() { // Wrapped in function to use defer for unlocking the fp.
 			fpLocker.Lock(m.fp)
 			defer fpLocker.Unlock(m.fp)

@@ -15,6 +15,7 @@ package local
 import (
 	"bufio"
+	"context"
 	"errors"
 	"os"
 	"path/filepath"
@@ -547,6 +548,27 @@ func TestPersistLoadDropChunksType1(t *testing.T) {
 	testPersistLoadDropChunks(t, 1)
 }
 
+func TestCancelCheckpoint(t *testing.T) {
+	p, closer := newTestPersistence(t, 2)
+	defer closer.Close()
+
+	fpLocker := newFingerprintLocker(10)
+	sm := newSeriesMap()
+	s, _ := newMemorySeries(m1, nil, time.Time{})
+	sm.put(m1.FastFingerprint(), s)
+	sm.put(m2.FastFingerprint(), s)
+	sm.put(m3.FastFingerprint(), s)
+	sm.put(m4.FastFingerprint(), s)
+	sm.put(m5.FastFingerprint(), s)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	// Cancel right now to avoid races.
+	cancel()
+	if err := p.checkpointSeriesMapAndHeads(ctx, sm, fpLocker); err != context.Canceled {
+		t.Fatalf("expected error %v, got %v", context.Canceled, err)
+	}
+}
+
 func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunk.Encoding) {
 	p, closer := newTestPersistence(t, encoding)
 	defer closer.Close()
@@ -584,7 +606,7 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunk.Encodin
 	sm.put(m4.FastFingerprint(), s4)
 	sm.put(m5.FastFingerprint(), s5)
 
-	if err := p.checkpointSeriesMapAndHeads(sm, fpLocker); err != nil {
+	if err := p.checkpointSeriesMapAndHeads(context.Background(), sm, fpLocker); err != nil {
 		t.Fatal(err)
 	}

@@ -462,7 +462,9 @@ func (s *MemorySeriesStorage) Stop() error {
 	<-s.evictStopped
 
 	// One final checkpoint of the series map and the head chunks.
-	if err := s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker); err != nil {
+	if err := s.persistence.checkpointSeriesMapAndHeads(
+		context.Background(), s.fpToSeries, s.fpLocker,
+	); err != nil {
 		return err
 	}
 	if err := s.mapper.checkpoint(); err != nil {
@@ -1421,11 +1423,13 @@ func (s *MemorySeriesStorage) cycleThroughArchivedFingerprints() chan model.Fing
 func (s *MemorySeriesStorage) loop() {
 	checkpointTimer := time.NewTimer(s.checkpointInterval)
+	checkpointMinTimer := time.NewTimer(0)
 
 	var dirtySeriesCount int64
 
 	defer func() {
 		checkpointTimer.Stop()
+		checkpointMinTimer.Stop()
 		log.Info("Maintenance loop stopped.")
 		close(s.loopStopped)
 	}()
@@ -1433,32 +1437,57 @@ func (s *MemorySeriesStorage) loop() {
 	memoryFingerprints := s.cycleThroughMemoryFingerprints()
 	archivedFingerprints := s.cycleThroughArchivedFingerprints()
 
+	checkpointCtx, checkpointCancel := context.WithCancel(context.Background())
+	checkpointNow := make(chan struct{}, 1)
+
+	doCheckpoint := func() time.Duration {
+		start := time.Now()
+		// We clear this before the checkpoint so that dirtySeriesCount
+		// is an upper bound.
+		atomic.StoreInt64(&dirtySeriesCount, 0)
+		s.dirtySeries.Set(0)
+		select {
+		case <-checkpointNow:
+			// Signal cleared.
+		default:
+			// No signal pending.
+		}
+		err := s.persistence.checkpointSeriesMapAndHeads(
+			checkpointCtx, s.fpToSeries, s.fpLocker,
+		)
+		if err == context.Canceled {
+			log.Info("Checkpoint canceled.")
+		} else if err != nil {
+			s.persistErrors.Inc()
+			log.Errorln("Error while checkpointing:", err)
+		}
+		return time.Since(start)
+	}
+
 	// Checkpoints can happen concurrently with maintenance so even with heavy
 	// checkpointing there will still be sufficient progress on maintenance.
 	checkpointLoopStopped := make(chan struct{})
 	go func() {
 		for {
 			select {
-			case <-s.loopStopping:
+			case <-checkpointCtx.Done():
 				checkpointLoopStopped <- struct{}{}
 				return
-			case <-checkpointTimer.C:
-				// We clear this before the checkpoint so that dirtySeriesCount
-				// is an upper bound.
-				atomic.StoreInt64(&dirtySeriesCount, 0)
-				s.dirtySeries.Set(0)
-				err := s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker)
-				if err != nil {
-					s.persistErrors.Inc()
-					log.Errorln("Error while checkpointing:", err)
-				}
-				// If a checkpoint takes longer than checkpointInterval, unluckily timed
-				// combination with the Reset(0) call below can lead to a case where a
-				// time is lurking in C leading to repeated checkpointing without break.
+			case <-checkpointMinTimer.C:
+				var took time.Duration
 				select {
-				case <-checkpointTimer.C: // Get rid of the lurking time.
-				default:
+				case <-checkpointCtx.Done():
+					checkpointLoopStopped <- struct{}{}
+					return
+				case <-checkpointTimer.C:
+					took = doCheckpoint()
+				case <-checkpointNow:
+					if !checkpointTimer.Stop() {
+						<-checkpointTimer.C
+					}
+					took = doCheckpoint()
 				}
+				checkpointMinTimer.Reset(took)
 				checkpointTimer.Reset(s.checkpointInterval)
 			}
 		}
@@ -1468,6 +1497,7 @@ loop:
 	for {
 		select {
 		case <-s.loopStopping:
+			checkpointCancel()
 			break loop
 		case fp := <-memoryFingerprints:
 			if s.maintainMemorySeries(fp, model.Now().Add(-s.dropAfter)) {
@@ -1478,10 +1508,15 @@ loop:
 				// would be counterproductive, as it would slow down chunk persisting even more,
 				// while in a situation like that, where we are clearly lacking speed of disk
 				// maintenance, the best we can do for crash recovery is to persist chunks as
-				// quickly as possible. So only checkpoint if the urgency score is < 1.
+				// quickly as possible. So only checkpoint if we are not in rushed mode.
 				if _, rushed := s.getPersistenceUrgencyScore(); !rushed &&
 					dirty >= int64(s.checkpointDirtySeriesLimit) {
-					checkpointTimer.Reset(0)
+					select {
+					case checkpointNow <- struct{}{}:
+						// Signal sent.
+					default:
+						// Signal already pending.
+					}
 				}
 			}
 		case fp := <-archivedFingerprints:

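A note on the design: checkpointNow above is a buffered channel of capacity one used as a coalescing signal, so any number of early-checkpoint requests that arrive while one is already pending wake the checkpoint goroutine only once. A standalone sketch of that pattern (the request/signal names are illustrative, not taken from the patch):

package main

import "fmt"

func main() {
	signal := make(chan struct{}, 1)

	// Non-blocking send: if a signal is already pending, the new request
	// coalesces into the pending one instead of queueing up.
	request := func() {
		select {
		case signal <- struct{}{}:
			// Signal sent.
		default:
			// Signal already pending.
		}
	}

	// Ten requests arrive while nobody is receiving...
	for i := 0; i < 10; i++ {
		request()
	}

	// ...but the receiver sees exactly one pending signal.
	pending := 0
	for {
		select {
		case <-signal:
			pending++
		default:
			fmt.Println("pending signals:", pending) // Prints: pending signals: 1
			return
		}
	}
}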