diff --git a/compact.go b/compact.go index 37fa3ae00..556856107 100644 --- a/compact.go +++ b/compact.go @@ -76,12 +76,13 @@ type LeveledCompactor struct { } type compactorMetrics struct { - ran prometheus.Counter - failed prometheus.Counter - duration prometheus.Histogram - chunkSize prometheus.Histogram - chunkSamples prometheus.Histogram - chunkRange prometheus.Histogram + ran prometheus.Counter + populatingBlocks prometheus.Counter + failed prometheus.Counter + duration prometheus.Histogram + chunkSize prometheus.Histogram + chunkSamples prometheus.Histogram + chunkRange prometheus.Histogram } func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { @@ -91,6 +92,10 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { Name: "prometheus_tsdb_compactions_total", Help: "Total number of compactions that were executed for the partition.", }) + m.populatingBlocks = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "prometheus_tsdb_compaction_populating_block", + Help: "Set to 1 when a block is currently being written to the disk.", + }) m.failed = prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_compactions_failed_total", Help: "Total number of compactions that failed for the partition.", @@ -119,6 +124,7 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { if r != nil { r.MustRegister( m.ran, + m.populatingBlocks, m.failed, m.duration, m.chunkRange, @@ -569,8 +575,12 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, allSymbols = make(map[string]struct{}, 1<<16) closers = []io.Closer{} ) - defer func() { closeAll(closers...) }() + defer func() { + closeAll(closers...) + c.metrics.populatingBlocks.Add(-1) + }() + c.metrics.populatingBlocks.Inc() for i, b := range blocks { select { case <-c.ctx.Done(): diff --git a/compact_test.go b/compact_test.go index 89c36767f..36bcb220c 100644 --- a/compact_test.go +++ b/compact_test.go @@ -24,6 +24,7 @@ import ( "github.com/go-kit/kit/log" "github.com/pkg/errors" + prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" dto "github.com/prometheus/client_model/go" "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/labels" @@ -742,3 +743,38 @@ func TestDisableAutoCompactions(t *testing.T) { } testutil.Assert(t, len(db.Blocks()) > 0, "No block was persisted after the set timeout.") } + +// TestCancelCompactions ensures that when the db is closed +// any running compaction is cancelled to unblock closing the db. +func TestCancelCompactions(t *testing.T) { + tmpdir, err := ioutil.TempDir("", "test") + testutil.Ok(t, err) + defer os.RemoveAll(tmpdir) + + // Create some blocks to fall within the compaction range. + createPopulatedBlock(t, tmpdir, 3000, 0, 1000) + createPopulatedBlock(t, tmpdir, 3000, 1000, 2000) + createPopulatedBlock(t, tmpdir, 1, 2000, 2001) + + db, err := Open(tmpdir, log.NewNopLogger(), nil, &Options{BlockRanges: []int64{1, 2000}}) + testutil.Ok(t, err) + + db.compactc <- struct{}{} // Trigger a compaction. + dbClosed := make(chan struct{}) + for { + if prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.populatingBlocks) > 0 { + time.Sleep(5 * time.Millisecond) + go func() { + testutil.Ok(t, db.Close()) + close(dbClosed) + }() + break + } + } + + start := time.Now() + <-dbClosed + actT := time.Since(start) + expT := time.Duration(50000000) + testutil.Assert(t, actT < expT, "closing the db took more than expected. exp: <%v, act: %v", expT, actT) +}