deleting *.tmp WAL files on startup (#10317)

* fix issue #10245

Signed-off-by: lihaowei <haoweili35@gmail.com>

* minor changes

Signed-off-by: lihaowei <haoweili35@gmail.com>

* review changes

Signed-off-by: lihaowei <haoweili35@gmail.com>

* minor changes

Signed-off-by: lihaowei <haoweili35@gmail.com>
pull/10487/head
Howie 2022-03-24 18:44:14 +08:00 committed by GitHub
parent 20dbb128d9
commit 1291ec7185
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 19 additions and 6 deletions

View File

@ -639,9 +639,11 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
if err := MigrateWAL(l, walDir); err != nil {
return nil, errors.Wrap(err, "migrate WAL")
}
// Remove garbage, tmp blocks.
if err := removeBestEffortTmpDirs(l, dir); err != nil {
return nil, errors.Wrap(err, "remove tmp dirs")
for _, tmpDir := range []string{walDir, dir} {
// Remove tmp dirs.
if err := removeBestEffortTmpDirs(l, tmpDir); err != nil {
return nil, errors.Wrap(err, "remove tmp dirs")
}
}
db := &DB{
@ -760,11 +762,14 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
func removeBestEffortTmpDirs(l log.Logger, dir string) error {
files, err := ioutil.ReadDir(dir)
if os.IsNotExist(err) {
return nil
}
if err != nil {
return err
}
for _, fi := range files {
if isTmpBlockDir(fi) {
if isTmpDir(fi) {
if err := os.RemoveAll(filepath.Join(dir, fi.Name())); err != nil {
level.Error(l).Log("msg", "failed to delete tmp block dir", "dir", filepath.Join(dir, fi.Name()), "err", err)
continue
@ -1720,8 +1725,8 @@ func isBlockDir(fi os.FileInfo) bool {
return err == nil
}
// isTmpBlockDir returns dir that consists of block dir ULID and tmp extension.
func isTmpBlockDir(fi os.FileInfo) bool {
// isTmpDir returns true if the given file-info contains a block ULID or checkpoint prefix and a tmp extension.
func isTmpDir(fi os.FileInfo) bool {
if !fi.IsDir() {
return false
}
@ -1729,6 +1734,9 @@ func isTmpBlockDir(fi os.FileInfo) bool {
fn := fi.Name()
ext := filepath.Ext(fn)
if ext == tmpForDeletionBlockDirSuffix || ext == tmpForCreationBlockDirSuffix || ext == tmpLegacy {
if strings.HasPrefix(fn, "checkpoint.") {
return true
}
if _, err := ulid.ParseStrict(fn[:len(fn)-len(ext)]); err == nil {
return true
}

View File

@ -2974,6 +2974,9 @@ func TestOpen_VariousBlockStates(t *testing.T) {
_, err = writeMetaFile(log.NewLogfmtLogger(os.Stderr), dir, m)
require.NoError(t, err)
}
tmpCheckpointDir := path.Join(tmpDir, "wal/checkpoint.00000001.tmp")
err := os.MkdirAll(tmpCheckpointDir, 0o777)
require.NoError(t, err)
opts := DefaultOptions()
opts.RetentionDuration = 0
@ -3005,6 +3008,8 @@ func TestOpen_VariousBlockStates(t *testing.T) {
}
}
require.Equal(t, len(expectedIgnoredDirs), ignored)
_, err = os.Stat(tmpCheckpointDir)
require.True(t, os.IsNotExist(err))
}
func TestOneCheckpointPerCompactCall(t *testing.T) {