Merge pull request #2413 from prometheus/beorn7/storage

storage: Fix offset returned by dropAndPersistChunks
pull/2415/head
Björn Rabenstein 2017-02-09 15:54:52 +01:00 committed by GitHub
commit 342f970e05
2 changed files with 20 additions and 13 deletions

View File

@ -845,12 +845,14 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in
// dropAndPersistChunks deletes all chunks from a series file whose last sample
// time is before beforeTime, and then appends the provided chunks, leaving out
// those whose last sample time is before beforeTime. It returns the timestamp
// of the first sample in the oldest chunk _not_ dropped, the offset within the
// series file of the first chunk persisted (out of the provided chunks), the
// number of deleted chunks, and true if all chunks of the series have been
// deleted (in which case the returned timestamp will be 0 and must be ignored).
// It is the caller's responsibility to make sure nothing is persisted or loaded
// for the same fingerprint concurrently.
// of the first sample in the oldest chunk _not_ dropped, the chunk offset
// within the series file of the first chunk persisted (out of the provided
// chunks, or - if no chunks were provided - the chunk offset where chunks would
// have been persisted, i.e. the end of the file), the number of deleted chunks,
// and true if all chunks of the series have been deleted (in which case the
// returned timestamp will be 0 and must be ignored). It is the caller's
// responsibility to make sure nothing is persisted or loaded for the same
// fingerprint concurrently.
//
// Returning an error signals problems with the series file. In this case, the
// caller should quarantine the series.
@ -966,13 +968,16 @@ func (p *persistence) dropAndPersistChunks(
if err != nil {
return
}
totalChunks := int(fi.Size())/chunkLenWithHeader + len(chunks)
chunksInFile := int(fi.Size()) / chunkLenWithHeader
totalChunks := chunksInFile + len(chunks)
if numDropped == 0 || float64(numDropped)/float64(totalChunks) < p.minShrinkRatio {
// Nothing to drop. Just adjust the return values and append the chunks (if any).
numDropped = 0
firstTimeNotDropped = firstTimeInFile
if len(chunks) > 0 {
offset, err = p.persistChunks(fp, chunks)
} else {
offset = chunksInFile
}
return
}

View File

@ -173,14 +173,16 @@ func testPersistLoadDropChunks(t *testing.T, encoding chunk.Encoding) {
}
}
// Try to drop one chunk, which must be prevented by the shrink ratio.
// Try to drop one chunk, which must be prevented by the shrink
// ratio. Since we do not pass in any chunks to persist, the offset
// should be the number of chunks in the file.
for fp, _ := range fpToChunks {
firstTime, offset, numDropped, allDropped, err := p.dropAndPersistChunks(fp, 1, nil)
if err != nil {
t.Fatal(err)
}
if offset != 0 {
t.Errorf("want offset 0, got %d", offset)
if offset != 10 {
t.Errorf("want offset 10, got %d", offset)
}
if firstTime != 0 {
t.Errorf("want first time 0, got %d", firstTime)
@ -422,14 +424,14 @@ func testPersistLoadDropChunks(t *testing.T, encoding chunk.Encoding) {
t.Error("all chunks dropped")
}
}
// Drop only the first two chunks should not happen, either.
// Drop only the first two chunks should not happen, either. Chunks in file is now 9.
for fp := range fpToChunks {
firstTime, offset, numDropped, allDropped, err := p.dropAndPersistChunks(fp, 2, nil)
if err != nil {
t.Fatal(err)
}
if offset != 0 {
t.Errorf("want offset 0, got %d", offset)
if offset != 9 {
t.Errorf("want offset 9, got %d", offset)
}
if firstTime != 0 {
t.Errorf("want first time 0, got %d", firstTime)