Merge pull request #1304 from prometheus/beorn7/storage

Improve handling of series file truncation
pull/1306/head
Björn Rabenstein 2016-01-11 17:27:08 +01:00
commit 6293f3a374
5 changed files with 128 additions and 14 deletions

View File

@ -128,6 +128,10 @@ func init() {
&cfg.storage.SyncStrategy, "storage.local.series-sync-strategy",
"When to sync series files after modification. Possible values: 'never', 'always', 'adaptive'. Sync'ing slows down storage performance but reduces the risk of data loss in case of an OS crash. With the 'adaptive' strategy, series files are sync'd for as long as the storage is not too much behind on chunk persistence.",
)
cfg.fs.Float64Var(
&cfg.storage.MinShrinkRatio, "storage.local.series-file-shrink-ratio", 0.1,
"A series file is only truncated (to delete samples that have exceeded the retention period) if it shrinks by at least the provided ratio. This saves I/O operations while causing only a limited storage space overhead. If 0 or smaller, truncation will be performed even for a single dropped chunk, while 1 or larger will effectively prevent any truncation.",
)
cfg.fs.BoolVar(
&cfg.storage.Dirty, "storage.local.dirty", false,
"If set, the local storage layer will perform crash recovery even if the last shutdown appears to be clean.",

View File

@ -129,11 +129,18 @@ type persistence struct {
shouldSync syncStrategy
minShrinkRatio float64 // How much a series file has to shrink to justify dropping chunks.
bufPool sync.Pool
}
// newPersistence returns a newly allocated persistence backed by local disk storage, ready to use.
func newPersistence(basePath string, dirty, pedanticChecks bool, shouldSync syncStrategy) (*persistence, error) {
func newPersistence(
basePath string,
dirty, pedanticChecks bool,
shouldSync syncStrategy,
minShrinkRatio float64,
) (*persistence, error) {
dirtyPath := filepath.Join(basePath, dirtyFileName)
versionPath := filepath.Join(basePath, versionFileName)
@ -938,13 +945,14 @@ func (p *persistence) dropAndPersistChunks(
}
defer f.Close()
headerBuf := make([]byte, chunkHeaderLen)
var firstTimeInFile model.Time
// Find the first chunk in the file that should be kept.
for ; ; numDropped++ {
_, err = f.Seek(offsetForChunkIndex(numDropped), os.SEEK_SET)
if err != nil {
return
}
headerBuf := make([]byte, chunkHeaderLen)
_, err = io.ReadFull(f, headerBuf)
if err == io.EOF {
// We ran into the end of the file without finding any chunks that should
@ -962,29 +970,44 @@ func (p *persistence) dropAndPersistChunks(
if err != nil {
return
}
if numDropped == 0 {
firstTimeInFile = model.Time(
binary.LittleEndian.Uint64(headerBuf[chunkHeaderFirstTimeOffset:]),
)
}
lastTime := model.Time(
binary.LittleEndian.Uint64(headerBuf[chunkHeaderLastTimeOffset:]),
)
if !lastTime.Before(beforeTime) {
firstTimeNotDropped = model.Time(
binary.LittleEndian.Uint64(headerBuf[chunkHeaderFirstTimeOffset:]),
)
chunkOps.WithLabelValues(drop).Add(float64(numDropped))
break
}
}
// We've found the first chunk that should be kept. If it is the first
// one, just append the chunks.
if numDropped == 0 {
// We've found the first chunk that should be kept.
// First check if the shrink ratio is good enough to perform the the
// actual drop or leave it for next time if it is not worth the effort.
fi, err := f.Stat()
if err != nil {
return
}
totalChunks := int(fi.Size())/chunkLenWithHeader + len(chunks)
if numDropped == 0 || float64(numDropped)/float64(totalChunks) < p.minShrinkRatio {
// Nothing to drop. Just adjust the return values and append the chunks (if any).
numDropped = 0
firstTimeNotDropped = firstTimeInFile
if len(chunks) > 0 {
offset, err = p.persistChunks(fp, chunks)
}
return
}
// Otherwise, seek backwards to the beginning of its header and start
// copying everything from there into a new file. Then append the chunks
// to the new file.
// If we are here, we have to drop some chunks for real. So we need to
// record firstTimeNotDropped from the last read header, seek backwards
// to the beginning of its header, and start copying everything from
// there into a new file. Then append the chunks to the new file.
firstTimeNotDropped = model.Time(
binary.LittleEndian.Uint64(headerBuf[chunkHeaderFirstTimeOffset:]),
)
chunkOps.WithLabelValues(drop).Add(float64(numDropped))
_, err = f.Seek(-chunkHeaderLen, os.SEEK_CUR)
if err != nil {
return

View File

@ -37,7 +37,7 @@ var (
func newTestPersistence(t *testing.T, encoding chunkEncoding) (*persistence, testutil.Closer) {
DefaultChunkEncoding = encoding
dir := testutil.NewTemporaryDirectory("test_persistence", t)
p, err := newPersistence(dir.Path(), false, false, func() bool { return false })
p, err := newPersistence(dir.Path(), false, false, func() bool { return false }, 0.1)
if err != nil {
dir.Close()
t.Fatal(err)
@ -338,6 +338,85 @@ func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) {
t.Error("not all chunks dropped")
}
}
// Now set minShrinkRatio to 0.25 and play with it.
p.minShrinkRatio = 0.25
// Re-add 8 chunks.
for fp, chunks := range fpToChunks {
firstTimeNotDropped, offset, numDropped, allDropped, err :=
p.dropAndPersistChunks(fp, model.Earliest, chunks[:8])
if err != nil {
t.Fatal(err)
}
if got, want := firstTimeNotDropped, model.Time(0); got != want {
t.Errorf("Want firstTimeNotDropped %v, got %v.", got, want)
}
if got, want := offset, 0; got != want {
t.Errorf("Want offset %v, got %v.", got, want)
}
if got, want := numDropped, 0; got != want {
t.Errorf("Want numDropped %v, got %v.", got, want)
}
if allDropped {
t.Error("All dropped.")
}
}
// Drop only the first chunk should not happen, but persistence should still work.
for fp, chunks := range fpToChunks {
firstTime, offset, numDropped, allDropped, err := p.dropAndPersistChunks(fp, 1, chunks[8:9])
if err != nil {
t.Fatal(err)
}
if offset != 8 {
t.Errorf("want offset 8, got %d", offset)
}
if firstTime != 0 {
t.Errorf("want first time 0, got %d", firstTime)
}
if numDropped != 0 {
t.Errorf("want 0 dropped chunk, got %v", numDropped)
}
if allDropped {
t.Error("all chunks dropped")
}
}
// Drop only the first two chunks should not happen, either.
for fp := range fpToChunks {
firstTime, offset, numDropped, allDropped, err := p.dropAndPersistChunks(fp, 2, nil)
if err != nil {
t.Fatal(err)
}
if offset != 0 {
t.Errorf("want offset 0, got %d", offset)
}
if firstTime != 0 {
t.Errorf("want first time 0, got %d", firstTime)
}
if numDropped != 0 {
t.Errorf("want 0 dropped chunk, got %v", numDropped)
}
if allDropped {
t.Error("all chunks dropped")
}
}
// Drop the first three chunks should finally work.
for fp, chunks := range fpToChunks {
firstTime, offset, numDropped, allDropped, err := p.dropAndPersistChunks(fp, 3, chunks[9:])
if err != nil {
t.Fatal(err)
}
if offset != 6 {
t.Errorf("want offset 6, got %d", offset)
}
if firstTime != 3 {
t.Errorf("want first time 3, got %d", firstTime)
}
if numDropped != 3 {
t.Errorf("want 3 dropped chunk, got %v", numDropped)
}
if allDropped {
t.Error("all chunks dropped")
}
}
}
func TestPersistLoadDropChunksType0(t *testing.T) {

View File

@ -152,6 +152,7 @@ type MemorySeriesStorageOptions struct {
Dirty bool // Force the storage to consider itself dirty on startup.
PedanticChecks bool // If dirty, perform crash-recovery checks on each series file.
SyncStrategy SyncStrategy // Which sync strategy to apply to series files.
MinShrinkRatio float64 // Minimum ratio a series file has to shrink during truncation.
}
// NewMemorySeriesStorage returns a newly allocated Storage. Storage.Serve still
@ -243,7 +244,12 @@ func (s *memorySeriesStorage) Start() (err error) {
}
var p *persistence
p, err = newPersistence(s.options.PersistenceStoragePath, s.options.Dirty, s.options.PedanticChecks, syncStrategy)
p, err = newPersistence(
s.options.PersistenceStoragePath,
s.options.Dirty, s.options.PedanticChecks,
syncStrategy,
s.options.MinShrinkRatio,
)
if err != nil {
return err
}

View File

@ -563,6 +563,7 @@ func TestLoop(t *testing.T) {
PersistenceStoragePath: directory.Path(),
CheckpointInterval: 250 * time.Millisecond,
SyncStrategy: Adaptive,
MinShrinkRatio: 0.1,
}
storage := NewMemorySeriesStorage(o)
if err := storage.Start(); err != nil {
@ -1320,6 +1321,7 @@ func benchmarkFuzz(b *testing.B, encoding chunkEncoding) {
PersistenceStoragePath: directory.Path(),
CheckpointInterval: time.Second,
SyncStrategy: Adaptive,
MinShrinkRatio: 0.1,
}
s := NewMemorySeriesStorage(o)
if err := s.Start(); err != nil {