Merge pull request #2400 from prometheus/beorn7/storage2

storage: Fix checkpointing of fully persisted memory series.
pull/2403/head
Björn Rabenstein authored 8 years ago, committed by GitHub
commit 3e133a9312

Changed files:

  1. storage/local/heads.go (4 lines changed)
  2. storage/local/persistence.go (67 lines changed)
  3. storage/local/persistence_test.go (31 lines changed)
  4. storage/local/series.go (51 lines changed)
  5. storage/local/storage.go (7 lines changed)
  6. storage/local/storage_test.go (7 lines changed)
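
Before this change, checkpointSeriesMapAndHeads simply skipped any series whose chunks were all persisted (the removed "|| chunksToPersist == 0" branch in the persistence.go hunk below); now such a series is written with a single placeholder chunk desc and adjusted bookkeeping values, so the checkpoint format stays readable by older code. The following is a minimal, self-contained sketch of that value adjustment. The checkpointHeader struct and headerForSeries function are illustrative stand-ins, not types from storage/local:

package main

import "fmt"

// checkpointHeader holds the per-series values written to the checkpoint.
// The struct and its field names are illustrative only.
type checkpointHeader struct {
	persistWatermark int64
	chunkDescsOffset int64
	numChunkDescs    int64
}

// headerForSeries mirrors the logic added in checkpointSeriesMapAndHeads:
// normally only non-persisted chunks are checkpointed, but a fully persisted
// (yet still in-memory) series gets one placeholder chunk desc.
func headerForSeries(numChunkDescs, persistWatermark, chunkDescsOffset int) checkpointHeader {
	chunksToPersist := numChunkDescs - persistWatermark
	h := checkpointHeader{
		persistWatermark: 0, // only non-persisted chunks are checkpointed
		chunkDescsOffset: int64(chunkDescsOffset + persistWatermark),
		numChunkDescs:    int64(chunksToPersist),
	}
	if chunksToPersist == 0 {
		// Fully persisted series: save one placeholder chunk desc.
		h.persistWatermark = 1
		h.chunkDescsOffset-- // the placeholder counts as persisted
		h.numChunkDescs = 1
	}
	return h
}

func main() {
	// Fully persisted series: {persistWatermark:1 chunkDescsOffset:2 numChunkDescs:1}
	fmt.Printf("%+v\n", headerForSeries(3, 3, 0))
	// One chunk left to persist: {persistWatermark:0 chunkDescsOffset:2 numChunkDescs:1}
	fmt.Printf("%+v\n", headerForSeries(3, 2, 0))
}

The first call in main corresponds to the new special case in the diff: persistWatermark 1, a decremented chunkDescsOffset, and exactly one chunk desc.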

storage/local/heads.go (4 lines changed)

@@ -188,7 +188,9 @@ func (hs *headsScanner) scan() bool {
             // This is NOT the head chunk. So it's a chunk
             // to be persisted, and we need to populate lastTime.
             hs.chunksToPersistTotal++
-            cd.MaybePopulateLastTime()
+            if hs.err = cd.MaybePopulateLastTime(); hs.err != nil {
+                return false
+            }
         }
         chunkDescs[i] = cd
     }
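
The change above routes the MaybePopulateLastTime error into the scanner's err field instead of discarding it, so the head-chunk scan stops at the first failure and the caller can check the error after the loop. A compact sketch of that scan()/Err() pattern, using a placeholder item type and populate function rather than the real headsScanner internals:

package main

import (
	"errors"
	"fmt"
)

// itemScanner illustrates the scan()/Err() style used by headsScanner:
// scan returns false on the first error, and the caller checks Err()
// once the loop is done.
type itemScanner struct {
	items []int
	pos   int
	err   error
}

func populate(i int) error {
	if i < 0 {
		return errors.New("negative item")
	}
	return nil
}

func (s *itemScanner) scan() bool {
	if s.err != nil || s.pos >= len(s.items) {
		return false
	}
	// Mirror the diff: capture the error and stop instead of dropping it.
	if s.err = populate(s.items[s.pos]); s.err != nil {
		return false
	}
	s.pos++
	return true
}

func (s *itemScanner) Err() error { return s.err }

func main() {
	s := &itemScanner{items: []int{1, 2, -3, 4}}
	for s.scan() {
	}
	fmt.Println("scan stopped with:", s.Err()) // scan stopped with: negative item
}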

storage/local/persistence.go (67 lines changed)

@@ -670,12 +670,39 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
         defer fpLocker.Unlock(m.fp)

         chunksToPersist := len(m.series.chunkDescs) - m.series.persistWatermark
-        if len(m.series.chunkDescs) == 0 || chunksToPersist == 0 {
-            // This series was completely purged or archived in the meantime or has
-            // no chunks that need persisting. Ignore.
+        if len(m.series.chunkDescs) == 0 {
+            // This series was completely purged or archived
+            // in the meantime. Ignore.
             return
         }
         realNumberOfSeries++
+
+        // Sanity checks.
+        if m.series.chunkDescsOffset < 0 && m.series.persistWatermark > 0 {
+            panic("encountered unknown chunk desc offset in combination with positive persist watermark")
+        }
+
+        // These are the values to save in the normal case.
+        var (
+            // persistWatermark is zero as we only checkpoint non-persisted chunks.
+            persistWatermark int64
+            // chunkDescsOffset is shifted by the original persistWatermark for the same reason.
+            chunkDescsOffset = int64(m.series.chunkDescsOffset + m.series.persistWatermark)
+            numChunkDescs    = int64(chunksToPersist)
+        )
+        // However, in the special case of a series being fully
+        // persisted but still in memory (i.e. not archived), we
+        // need to save a "placeholder", for which we use just
+        // the chunk desc of the last chunk. Values have to be
+        // adjusted accordingly. (The reason for doing it in
+        // this weird way is to keep the checkpoint format
+        // compatible with older versions.)
+        if chunksToPersist == 0 {
+            persistWatermark = 1
+            chunkDescsOffset-- // Save one chunk desc after all.
+            numChunkDescs = 1
+        }
+
         // seriesFlags left empty in v2.
         if err = w.WriteByte(0); err != nil {
             return
@@ -691,9 +718,7 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
         if _, err = w.Write(buf); err != nil {
             return
         }
-        // persistWatermark. We only checkpoint chunks that need persisting, so
-        // this is always 0.
-        if _, err = codable.EncodeVarint(w, int64(0)); err != nil {
+        if _, err = codable.EncodeVarint(w, persistWatermark); err != nil {
             return
         }
         if m.series.modTime.IsZero() {
@@ -705,25 +730,39 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
                 return
             }
         }
-        // chunkDescsOffset.
-        if _, err = codable.EncodeVarint(w, int64(m.series.chunkDescsOffset+m.series.persistWatermark)); err != nil {
+        if _, err = codable.EncodeVarint(w, chunkDescsOffset); err != nil {
             return
         }
         if _, err = codable.EncodeVarint(w, int64(m.series.savedFirstTime)); err != nil {
             return
         }
-        // Number of chunkDescs.
-        if _, err = codable.EncodeVarint(w, int64(chunksToPersist)); err != nil {
+        if _, err = codable.EncodeVarint(w, numChunkDescs); err != nil {
             return
         }
-        for _, chunkDesc := range m.series.chunkDescs[m.series.persistWatermark:] {
-            if err = w.WriteByte(byte(chunkDesc.C.Encoding())); err != nil {
+        if chunksToPersist == 0 {
+            // Save the one placeholder chunk desc for a fully persisted series.
+            chunkDesc := m.series.chunkDescs[len(m.series.chunkDescs)-1]
+            if _, err = codable.EncodeVarint(w, int64(chunkDesc.FirstTime())); err != nil {
                 return
             }
-            if err = chunkDesc.C.Marshal(w); err != nil {
+            lt, err := chunkDesc.LastTime()
+            if err != nil {
                 return
             }
-            p.checkpointChunksWritten.Observe(float64(chunksToPersist))
+            if _, err = codable.EncodeVarint(w, int64(lt)); err != nil {
+                return
+            }
+        } else {
+            // Save (only) the non-persisted chunks.
+            for _, chunkDesc := range m.series.chunkDescs[m.series.persistWatermark:] {
+                if err = w.WriteByte(byte(chunkDesc.C.Encoding())); err != nil {
+                    return
+                }
+                if err = chunkDesc.C.Marshal(w); err != nil {
+                    return
+                }
+                p.checkpointChunksWritten.Observe(float64(chunksToPersist))
+            }
         }
         // Series is checkpointed now, so declare it clean. In case the entire
         // checkpoint fails later on, this is fine, as the storage's series
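
In the hunk above, the placeholder for a fully persisted series is serialized as just two varints (the last chunk's first and last timestamp), while series with pending chunks still write each chunk's encoding byte followed by the marshaled chunk data. A rough sketch of that branching, using encoding/binary varints as a stand-in for codable.EncodeVarint and a fake chunk type instead of chunk.Desc:

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// writeVarint stands in for codable.EncodeVarint in the real code.
func writeVarint(w *bytes.Buffer, v int64) {
	var buf [binary.MaxVarintLen64]byte
	n := binary.PutVarint(buf[:], v)
	w.Write(buf[:n])
}

// fakeChunk is a stand-in for a chunk desc plus its chunk data.
type fakeChunk struct {
	firstTime, lastTime int64
	encoding            byte
	data                []byte
}

// writeChunkDescs sketches the branch added to checkpointSeriesMapAndHeads:
// a fully persisted series stores only the first/last timestamp of its most
// recent chunk (an evicted placeholder desc), while series with pending
// chunks store each chunk's encoding byte and marshaled data.
func writeChunkDescs(w *bytes.Buffer, chunks []fakeChunk, chunksToPersist int) {
	if chunksToPersist == 0 {
		last := chunks[len(chunks)-1]
		writeVarint(w, last.firstTime)
		writeVarint(w, last.lastTime)
		return
	}
	for _, c := range chunks[len(chunks)-chunksToPersist:] {
		w.WriteByte(c.encoding)
		w.Write(c.data)
	}
}

func main() {
	var buf bytes.Buffer
	writeChunkDescs(&buf, []fakeChunk{{firstTime: 3, lastTime: 3}}, 0)
	fmt.Println(buf.Bytes()) // two zigzag varints: [6 6]
}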

storage/local/persistence_test.go (31 lines changed)

@@ -484,7 +484,10 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunk.Encodin
     s1.add(model.SamplePair{Timestamp: 1, Value: 3.14})
     s3.add(model.SamplePair{Timestamp: 2, Value: 2.7})
     s3.headChunkClosed = true
-    s3.persistWatermark = 1
+    // Create another chunk in s3.
+    s3.add(model.SamplePair{Timestamp: 3, Value: 1.4})
+    s3.headChunkClosed = true
+    s3.persistWatermark = 2
     for i := 0; i < 10000; i++ {
         s4.add(model.SamplePair{
             Timestamp: model.Time(i),
@@ -512,8 +515,8 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunk.Encodin
     if err != nil {
         t.Fatal(err)
     }
-    if loadedSM.length() != 3 {
-        t.Errorf("want 3 series in map, got %d", loadedSM.length())
+    if loadedSM.length() != 4 {
+        t.Errorf("want 4 series in map, got %d", loadedSM.length())
     }
     if loadedS1, ok := loadedSM.get(m1.FastFingerprint()); ok {
         if !reflect.DeepEqual(loadedS1.metric, m1) {
@@ -537,6 +540,28 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunk.Encodin
     } else {
         t.Errorf("couldn't find %v in loaded map", m1)
     }
+    if loadedS3, ok := loadedSM.get(m3.FastFingerprint()); ok {
+        if !reflect.DeepEqual(loadedS3.metric, m3) {
+            t.Errorf("want metric %v, got %v", m3, loadedS3.metric)
+        }
+        if loadedS3.head().C != nil {
+            t.Error("head chunk not evicted")
+        }
+        if loadedS3.chunkDescsOffset != 1 {
+            t.Errorf("want chunkDescsOffset 1, got %d", loadedS3.chunkDescsOffset)
+        }
+        if !loadedS3.headChunkClosed {
+            t.Error("headChunkClosed is false")
+        }
+        if loadedS3.head().ChunkFirstTime != 3 {
+            t.Errorf("want ChunkFirstTime in head chunk to be 3, got %d", loadedS3.head().ChunkFirstTime)
+        }
+        if loadedS3.head().ChunkLastTime != 3 {
+            t.Errorf("want ChunkLastTime in head chunk to be 3, got %d", loadedS3.head().ChunkLastTime)
+        }
+    } else {
+        t.Errorf("couldn't find %v in loaded map", m3)
+    }
     if loadedS4, ok := loadedSM.get(m4.FastFingerprint()); ok {
         if !reflect.DeepEqual(loadedS4.metric, m4) {
             t.Errorf("want metric %v, got %v", m4, loadedS4.metric)

storage/local/series.go (51 lines changed)

@@ -247,7 +247,9 @@ func (s *memorySeries) add(v model.SamplePair) (int, error) {

     // Populate lastTime of now-closed chunks.
     for _, cd := range s.chunkDescs[len(s.chunkDescs)-len(chunks) : len(s.chunkDescs)-1] {
-        cd.MaybePopulateLastTime()
+        if err := cd.MaybePopulateLastTime(); err != nil {
+            return 0, err
+        }
     }

     s.lastTime = v.Timestamp
@@ -261,39 +263,40 @@
 // If the head chunk is already closed, the method is a no-op and returns false.
 //
 // The caller must have locked the fingerprint of the series.
-func (s *memorySeries) maybeCloseHeadChunk() bool {
+func (s *memorySeries) maybeCloseHeadChunk() (bool, error) {
     if s.headChunkClosed {
-        return false
+        return false, nil
     }
     if time.Now().Sub(s.lastTime.Time()) > headChunkTimeout {
         s.headChunkClosed = true
         // Since we cannot modify the head chunk from now on, we
         // don't need to bother with cloning anymore.
         s.headChunkUsedByIterator = false
-        s.head().MaybePopulateLastTime()
-        return true
+        return true, s.head().MaybePopulateLastTime()
     }
-    return false
+    return false, nil
 }

-// evictChunkDescs evicts chunkDescs if the chunk is evicted.
-// iOldestNotEvicted is the index within the current chunkDescs of the oldest
-// chunk that is not evicted.
-func (s *memorySeries) evictChunkDescs(iOldestNotEvicted int) {
-    lenToKeep := len(s.chunkDescs) - iOldestNotEvicted
-    if lenToKeep < len(s.chunkDescs) {
-        s.savedFirstTime = s.firstTime()
-        lenEvicted := len(s.chunkDescs) - lenToKeep
-        s.chunkDescsOffset += lenEvicted
-        s.persistWatermark -= lenEvicted
-        chunk.DescOps.WithLabelValues(chunk.Evict).Add(float64(lenEvicted))
-        chunk.NumMemDescs.Sub(float64(lenEvicted))
-        s.chunkDescs = append(
-            make([]*chunk.Desc, 0, lenToKeep),
-            s.chunkDescs[lenEvicted:]...,
-        )
-        s.dirty = true
-    }
+// evictChunkDescs evicts chunkDescs. lenToEvict is the index within the current
+// chunkDescs of the oldest chunk that is not evicted.
+func (s *memorySeries) evictChunkDescs(lenToEvict int) {
+    if lenToEvict < 1 {
+        return
+    }
+    if s.chunkDescsOffset < 0 {
+        panic("chunk desc eviction requested with unknown chunk desc offset")
+    }
+    lenToKeep := len(s.chunkDescs) - lenToEvict
+    s.savedFirstTime = s.firstTime()
+    s.chunkDescsOffset += lenToEvict
+    s.persistWatermark -= lenToEvict
+    chunk.DescOps.WithLabelValues(chunk.Evict).Add(float64(lenToEvict))
+    chunk.NumMemDescs.Sub(float64(lenToEvict))
+    s.chunkDescs = append(
+        make([]*chunk.Desc, 0, lenToKeep),
+        s.chunkDescs[lenToEvict:]...,
+    )
+    s.dirty = true
 }

 // dropChunks removes chunkDescs older than t. The caller must have locked the
@@ -461,9 +464,9 @@ func (s *memorySeries) preloadChunksForRange(
             fp, s.chunkDescsOffset, len(cds),
         )
     }
+    s.persistWatermark += len(cds)
     s.chunkDescs = append(cds, s.chunkDescs...)
     s.chunkDescsOffset = 0
-    s.persistWatermark += len(cds)
     if len(s.chunkDescs) > 0 {
         firstChunkDescTime = s.chunkDescs[0].FirstTime()
     }
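
The rewritten evictChunkDescs above replaces the old "lenToKeep < len(s.chunkDescs)" guard with an early return for lenToEvict < 1 and a panic when chunkDescsOffset is unknown, and it shifts chunkDescsOffset up and persistWatermark down by the number of evicted descs. A small worked example of that bookkeeping, using plain slices and ints instead of memorySeries (the concrete numbers are made up):

package main

import "fmt"

// evict mirrors the bookkeeping in the new evictChunkDescs: dropping the
// oldest lenToEvict descs shifts the on-disk offset up and the persist
// watermark down by the same amount, so the remaining descs still line up
// with the chunks in the series file.
func evict(chunkDescs []string, chunkDescsOffset, persistWatermark, lenToEvict int) ([]string, int, int) {
	if lenToEvict < 1 {
		return chunkDescs, chunkDescsOffset, persistWatermark
	}
	if chunkDescsOffset < 0 {
		panic("chunk desc eviction requested with unknown chunk desc offset")
	}
	return chunkDescs[lenToEvict:], chunkDescsOffset + lenToEvict, persistWatermark - lenToEvict
}

func main() {
	// 5 descs in memory, first 3 already persisted, series file starts at offset 10.
	descs := []string{"c10", "c11", "c12", "c13", "c14"}
	descs, offset, watermark := evict(descs, 10, 3, 2)
	fmt.Println(descs, offset, watermark) // [c12 c13 c14] 12 1
}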

storage/local/storage.go (7 lines changed)

@@ -1376,7 +1376,12 @@ func (s *MemorySeriesStorage) maintainMemorySeries(
     defer s.seriesOps.WithLabelValues(memoryMaintenance).Inc()

-    if series.maybeCloseHeadChunk() {
+    closed, err := series.maybeCloseHeadChunk()
+    if err != nil {
+        s.quarantineSeries(fp, series.metric, err)
+        s.persistErrors.Inc()
+    }
+    if closed {
         s.incNumChunksToPersist(1)
     }

storage/local/storage_test.go (7 lines changed)

@@ -840,10 +840,15 @@ func TestLoop(t *testing.T) {
         storage.Append(s)
     }
     storage.WaitForIndexing()
-    series, _ := storage.fpToSeries.get(model.Metric{}.FastFingerprint())
+    fp := model.Metric{}.FastFingerprint()
+    series, _ := storage.fpToSeries.get(fp)
+    storage.fpLocker.Lock(fp)
     cdsBefore := len(series.chunkDescs)
+    storage.fpLocker.Unlock(fp)
     time.Sleep(fpMaxWaitDuration + time.Second) // TODO(beorn7): Ugh, need to wait for maintenance to kick in.
+    storage.fpLocker.Lock(fp)
     cdsAfter := len(series.chunkDescs)
+    storage.fpLocker.Unlock(fp)
     storage.Stop()
     if cdsBefore <= cdsAfter {
         t.Errorf(
