Browse Source

do a proper cleanup for a failed reload after a compaction

a failed reload immediately after a compaction should delete the
resulting block to avoid creating blocks with the same time range.

Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com>
pull/5805/head
Krasi Georgiev 6 years ago
parent
commit
b521559c3b
  1. 76
      compact_test.go
  2. 11
      db.go

76
compact_test.go

@ -17,12 +17,14 @@ import (
"io/ioutil" "io/ioutil"
"math" "math"
"os" "os"
"path"
"path/filepath" "path/filepath"
"testing" "testing"
"time" "time"
"github.com/go-kit/kit/log" "github.com/go-kit/kit/log"
"github.com/pkg/errors" "github.com/pkg/errors"
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
dto "github.com/prometheus/client_model/go" dto "github.com/prometheus/client_model/go"
"github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/labels"
@ -741,3 +743,77 @@ func TestDisableAutoCompactions(t *testing.T) {
} }
testutil.Assert(t, len(db.Blocks()) > 0, "No block was persisted after the set timeout.") testutil.Assert(t, len(db.Blocks()) > 0, "No block was persisted after the set timeout.")
} }
// TestDeleteCompactionBlockAfterFailedReload ensures that a failed reload imidiately after a compaction
// deletes the resulting block to avoid creatings blocks with the same time range.
func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) {
tests := map[string]func(*DB) int{
"Test Head Compaction": func(db *DB) int {
rangeToTriggerCompaction := db.opts.BlockRanges[0]/2*3 - 1
defaultLabel := labels.FromStrings("foo", "bar")
// Add some data to the head that is enough to trigger a compaction.
app := db.Appender()
_, err := app.Add(defaultLabel, 1, 0)
testutil.Ok(t, err)
_, err = app.Add(defaultLabel, 2, 0)
testutil.Ok(t, err)
_, err = app.Add(defaultLabel, 3+rangeToTriggerCompaction, 0)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
return 1
},
"Test Block Compaction": func(db *DB) int {
expBlocks := []*BlockMeta{
{MinTime: 0, MaxTime: 100},
{MinTime: 100, MaxTime: 150},
{MinTime: 150, MaxTime: 200},
}
for _, m := range expBlocks {
createBlock(t, db.Dir(), genSeries(1, 1, m.MinTime, m.MaxTime))
}
testutil.Ok(t, db.reload())
testutil.Equals(t, len(expBlocks), len(db.Blocks()), "unexpected block count after a reload")
return len(expBlocks) + 1
},
}
for title, bootStrap := range tests {
t.Run(title, func(t *testing.T) {
db, close := openTestDB(t, &Options{
BlockRanges: []int64{1, 100},
})
defer close()
defer db.Close()
db.DisableCompactions()
expBlocks := bootStrap(db)
// Create a block that will trigger the reloard to fail.
blockPath := createBlock(t, db.Dir(), genSeries(1, 1, 200, 300))
lastBlockIndex := path.Join(blockPath, indexFilename)
actBlocks, err := blockDirs(db.Dir())
testutil.Ok(t, err)
testutil.Equals(t, expBlocks, len(actBlocks))
testutil.Ok(t, os.RemoveAll(lastBlockIndex)) // Corrupt the block by removing the index file.
testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.metrics.reloadsFailed), "initial 'failed db reload' count metrics mismatch")
testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial `compactions` count metric mismatch")
// Do the compaction and check the metrics.
// Since the most recent block is not included in the compaction,
// the compaction should succeed, but the reload should fail and
// the new block created from the compaction should be deleted.
db.EnableCompactions()
testutil.NotOk(t, db.compact())
testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.metrics.reloadsFailed), "'failed db reload' count metrics mismatch")
testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "`compaction` count metric mismatch")
actBlocks, err = blockDirs(db.Dir())
testutil.Ok(t, err)
testutil.Equals(t, expBlocks, len(actBlocks))
})
}
}

11
db.go

@ -425,6 +425,9 @@ func (db *DB) compact() (err error) {
runtime.GC() runtime.GC()
if err := db.reload(); err != nil { if err := db.reload(); err != nil {
if err := os.RemoveAll(filepath.Join(db.dir, uid.String())); err != nil {
return errors.Wrapf(err, "delete persisted head block after unsuccessful db reload:%s", uid)
}
return errors.Wrap(err, "reload blocks") return errors.Wrap(err, "reload blocks")
} }
if (uid == ulid.ULID{}) { if (uid == ulid.ULID{}) {
@ -454,12 +457,16 @@ func (db *DB) compact() (err error) {
default: default:
} }
if _, err := db.compactor.Compact(db.dir, plan, db.blocks); err != nil { uid, err := db.compactor.Compact(db.dir, plan, db.blocks)
if err != nil {
return errors.Wrapf(err, "compact %s", plan) return errors.Wrapf(err, "compact %s", plan)
} }
runtime.GC() runtime.GC()
if err := db.reload(); err != nil { if err := db.reload(); err != nil {
if err := os.RemoveAll(filepath.Join(db.dir, uid.String())); err != nil {
return errors.Wrapf(err, "delete compacted block after unsuccessful db reload:%s", uid)
}
return errors.Wrap(err, "reload blocks") return errors.Wrap(err, "reload blocks")
} }
runtime.GC() runtime.GC()
@ -505,7 +512,7 @@ func (db *DB) reload() (err error) {
} }
} }
if len(corrupted) > 0 { if len(corrupted) > 0 {
return errors.Wrap(err, "unexpected corrupted block") return fmt.Errorf("unexpected corrupted block:%v", corrupted)
} }
// All deletable blocks should not be loaded. // All deletable blocks should not be loaded.

Loading…
Cancel
Save