Browse Source

storage: Do not throw away fully persisted memory series in checkpointing

pull/2400/head
beorn7 8 years ago
parent
commit
2363a90adc
  1. 70
      storage/local/persistence.go
  2. 31
      storage/local/persistence_test.go

70
storage/local/persistence.go

@ -670,12 +670,39 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
defer fpLocker.Unlock(m.fp)
chunksToPersist := len(m.series.chunkDescs) - m.series.persistWatermark
if len(m.series.chunkDescs) == 0 || chunksToPersist == 0 {
// This series was completely purged or archived in the meantime or has
// no chunks that need persisting. Ignore.
if len(m.series.chunkDescs) == 0 {
// This series was completely purged or archived
// in the meantime. Ignore.
return
}
realNumberOfSeries++
// Sanity checks.
if m.series.chunkDescsOffset < 0 && m.series.persistWatermark > 0 {
panic("encountered unknown chunk desc offset in combination with positive persist watermark")
}
// These are the values to save in the normal case.
var (
// persistWatermark is zero as we only checkpoint non-persisted chunks.
persistWatermark int64
// chunkDescsOffset is shifted by the original persistWatermark for the same reason.
chunkDescsOffset = int64(m.series.chunkDescsOffset + m.series.persistWatermark)
numChunkDescs = int64(chunksToPersist)
)
// However, in the special case of a series being fully
// persisted but still in memory (i.e. not archived), we
// need to save a "placeholder", for which we use just
// the chunk desc of the last chunk. Values have to be
// adjusted accordingly. (The reason for doing it in
// this weird way is to keep the checkpoint format
// compatible with older versions.)
if chunksToPersist == 0 {
persistWatermark = 1
chunkDescsOffset-- // Save one chunk desc after all.
numChunkDescs = 1
}
// seriesFlags left empty in v2.
if err = w.WriteByte(0); err != nil {
return
@ -691,9 +718,7 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
if _, err = w.Write(buf); err != nil {
return
}
// persistWatermark. We only checkpoint chunks that need persisting, so
// this is always 0.
if _, err = codable.EncodeVarint(w, 0); err != nil {
if _, err = codable.EncodeVarint(w, persistWatermark); err != nil {
return
}
if m.series.modTime.IsZero() {
@ -705,28 +730,39 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
return
}
}
// chunkDescsOffset.
if m.series.chunkDescsOffset < 0 && m.series.persistWatermark > 0 {
panic("encountered unknown chunk desc offset in combination with positive persist watermark")
}
if _, err = codable.EncodeVarint(w, int64(m.series.chunkDescsOffset+m.series.persistWatermark)); err != nil {
if _, err = codable.EncodeVarint(w, chunkDescsOffset); err != nil {
return
}
if _, err = codable.EncodeVarint(w, int64(m.series.savedFirstTime)); err != nil {
return
}
// Number of chunkDescs.
if _, err = codable.EncodeVarint(w, int64(chunksToPersist)); err != nil {
if _, err = codable.EncodeVarint(w, numChunkDescs); err != nil {
return
}
for _, chunkDesc := range m.series.chunkDescs[m.series.persistWatermark:] {
if err = w.WriteByte(byte(chunkDesc.C.Encoding())); err != nil {
if chunksToPersist == 0 {
// Save the one placeholder chunk desc for a fully persisted series.
chunkDesc := m.series.chunkDescs[len(m.series.chunkDescs)-1]
if _, err = codable.EncodeVarint(w, int64(chunkDesc.FirstTime())); err != nil {
return
}
lt, err := chunkDesc.LastTime()
if err != nil {
return
}
if err = chunkDesc.C.Marshal(w); err != nil {
if _, err = codable.EncodeVarint(w, int64(lt)); err != nil {
return
}
p.checkpointChunksWritten.Observe(float64(chunksToPersist))
} else {
// Save (only) the non-persisted chunks.
for _, chunkDesc := range m.series.chunkDescs[m.series.persistWatermark:] {
if err = w.WriteByte(byte(chunkDesc.C.Encoding())); err != nil {
return
}
if err = chunkDesc.C.Marshal(w); err != nil {
return
}
p.checkpointChunksWritten.Observe(float64(chunksToPersist))
}
}
// Series is checkpointed now, so declare it clean. In case the entire
// checkpoint fails later on, this is fine, as the storage's series

31
storage/local/persistence_test.go

@ -484,7 +484,10 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunk.Encodin
s1.add(model.SamplePair{Timestamp: 1, Value: 3.14})
s3.add(model.SamplePair{Timestamp: 2, Value: 2.7})
s3.headChunkClosed = true
s3.persistWatermark = 1
// Create another chunk in s3.
s3.add(model.SamplePair{Timestamp: 3, Value: 1.4})
s3.headChunkClosed = true
s3.persistWatermark = 2
for i := 0; i < 10000; i++ {
s4.add(model.SamplePair{
Timestamp: model.Time(i),
@ -512,8 +515,8 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunk.Encodin
if err != nil {
t.Fatal(err)
}
if loadedSM.length() != 3 {
t.Errorf("want 3 series in map, got %d", loadedSM.length())
if loadedSM.length() != 4 {
t.Errorf("want 4 series in map, got %d", loadedSM.length())
}
if loadedS1, ok := loadedSM.get(m1.FastFingerprint()); ok {
if !reflect.DeepEqual(loadedS1.metric, m1) {
@ -537,6 +540,28 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunk.Encodin
} else {
t.Errorf("couldn't find %v in loaded map", m1)
}
if loadedS3, ok := loadedSM.get(m3.FastFingerprint()); ok {
if !reflect.DeepEqual(loadedS3.metric, m3) {
t.Errorf("want metric %v, got %v", m3, loadedS3.metric)
}
if loadedS3.head().C != nil {
t.Error("head chunk not evicted")
}
if loadedS3.chunkDescsOffset != 1 {
t.Errorf("want chunkDescsOffset 1, got %d", loadedS3.chunkDescsOffset)
}
if !loadedS3.headChunkClosed {
t.Error("headChunkClosed is false")
}
if loadedS3.head().ChunkFirstTime != 3 {
t.Errorf("want ChunkFirstTime in head chunk to be 3, got %d", loadedS3.head().ChunkFirstTime)
}
if loadedS3.head().ChunkLastTime != 3 {
t.Errorf("want ChunkLastTime in head chunk to be 3, got %d", loadedS3.head().ChunkLastTime)
}
} else {
t.Errorf("couldn't find %v in loaded map", m3)
}
if loadedS4, ok := loadedSM.get(m4.FastFingerprint()); ok {
if !reflect.DeepEqual(loadedS4.metric, m4) {
t.Errorf("want metric %v, got %v", m4, loadedS4.metric)

Loading…
Cancel
Save