diff --git a/storage/local/persistence.go b/storage/local/persistence.go index ef173f211..4d2733496 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -668,8 +668,10 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap fpLocker.Lock(m.fp) defer fpLocker.Unlock(m.fp) - if len(m.series.chunkDescs) == 0 { - // This series was completely purged or archived in the meantime. Ignore. + chunksToPersist := len(m.series.chunkDescs) - m.series.persistWatermark + if len(m.series.chunkDescs) == 0 || chunksToPersist == 0 { + // This series was completely purged or archived in the meantime or has + // no chunks that need persisting. Ignore. return } realNumberOfSeries++ @@ -688,7 +690,9 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap if _, err = w.Write(buf); err != nil { return } - if _, err = codable.EncodeVarint(w, int64(m.series.persistWatermark)); err != nil { + // persistWatermark. We only checkpoint chunks that need persisting, so + // this is always 0. + if _, err = codable.EncodeVarint(w, int64(0)); err != nil { return } if m.series.modTime.IsZero() { @@ -700,37 +704,25 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap return } } - if _, err = codable.EncodeVarint(w, int64(m.series.chunkDescsOffset)); err != nil { + // chunkDescsOffset. + if _, err = codable.EncodeVarint(w, int64(m.series.chunkDescsOffset+m.series.persistWatermark)); err != nil { return } if _, err = codable.EncodeVarint(w, int64(m.series.savedFirstTime)); err != nil { return } - if _, err = codable.EncodeVarint(w, int64(len(m.series.chunkDescs))); err != nil { + // Number of chunkDescs. + if _, err = codable.EncodeVarint(w, int64(chunksToPersist)); err != nil { return } - for i, chunkDesc := range m.series.chunkDescs { - if i < m.series.persistWatermark { - if _, err = codable.EncodeVarint(w, int64(chunkDesc.FirstTime())); err != nil { - return - } - lt, err := chunkDesc.LastTime() - if err != nil { - return - } - if _, err = codable.EncodeVarint(w, int64(lt)); err != nil { - return - } - } else { - // This is a non-persisted chunk. Fully marshal it. - if err = w.WriteByte(byte(chunkDesc.C.Encoding())); err != nil { - return - } - if err = chunkDesc.C.Marshal(w); err != nil { - return - } + for _, chunkDesc := range m.series.chunkDescs[m.series.persistWatermark:] { + if err = w.WriteByte(byte(chunkDesc.C.Encoding())); err != nil { + return + } + if err = chunkDesc.C.Marshal(w); err != nil { + return } - p.checkpointChunksWritten.Observe(float64(len(m.series.chunkDescs) - m.series.persistWatermark)) + p.checkpointChunksWritten.Observe(float64(chunksToPersist)) } // Series is checkpointed now, so declare it clean. In case the entire // checkpoint fails later on, this is fine, as the storage's series diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index 2b3785766..27f620366 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -493,8 +493,8 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunk.Encodin if err != nil { t.Fatal(err) } - if loadedSM.length() != 4 { - t.Errorf("want 4 series in map, got %d", loadedSM.length()) + if loadedSM.length() != 3 { + t.Errorf("want 3 series in map, got %d", loadedSM.length()) } if loadedS1, ok := loadedSM.get(m1.FastFingerprint()); ok { if !reflect.DeepEqual(loadedS1.metric, m1) { @@ -518,28 +518,6 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunk.Encodin } else { t.Errorf("couldn't find %v in loaded map", m1) } - if loadedS3, ok := loadedSM.get(m3.FastFingerprint()); ok { - if !reflect.DeepEqual(loadedS3.metric, m3) { - t.Errorf("want metric %v, got %v", m3, loadedS3.metric) - } - if loadedS3.head().C != nil { - t.Error("head chunk not evicted") - } - if loadedS3.chunkDescsOffset != 0 { - t.Errorf("want chunkDescsOffset 0, got %d", loadedS3.chunkDescsOffset) - } - if !loadedS3.headChunkClosed { - t.Error("headChunkClosed is false") - } - if loadedS3.head().ChunkFirstTime != 2 { - t.Errorf("want ChunkFirstTime in head chunk to be 2, got %d", loadedS3.head().ChunkFirstTime) - } - if loadedS3.head().ChunkLastTime != 2 { - t.Errorf("want ChunkLastTime in head chunk to be 2, got %d", loadedS3.head().ChunkLastTime) - } - } else { - t.Errorf("couldn't find %v in loaded map", m3) - } if loadedS4, ok := loadedSM.get(m4.FastFingerprint()); ok { if !reflect.DeepEqual(loadedS4.metric, m4) { t.Errorf("want metric %v, got %v", m4, loadedS4.metric) @@ -594,20 +572,17 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunk.Encodin if !reflect.DeepEqual(loadedS5.metric, m5) { t.Errorf("want metric %v, got %v", m5, loadedS5.metric) } - if got, want := len(loadedS5.chunkDescs), chunkCountS5; got != want { + if got, want := len(loadedS5.chunkDescs), chunkCountS5-3; got != want { t.Errorf("got %d chunkDescs, want %d", got, want) } - if got, want := loadedS5.persistWatermark, 3; got != want { + if got, want := loadedS5.persistWatermark, 0; got != want { t.Errorf("got persistWatermark %d, want %d", got, want) } - if !loadedS5.chunkDescs[2].IsEvicted() { - t.Error("3rd chunk not evicted") - } - if loadedS5.chunkDescs[3].IsEvicted() { - t.Error("4th chunk evicted") + if loadedS5.chunkDescs[0].IsEvicted() { + t.Error("1st chunk evicted") } - if loadedS5.chunkDescsOffset != 0 { - t.Errorf("want chunkDescsOffset 0, got %d", loadedS5.chunkDescsOffset) + if loadedS5.chunkDescsOffset != 3 { + t.Errorf("want chunkDescsOffset 3, got %d", loadedS5.chunkDescsOffset) } if loadedS5.headChunkClosed { t.Error("headChunkClosed is true")