Browse Source

Merge branch 'beorn7/storage' into beorn7/storage3

pull/2414/head
beorn7 8 years ago
parent
commit
73bd5e4dff
  1. 19
      storage/local/persistence.go
  2. 14
      storage/local/persistence_test.go

19
storage/local/persistence.go

@ -846,12 +846,14 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in
// dropAndPersistChunks deletes all chunks from a series file whose last sample
// time is before beforeTime, and then appends the provided chunks, leaving out
// those whose last sample time is before beforeTime. It returns the timestamp
// of the first sample in the oldest chunk _not_ dropped, the offset within the
// series file of the first chunk persisted (out of the provided chunks), the
// number of deleted chunks, and true if all chunks of the series have been
// deleted (in which case the returned timestamp will be 0 and must be ignored).
// It is the caller's responsibility to make sure nothing is persisted or loaded
// for the same fingerprint concurrently.
// of the first sample in the oldest chunk _not_ dropped, the chunk offset
// within the series file of the first chunk persisted (out of the provided
// chunks, or - if no chunks were provided - the chunk offset where chunks would
// have been persisted, i.e. the end of the file), the number of deleted chunks,
// and true if all chunks of the series have been deleted (in which case the
// returned timestamp will be 0 and must be ignored). It is the caller's
// responsibility to make sure nothing is persisted or loaded for the same
// fingerprint concurrently.
//
// Returning an error signals problems with the series file. In this case, the
// caller should quarantine the series.
@ -923,7 +925,8 @@ func (p *persistence) dropAndPersistChunks(
if err != nil {
return
}
totalChunks := int(fi.Size())/chunkLenWithHeader + len(chunks)
chunksInFile := int(fi.Size()) / chunkLenWithHeader
totalChunks := chunksInFile + len(chunks)
// Calculate chunk index from minShrinkRatio, to skip unnecessary chunk header reading.
chunkIndexToStartSeek := 0
@ -986,6 +989,8 @@ func (p *persistence) dropAndPersistChunks(
)
if len(chunks) > 0 {
offset, err = p.persistChunks(fp, chunks)
} else {
offset = chunksInFile
}
return
}

14
storage/local/persistence_test.go

@ -173,14 +173,16 @@ func testPersistLoadDropChunks(t *testing.T, encoding chunk.Encoding) {
}
}
// Try to drop one chunk, which must be prevented by the shrink ratio.
// Try to drop one chunk, which must be prevented by the shrink
// ratio. Since we do not pass in any chunks to persist, the offset
// should be the number of chunks in the file.
for fp, _ := range fpToChunks {
firstTime, offset, numDropped, allDropped, err := p.dropAndPersistChunks(fp, 1, nil)
if err != nil {
t.Fatal(err)
}
if offset != 0 {
t.Errorf("want offset 0, got %d", offset)
if offset != 10 {
t.Errorf("want offset 10, got %d", offset)
}
if firstTime != 0 {
t.Errorf("want first time 0, got %d", firstTime)
@ -422,14 +424,14 @@ func testPersistLoadDropChunks(t *testing.T, encoding chunk.Encoding) {
t.Error("all chunks dropped")
}
}
// Drop only the first two chunks should not happen, either.
// Drop only the first two chunks should not happen, either. Chunks in file is now 9.
for fp := range fpToChunks {
firstTime, offset, numDropped, allDropped, err := p.dropAndPersistChunks(fp, 2, nil)
if err != nil {
t.Fatal(err)
}
if offset != 0 {
t.Errorf("want offset 0, got %d", offset)
if offset != 9 {
t.Errorf("want offset 9, got %d", offset)
}
if firstTime != 0 {
t.Errorf("want first time 0, got %d", firstTime)

Loading…
Cancel
Save