Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com>
pull/5805/head
Krasi Georgiev 6 years ago
parent bd5ccee5c1
commit fced260a24

@ -76,12 +76,13 @@ type LeveledCompactor struct {
}
type compactorMetrics struct {
ran prometheus.Counter
failed prometheus.Counter
duration prometheus.Histogram
chunkSize prometheus.Histogram
chunkSamples prometheus.Histogram
chunkRange prometheus.Histogram
ran prometheus.Counter
populatingBlocks prometheus.Counter
failed prometheus.Counter
duration prometheus.Histogram
chunkSize prometheus.Histogram
chunkSamples prometheus.Histogram
chunkRange prometheus.Histogram
}
func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
@ -91,6 +92,10 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
Name: "prometheus_tsdb_compactions_total",
Help: "Total number of compactions that were executed for the partition.",
})
m.populatingBlocks = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "prometheus_tsdb_compaction_populating_block",
Help: "Set to 1 when a block is currently being written to the disk.",
})
m.failed = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_compactions_failed_total",
Help: "Total number of compactions that failed for the partition.",
@ -119,6 +124,7 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
if r != nil {
r.MustRegister(
m.ran,
m.populatingBlocks,
m.failed,
m.duration,
m.chunkRange,
@ -569,8 +575,12 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
allSymbols = make(map[string]struct{}, 1<<16)
closers = []io.Closer{}
)
defer func() { closeAll(closers...) }()
defer func() {
closeAll(closers...)
c.metrics.populatingBlocks.Add(-1)
}()
c.metrics.populatingBlocks.Inc()
for i, b := range blocks {
select {
case <-c.ctx.Done():

@ -24,6 +24,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/pkg/errors"
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/labels"
@ -742,3 +743,38 @@ func TestDisableAutoCompactions(t *testing.T) {
}
testutil.Assert(t, len(db.Blocks()) > 0, "No block was persisted after the set timeout.")
}
// TestCancelCompactions ensures that when the db is closed
// any running compaction is cancelled to unblock closing the db.
func TestCancelCompactions(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "test")
testutil.Ok(t, err)
defer os.RemoveAll(tmpdir)
// Create some blocks to fall within the compaction range.
createPopulatedBlock(t, tmpdir, 3000, 0, 1000)
createPopulatedBlock(t, tmpdir, 3000, 1000, 2000)
createPopulatedBlock(t, tmpdir, 1, 2000, 2001)
db, err := Open(tmpdir, log.NewNopLogger(), nil, &Options{BlockRanges: []int64{1, 2000}})
testutil.Ok(t, err)
db.compactc <- struct{}{} // Trigger a compaction.
dbClosed := make(chan struct{})
for {
if prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.populatingBlocks) > 0 {
time.Sleep(5 * time.Millisecond)
go func() {
testutil.Ok(t, db.Close())
close(dbClosed)
}()
break
}
}
start := time.Now()
<-dbClosed
actT := time.Since(start)
expT := time.Duration(50000000)
testutil.Assert(t, actT < expT, "closing the db took more than expected. exp: <%v, act: %v", expT, actT)
}

Loading…
Cancel
Save