Allow manual compaction for tests when compaction is disabled globally. (#412)

for tests we need to control when a compaction happens so with this
change automated compaction can be disabled, but allow to run it
manually it tests.

fixes failing tests in : https://github.com/prometheus/tsdb/pull/374

Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com>
pull/5805/head
Krasi Georgiev 6 years ago committed by GitHub
parent 7f30395115
commit 7f00217d77
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -19,10 +19,13 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"testing" "testing"
"time"
"github.com/go-kit/kit/log" "github.com/go-kit/kit/log"
"github.com/pkg/errors" "github.com/pkg/errors"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil" "github.com/prometheus/tsdb/testutil"
) )
@ -682,3 +685,59 @@ func TestCompaction_populateBlock(t *testing.T) {
} }
} }
} }
// TestDisableAutoCompactions checks that we can
// disable and enable the auto compaction.
// This is needed for unit tests that rely on
// checking state before and after a compaction.
func TestDisableAutoCompactions(t *testing.T) {
db, close := openTestDB(t, nil)
defer close()
defer db.Close()
blockRange := DefaultOptions.BlockRanges[0]
label := labels.FromStrings("foo", "bar")
// Trigger a compaction to check that it was skipped and
// no new blocks were created when compaction is disabled.
db.DisableCompactions()
app := db.Appender()
for i := int64(0); i < 3; i++ {
_, err := app.Add(label, i*blockRange, 0)
testutil.Ok(t, err)
_, err = app.Add(label, i*blockRange+1000, 0)
testutil.Ok(t, err)
}
testutil.Ok(t, app.Commit())
select {
case db.compactc <- struct{}{}:
default:
}
m := &dto.Metric{}
for x := 0; x < 10; x++ {
db.metrics.compactionsSkipped.Write(m)
if *m.Counter.Value > float64(0) {
break
}
time.Sleep(10 * time.Millisecond)
}
testutil.Assert(t, *m.Counter.Value > float64(0), "No compaction was skipped after the set timeout.")
testutil.Equals(t, 0, len(db.blocks))
// Enable the compaction, trigger it and check that the block is persisted.
db.EnableCompactions()
select {
case db.compactc <- struct{}{}:
default:
}
for x := 0; x < 10; x++ {
if len(db.Blocks()) > 0 {
break
}
time.Sleep(30 * time.Millisecond)
}
testutil.Assert(t, len(db.Blocks()) > 0, "No block was persisted after the set timeout.")
}

46
db.go

@ -112,9 +112,13 @@ type DB struct {
donec chan struct{} donec chan struct{}
stopc chan struct{} stopc chan struct{}
// cmtx is used to control compactions and deletions. // cmtx ensures that compactions and deletions don't run simultaneously.
cmtx sync.Mutex cmtx sync.Mutex
compactionsEnabled bool
// autoCompactMtx ensures that no compaction gets triggered while
// changing the autoCompact var.
autoCompactMtx sync.Mutex
autoCompact bool
} }
type dbMetrics struct { type dbMetrics struct {
@ -123,6 +127,7 @@ type dbMetrics struct {
reloads prometheus.Counter reloads prometheus.Counter
reloadsFailed prometheus.Counter reloadsFailed prometheus.Counter
compactionsTriggered prometheus.Counter compactionsTriggered prometheus.Counter
compactionsSkipped prometheus.Counter
cutoffs prometheus.Counter cutoffs prometheus.Counter
cutoffsFailed prometheus.Counter cutoffsFailed prometheus.Counter
startTime prometheus.GaugeFunc startTime prometheus.GaugeFunc
@ -165,6 +170,10 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
Name: "prometheus_tsdb_compactions_triggered_total", Name: "prometheus_tsdb_compactions_triggered_total",
Help: "Total number of triggered compactions for the partition.", Help: "Total number of triggered compactions for the partition.",
}) })
m.compactionsSkipped = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_compactions_skipped_total",
Help: "Total number of skipped compactions due to disabled auto compaction.",
})
m.cutoffs = prometheus.NewCounter(prometheus.CounterOpts{ m.cutoffs = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_retention_cutoffs_total", Name: "prometheus_tsdb_retention_cutoffs_total",
Help: "Number of times the database cut off block data from disk.", Help: "Number of times the database cut off block data from disk.",
@ -232,7 +241,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
compactc: make(chan struct{}, 1), compactc: make(chan struct{}, 1),
donec: make(chan struct{}), donec: make(chan struct{}),
stopc: make(chan struct{}), stopc: make(chan struct{}),
compactionsEnabled: true, autoCompact: true,
chunkPool: chunkenc.NewPool(), chunkPool: chunkenc.NewPool(),
} }
db.metrics = newDBMetrics(db, r) db.metrics = newDBMetrics(db, r)
@ -300,14 +309,18 @@ func (db *DB) run() {
case <-db.compactc: case <-db.compactc:
db.metrics.compactionsTriggered.Inc() db.metrics.compactionsTriggered.Inc()
err := db.compact() db.autoCompactMtx.Lock()
if err != nil { if db.autoCompact {
if err := db.compact(); err != nil {
level.Error(db.logger).Log("msg", "compaction failed", "err", err) level.Error(db.logger).Log("msg", "compaction failed", "err", err)
backoff = exponential(backoff, 1*time.Second, 1*time.Minute) backoff = exponential(backoff, 1*time.Second, 1*time.Minute)
} else { } else {
backoff = 0 backoff = 0
} }
} else {
db.metrics.compactionsSkipped.Inc()
}
db.autoCompactMtx.Unlock()
case <-db.stopc: case <-db.stopc:
return return
} }
@ -369,11 +382,6 @@ func (a dbAppender) Commit() error {
func (db *DB) compact() (err error) { func (db *DB) compact() (err error) {
db.cmtx.Lock() db.cmtx.Lock()
defer db.cmtx.Unlock() defer db.cmtx.Unlock()
if !db.compactionsEnabled {
return nil
}
// Check whether we have pending head blocks that are ready to be persisted. // Check whether we have pending head blocks that are ready to be persisted.
// They have the highest priority. // They have the highest priority.
for { for {
@ -731,21 +739,21 @@ func (db *DB) Close() error {
return merr.Err() return merr.Err()
} }
// DisableCompactions disables compactions. // DisableCompactions disables auto compactions.
func (db *DB) DisableCompactions() { func (db *DB) DisableCompactions() {
db.cmtx.Lock() db.autoCompactMtx.Lock()
defer db.cmtx.Unlock() defer db.autoCompactMtx.Unlock()
db.compactionsEnabled = false db.autoCompact = false
level.Info(db.logger).Log("msg", "compactions disabled") level.Info(db.logger).Log("msg", "compactions disabled")
} }
// EnableCompactions enables compactions. // EnableCompactions enables auto compactions.
func (db *DB) EnableCompactions() { func (db *DB) EnableCompactions() {
db.cmtx.Lock() db.autoCompactMtx.Lock()
defer db.cmtx.Unlock() defer db.autoCompactMtx.Unlock()
db.compactionsEnabled = true db.autoCompact = true
level.Info(db.logger).Log("msg", "compactions enabled") level.Info(db.logger).Log("msg", "compactions enabled")
} }

Loading…
Cancel
Save