diff --git a/compact_test.go b/compact_test.go index acbbb5182..fd4cd5b0f 100644 --- a/compact_test.go +++ b/compact_test.go @@ -19,10 +19,13 @@ import ( "os" "path/filepath" "testing" + "time" "github.com/go-kit/kit/log" "github.com/pkg/errors" + dto "github.com/prometheus/client_model/go" "github.com/prometheus/tsdb/chunks" + "github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/testutil" ) @@ -682,3 +685,59 @@ func TestCompaction_populateBlock(t *testing.T) { } } } + +// TestDisableAutoCompactions checks that we can +// disable and enable the auto compaction. +// This is needed for unit tests that rely on +// checking state before and after a compaction. +func TestDisableAutoCompactions(t *testing.T) { + db, close := openTestDB(t, nil) + defer close() + defer db.Close() + + blockRange := DefaultOptions.BlockRanges[0] + label := labels.FromStrings("foo", "bar") + + // Trigger a compaction to check that it was skipped and + // no new blocks were created when compaction is disabled. + db.DisableCompactions() + app := db.Appender() + for i := int64(0); i < 3; i++ { + _, err := app.Add(label, i*blockRange, 0) + testutil.Ok(t, err) + _, err = app.Add(label, i*blockRange+1000, 0) + testutil.Ok(t, err) + } + testutil.Ok(t, app.Commit()) + + select { + case db.compactc <- struct{}{}: + default: + } + + m := &dto.Metric{} + for x := 0; x < 10; x++ { + db.metrics.compactionsSkipped.Write(m) + if *m.Counter.Value > float64(0) { + break + } + time.Sleep(10 * time.Millisecond) + } + + testutil.Assert(t, *m.Counter.Value > float64(0), "No compaction was skipped after the set timeout.") + testutil.Equals(t, 0, len(db.blocks)) + + // Enable the compaction, trigger it and check that the block is persisted. + db.EnableCompactions() + select { + case db.compactc <- struct{}{}: + default: + } + for x := 0; x < 10; x++ { + if len(db.Blocks()) > 0 { + break + } + time.Sleep(30 * time.Millisecond) + } + testutil.Assert(t, len(db.Blocks()) > 0, "No block was persisted after the set timeout.") +} diff --git a/db.go b/db.go index 3f3ae72bb..9b92823ec 100644 --- a/db.go +++ b/db.go @@ -112,9 +112,13 @@ type DB struct { donec chan struct{} stopc chan struct{} - // cmtx is used to control compactions and deletions. - cmtx sync.Mutex - compactionsEnabled bool + // cmtx ensures that compactions and deletions don't run simultaneously. + cmtx sync.Mutex + + // autoCompactMtx ensures that no compaction gets triggered while + // changing the autoCompact var. + autoCompactMtx sync.Mutex + autoCompact bool } type dbMetrics struct { @@ -123,6 +127,7 @@ type dbMetrics struct { reloads prometheus.Counter reloadsFailed prometheus.Counter compactionsTriggered prometheus.Counter + compactionsSkipped prometheus.Counter cutoffs prometheus.Counter cutoffsFailed prometheus.Counter startTime prometheus.GaugeFunc @@ -165,6 +170,10 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { Name: "prometheus_tsdb_compactions_triggered_total", Help: "Total number of triggered compactions for the partition.", }) + m.compactionsSkipped = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_compactions_skipped_total", + Help: "Total number of skipped compactions due to disabled auto compaction.", + }) m.cutoffs = prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_retention_cutoffs_total", Help: "Number of times the database cut off block data from disk.", @@ -226,14 +235,14 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db } db = &DB{ - dir: dir, - logger: l, - opts: opts, - compactc: make(chan struct{}, 1), - donec: make(chan struct{}), - stopc: make(chan struct{}), - compactionsEnabled: true, - chunkPool: chunkenc.NewPool(), + dir: dir, + logger: l, + opts: opts, + compactc: make(chan struct{}, 1), + donec: make(chan struct{}), + stopc: make(chan struct{}), + autoCompact: true, + chunkPool: chunkenc.NewPool(), } db.metrics = newDBMetrics(db, r) @@ -300,14 +309,18 @@ func (db *DB) run() { case <-db.compactc: db.metrics.compactionsTriggered.Inc() - err := db.compact() - if err != nil { - level.Error(db.logger).Log("msg", "compaction failed", "err", err) - backoff = exponential(backoff, 1*time.Second, 1*time.Minute) + db.autoCompactMtx.Lock() + if db.autoCompact { + if err := db.compact(); err != nil { + level.Error(db.logger).Log("msg", "compaction failed", "err", err) + backoff = exponential(backoff, 1*time.Second, 1*time.Minute) + } else { + backoff = 0 + } } else { - backoff = 0 + db.metrics.compactionsSkipped.Inc() } - + db.autoCompactMtx.Unlock() case <-db.stopc: return } @@ -369,11 +382,6 @@ func (a dbAppender) Commit() error { func (db *DB) compact() (err error) { db.cmtx.Lock() defer db.cmtx.Unlock() - - if !db.compactionsEnabled { - return nil - } - // Check whether we have pending head blocks that are ready to be persisted. // They have the highest priority. for { @@ -731,21 +739,21 @@ func (db *DB) Close() error { return merr.Err() } -// DisableCompactions disables compactions. +// DisableCompactions disables auto compactions. func (db *DB) DisableCompactions() { - db.cmtx.Lock() - defer db.cmtx.Unlock() + db.autoCompactMtx.Lock() + defer db.autoCompactMtx.Unlock() - db.compactionsEnabled = false + db.autoCompact = false level.Info(db.logger).Log("msg", "compactions disabled") } -// EnableCompactions enables compactions. +// EnableCompactions enables auto compactions. func (db *DB) EnableCompactions() { - db.cmtx.Lock() - defer db.cmtx.Unlock() + db.autoCompactMtx.Lock() + defer db.autoCompactMtx.Unlock() - db.compactionsEnabled = true + db.autoCompact = true level.Info(db.logger).Log("msg", "compactions enabled") }