mirror of https://github.com/prometheus/prometheus
storage: Fix chunkIndexToStartSeek calculation
With a high enough shrink ratio and enough chunks to persist, the cutoff point could be _outside_ of the file, which wreaks havoc in the storage.pull/2414/head
parent
73bd5e4dff
commit
d771185a43
|
@ -932,8 +932,9 @@ func (p *persistence) dropAndPersistChunks(
|
|||
chunkIndexToStartSeek := 0
|
||||
if p.minShrinkRatio < 1 {
|
||||
chunkIndexToStartSeek = int(math.Floor(float64(totalChunks) * p.minShrinkRatio))
|
||||
} else {
|
||||
chunkIndexToStartSeek = totalChunks - 1
|
||||
}
|
||||
if chunkIndexToStartSeek >= chunksInFile {
|
||||
chunkIndexToStartSeek = chunksInFile - 1
|
||||
}
|
||||
numDropped = chunkIndexToStartSeek
|
||||
|
||||
|
|
|
@ -462,6 +462,81 @@ func testPersistLoadDropChunks(t *testing.T, encoding chunk.Encoding) {
|
|||
t.Error("all chunks dropped")
|
||||
}
|
||||
}
|
||||
// Drop all the chunks again.
|
||||
for fp := range fpToChunks {
|
||||
firstTime, offset, numDropped, allDropped, err := p.dropAndPersistChunks(fp, 100, nil)
|
||||
if firstTime != 0 {
|
||||
t.Errorf("want first time 0, got %d", firstTime)
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if offset != 0 {
|
||||
t.Errorf("want offset 0, got %d", offset)
|
||||
}
|
||||
if numDropped != 7 {
|
||||
t.Errorf("want 7 dropped chunks, got %v", numDropped)
|
||||
}
|
||||
if !allDropped {
|
||||
t.Error("not all chunks dropped")
|
||||
}
|
||||
}
|
||||
// Re-add first two of the chunks again.
|
||||
for fp, chunks := range fpToChunks {
|
||||
firstTimeNotDropped, offset, numDropped, allDropped, err :=
|
||||
p.dropAndPersistChunks(fp, model.Earliest, chunks[:2])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if got, want := firstTimeNotDropped, model.Time(0); got != want {
|
||||
t.Errorf("Want firstTimeNotDropped %v, got %v.", got, want)
|
||||
}
|
||||
if got, want := offset, 0; got != want {
|
||||
t.Errorf("Want offset %v, got %v.", got, want)
|
||||
}
|
||||
if got, want := numDropped, 0; got != want {
|
||||
t.Errorf("Want numDropped %v, got %v.", got, want)
|
||||
}
|
||||
if allDropped {
|
||||
t.Error("All dropped.")
|
||||
}
|
||||
}
|
||||
// Try to drop the first of the chunks while adding eight more. The drop
|
||||
// should not happen because of the shrink ratio. Also, this time the
|
||||
// minimum cut-off point is within the added chunks and not in the file
|
||||
// anymore.
|
||||
for fp, chunks := range fpToChunks {
|
||||
firstTime, offset, numDropped, allDropped, err := p.dropAndPersistChunks(fp, 1, chunks[2:])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if offset != 2 {
|
||||
t.Errorf("want offset 2, got %d", offset)
|
||||
}
|
||||
if firstTime != 0 {
|
||||
t.Errorf("want first time 0, got %d", firstTime)
|
||||
}
|
||||
if numDropped != 0 {
|
||||
t.Errorf("want 0 dropped chunk, got %v", numDropped)
|
||||
}
|
||||
if allDropped {
|
||||
t.Error("all chunks dropped")
|
||||
}
|
||||
wantChunks := chunks
|
||||
indexes := make([]int, len(wantChunks))
|
||||
for i := range indexes {
|
||||
indexes[i] = i
|
||||
}
|
||||
gotChunks, err := p.loadChunks(fp, indexes, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for i, wantChunk := range wantChunks {
|
||||
if !chunksEqual(wantChunk, gotChunks[i]) {
|
||||
t.Errorf("%d. Chunks not equal.", i)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPersistLoadDropChunksType0(t *testing.T) {
|
||||
|
|
Loading…
Reference in New Issue