Introduced some options for compactor concurrency (#66)

* Tool for CLI compactions.
* Use concurrency when populating symbols for multiple blocks.
* Use concurrency when writing to multiple output blocks.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>
owilliams/utf8-02-mimir
Peter Štibraný 3 years ago committed by GitHub
parent 415354aeb8
commit cc9bc8fe9f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,96 @@
package main
import (
"context"
"flag"
"log"
"os"
"os/signal"
"runtime/pprof"
"syscall"
golog "github.com/go-kit/log"
"github.com/prometheus/prometheus/tsdb"
)
func main() {
var (
outputDir string
shardCount int
cpuProf string
segmentSizeMB int64
maxClosingBlocks int
symbolFlushers int
)
flag.StringVar(&outputDir, "output-dir", ".", "Output directory for new block(s)")
flag.StringVar(&cpuProf, "cpuprofile", "", "Where to store CPU profile (it not empty)")
flag.IntVar(&shardCount, "shard-count", 1, "Number of shards for splitting")
flag.Int64Var(&segmentSizeMB, "segment-file-size", 512, "Size of segment file")
flag.IntVar(&maxClosingBlocks, "max-closing-blocks", 2, "Number of blocks that can close at once during split compaction")
flag.IntVar(&symbolFlushers, "symbol-flushers", 4, "Number of symbol flushers used during split compaction")
flag.Parse()
logger := golog.NewLogfmtLogger(os.Stderr)
var blockDirs []string
var blocks []*tsdb.Block
for _, d := range flag.Args() {
s, err := os.Stat(d)
if err != nil {
panic(err)
}
if !s.IsDir() {
log.Fatalln("not a directory: ", d)
}
blockDirs = append(blockDirs, d)
b, err := tsdb.OpenBlock(logger, d, nil)
if err != nil {
log.Fatalln("failed to open block:", d, err)
}
blocks = append(blocks, b)
defer b.Close()
}
if len(blockDirs) == 0 {
log.Fatalln("no blocks to compact")
}
if cpuProf != "" {
f, err := os.Create(cpuProf)
if err != nil {
log.Fatalln(err)
}
log.Println("writing to", cpuProf)
err = pprof.StartCPUProfile(f)
if err != nil {
log.Fatalln(err)
}
defer pprof.StopCPUProfile()
}
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()
c, err := tsdb.NewLeveledCompactorWithChunkSize(ctx, nil, logger, []int64{0}, nil, segmentSizeMB*1024*1024, nil)
if err != nil {
log.Fatalln("creating compator", err)
}
opts := tsdb.DefaultConcurrencyOptions()
opts.MaxClosingBlocks = maxClosingBlocks
opts.SymbolsFlushersCount = symbolFlushers
c.SetConcurrencyOptions(opts)
_, err = c.CompactWithSplitting(outputDir, blockDirs, blocks, uint64(shardCount))
if err != nil {
log.Fatalln("compacting", err)
}
}

@ -0,0 +1,166 @@
package tsdb
import (
"context"
"fmt"
"github.com/pkg/errors"
"go.uber.org/atomic"
"golang.org/x/sync/semaphore"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
)
// asyncBlockWriter runs a background goroutine that writes series and chunks to the block asynchronously.
type asyncBlockWriter struct {
chunkPool chunkenc.Pool // Where to return chunks after writing.
chunkw ChunkWriter
indexw IndexWriter
closeSemaphore *semaphore.Weighted
seriesChan chan seriesToWrite
finishedCh chan asyncBlockWriterResult
closed bool
result asyncBlockWriterResult
}
type asyncBlockWriterResult struct {
stats BlockStats
err error
}
type seriesToWrite struct {
lbls labels.Labels
chks []chunks.Meta
}
func newAsyncBlockWriter(chunkPool chunkenc.Pool, chunkw ChunkWriter, indexw IndexWriter, closeSema *semaphore.Weighted) *asyncBlockWriter {
bw := &asyncBlockWriter{
chunkPool: chunkPool,
chunkw: chunkw,
indexw: indexw,
seriesChan: make(chan seriesToWrite, 64),
finishedCh: make(chan asyncBlockWriterResult, 1),
closeSemaphore: closeSema,
}
go bw.loop()
return bw
}
// loop doing the writes. Return value is only used by defer statement, and is sent to the channel,
// before closing it.
func (bw *asyncBlockWriter) loop() (res asyncBlockWriterResult) {
defer func() {
bw.finishedCh <- res
close(bw.finishedCh)
}()
stats := BlockStats{}
ref := storage.SeriesRef(0)
for sw := range bw.seriesChan {
if err := bw.chunkw.WriteChunks(sw.chks...); err != nil {
return asyncBlockWriterResult{err: errors.Wrap(err, "write chunks")}
}
if err := bw.indexw.AddSeries(ref, sw.lbls, sw.chks...); err != nil {
return asyncBlockWriterResult{err: errors.Wrap(err, "add series")}
}
stats.NumChunks += uint64(len(sw.chks))
stats.NumSeries++
for _, chk := range sw.chks {
stats.NumSamples += uint64(chk.Chunk.NumSamples())
}
for _, chk := range sw.chks {
if err := bw.chunkPool.Put(chk.Chunk); err != nil {
return asyncBlockWriterResult{err: errors.Wrap(err, "put chunk")}
}
}
ref++
}
err := bw.closeSemaphore.Acquire(context.Background(), 1)
if err != nil {
return asyncBlockWriterResult{err: errors.Wrap(err, "failed to acquire semaphore before closing writers")}
}
defer bw.closeSemaphore.Release(1)
// If everything went fine with writing so far, close writers.
if err := bw.chunkw.Close(); err != nil {
return asyncBlockWriterResult{err: errors.Wrap(err, "closing chunk writer")}
}
if err := bw.indexw.Close(); err != nil {
return asyncBlockWriterResult{err: errors.Wrap(err, "closing index writer")}
}
return asyncBlockWriterResult{stats: stats}
}
func (bw *asyncBlockWriter) addSeries(lbls labels.Labels, chks []chunks.Meta) error {
select {
case bw.seriesChan <- seriesToWrite{lbls: lbls, chks: chks}:
return nil
case result, ok := <-bw.finishedCh:
if ok {
bw.result = result
}
return fmt.Errorf("asyncBlockWriter doesn't run anymore")
}
}
func (bw *asyncBlockWriter) closeAsync() {
if !bw.closed {
bw.closed = true
close(bw.seriesChan)
}
}
func (bw *asyncBlockWriter) waitFinished() (BlockStats, error) {
// Wait for flusher to finish.
result, ok := <-bw.finishedCh
if ok {
bw.result = result
}
return bw.result.stats, bw.result.err
}
type preventDoubleCloseIndexWriter struct {
IndexWriter
closed atomic.Bool
}
func newPreventDoubleCloseIndexWriter(iw IndexWriter) *preventDoubleCloseIndexWriter {
return &preventDoubleCloseIndexWriter{IndexWriter: iw}
}
func (p *preventDoubleCloseIndexWriter) Close() error {
if p.closed.CAS(false, true) {
return p.IndexWriter.Close()
}
return nil
}
type preventDoubleCloseChunkWriter struct {
ChunkWriter
closed atomic.Bool
}
func newPreventDoubleCloseChunkWriter(cw ChunkWriter) *preventDoubleCloseChunkWriter {
return &preventDoubleCloseChunkWriter{ChunkWriter: cw}
}
func (p *preventDoubleCloseChunkWriter) Close() error {
if p.closed.CAS(false, true) {
return p.ChunkWriter.Close()
}
return nil
}

@ -29,6 +29,7 @@ import (
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/sync/semaphore"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
@ -84,6 +85,8 @@ type LeveledCompactor struct {
ctx context.Context
maxBlockChunkSegmentSize int64
mergeFunc storage.VerticalChunkSeriesMergeFunc
concurrencyOpts ConcurrencyOptions
}
type compactorMetrics struct {
@ -172,9 +175,27 @@ func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Register
ctx: ctx,
maxBlockChunkSegmentSize: maxBlockChunkSegmentSize,
mergeFunc: mergeFunc,
concurrencyOpts: DefaultConcurrencyOptions(),
}, nil
}
// ConcurrencyOptions used by LeveledCompactor.
type ConcurrencyOptions struct {
MaxClosingBlocks int // Max number of blocks that can be closed concurrently during split compaction.
SymbolsFlushersCount int // Number of symbols flushers used when doing split compaction.
}
func DefaultConcurrencyOptions() ConcurrencyOptions {
return ConcurrencyOptions{
MaxClosingBlocks: 1,
SymbolsFlushersCount: 1,
}
}
func (c *LeveledCompactor) SetConcurrencyOptions(opts ConcurrencyOptions) {
c.concurrencyOpts = opts
}
type dirMeta struct {
dir string
meta *BlockMeta
@ -646,6 +667,7 @@ func (c *LeveledCompactor) write(dest string, outBlocks []shardedBlock, blocks .
if err != nil {
return errors.Wrap(err, "open chunk writer")
}
chunkw = newPreventDoubleCloseChunkWriter(chunkw) // We now close chunkWriter in populateBlock, but keep it in the closers here as well.
closers = append(closers, chunkw)
@ -661,10 +683,12 @@ func (c *LeveledCompactor) write(dest string, outBlocks []shardedBlock, blocks .
outBlocks[ix].chunkw = chunkw
indexw, err := index.NewWriter(c.ctx, filepath.Join(tmp, indexFilename))
var indexw IndexWriter
indexw, err = index.NewWriter(c.ctx, filepath.Join(tmp, indexFilename))
if err != nil {
return errors.Wrap(err, "open index writer")
}
indexw = newPreventDoubleCloseIndexWriter(indexw) // We now close indexWriter in populateBlock, but keep it in the closers here as well.
closers = append(closers, indexw)
outBlocks[ix].indexw = indexw
@ -904,10 +928,14 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64,
}
}
var (
refs = make([]storage.SeriesRef, len(outBlocks))
chks []chunks.Meta
)
// Semaphore for number of blocks that can be closed at once.
sema := semaphore.NewWeighted(int64(c.concurrencyOpts.MaxClosingBlocks))
blockWriters := make([]*asyncBlockWriter, len(outBlocks))
for ix := range outBlocks {
blockWriters[ix] = newAsyncBlockWriter(c.chunkPool, outBlocks[ix].chunkw, outBlocks[ix].indexw, sema)
defer blockWriters[ix].closeAsync() // Make sure to close writer to stop goroutine.
}
set := sets[0]
if len(sets) > 1 {
@ -926,7 +954,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64,
s := set.At()
chksIter := s.Iterator()
chks = chks[:0]
var chks []chunks.Meta
for chksIter.Next() {
// We are not iterating in streaming way over chunk as it's more efficient to do bulk write for index and
// chunk file purposes.
@ -948,30 +976,28 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64,
obIx = s.Labels().Hash() % uint64(len(outBlocks))
}
if err := outBlocks[obIx].chunkw.WriteChunks(chks...); err != nil {
return errors.Wrap(err, "write chunks")
}
if err := outBlocks[obIx].indexw.AddSeries(refs[obIx], s.Labels(), chks...); err != nil {
return errors.Wrap(err, "add series")
}
outBlocks[obIx].meta.Stats.NumChunks += uint64(len(chks))
outBlocks[obIx].meta.Stats.NumSeries++
for _, chk := range chks {
outBlocks[obIx].meta.Stats.NumSamples += uint64(chk.Chunk.NumSamples())
}
for _, chk := range chks {
if err := c.chunkPool.Put(chk.Chunk); err != nil {
return errors.Wrap(err, "put chunk")
}
err := blockWriters[obIx].addSeries(s.Labels(), chks)
if err != nil {
return errors.Wrap(err, "adding series")
}
refs[obIx]++
}
if set.Err() != nil {
return errors.Wrap(set.Err(), "iterate compaction set")
}
for ix := range blockWriters {
blockWriters[ix].closeAsync()
}
for ix := range blockWriters {
stats, err := blockWriters[ix].waitFinished()
if err != nil {
return errors.Wrap(err, "writing block")
}
outBlocks[ix].meta.Stats = stats
}
return nil
}
@ -986,9 +1012,12 @@ func (c *LeveledCompactor) populateSymbols(sets []storage.ChunkSeriesSet, outBlo
return errors.New("no output block")
}
flushers := newSymbolFlushers(c.concurrencyOpts.SymbolsFlushersCount)
defer flushers.close() // Make sure to stop flushers before exiting to avoid leaking goroutines.
batchers := make([]*symbolsBatcher, len(outBlocks))
for ix := range outBlocks {
batchers[ix] = newSymbolsBatcher(inMemorySymbolsLimit, outBlocks[ix].tmpDir)
batchers[ix] = newSymbolsBatcher(inMemorySymbolsLimit, outBlocks[ix].tmpDir, flushers)
// Always include empty symbol. Blocks created from Head always have it in the symbols table,
// and if we only include symbols from series, we would skip it.
@ -1023,16 +1052,25 @@ func (c *LeveledCompactor) populateSymbols(sets []storage.ChunkSeriesSet, outBlo
}
for ix := range outBlocks {
if err := c.ctx.Err(); err != nil {
return err
}
// Flush the batcher to write remaining symbols.
if err := batchers[ix].flushSymbols(true); err != nil {
return errors.Wrap(err, "flushing batcher")
}
}
err := flushers.close()
if err != nil {
return errors.Wrap(err, "closing flushers")
}
for ix := range outBlocks {
if err := c.ctx.Err(); err != nil {
return err
}
symbolFiles := batchers[ix].getSymbolFiles()
it, err := newSymbolsIterator(batchers[ix].symbolFiles())
it, err := newSymbolsIterator(symbolFiles)
if err != nil {
return errors.Wrap(err, "opening symbols iterator")
}
@ -1064,7 +1102,7 @@ func (c *LeveledCompactor) populateSymbols(sets []storage.ChunkSeriesSet, outBlo
// Delete symbol files from symbolsBatcher. We don't need to perform the cleanup if populateSymbols
// or compaction fails, because in that case compactor already removes entire (temp) output block directory.
for _, fn := range batchers[ix].symbolFiles() {
for _, fn := range symbolFiles {
if err := os.Remove(fn); err != nil {
return errors.Wrap(err, "deleting symbols file")
}

@ -8,12 +8,120 @@ import (
"os"
"path/filepath"
"sort"
"sync"
"github.com/golang/snappy"
"github.com/prometheus/prometheus/tsdb/errors"
)
// symbolFlushers writes symbols to provided files in background goroutines.
type symbolFlushers struct {
jobs chan flusherJob
wg sync.WaitGroup
closed bool
errMu sync.Mutex
err error
pool *sync.Pool
}
func newSymbolFlushers(concurrency int) *symbolFlushers {
f := &symbolFlushers{
jobs: make(chan flusherJob),
pool: &sync.Pool{},
}
for i := 0; i < concurrency; i++ {
f.wg.Add(1)
go f.loop()
}
return f
}
func (f *symbolFlushers) flushSymbols(outputFile string, symbols map[string]struct{}) error {
if len(symbols) == 0 {
return fmt.Errorf("no symbols")
}
f.errMu.Lock()
err := f.err
f.errMu.Unlock()
// If there was any error previously, return it.
if err != nil {
return err
}
f.jobs <- flusherJob{
outputFile: outputFile,
symbols: symbols,
}
return nil
}
func (f *symbolFlushers) loop() {
defer f.wg.Done()
for j := range f.jobs {
var sortedSymbols []string
pooled := f.pool.Get()
if pooled == nil {
sortedSymbols = make([]string, 0, len(j.symbols))
} else {
sortedSymbols = pooled.([]string)
sortedSymbols = sortedSymbols[:0]
}
for s := range j.symbols {
sortedSymbols = append(sortedSymbols, s)
}
sort.Strings(sortedSymbols)
err := writeSymbolsToFile(j.outputFile, sortedSymbols)
sortedSymbols = sortedSymbols[:0]
//nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty.
f.pool.Put(sortedSymbols)
if err != nil {
f.errMu.Lock()
if f.err == nil {
f.err = err
}
f.errMu.Unlock()
break
}
}
for range f.jobs {
// drain the channel, don't do more flushing. only used when error occurs.
}
}
// Stops and waits until all flusher goroutines are finished.
func (f *symbolFlushers) close() error {
if f.closed {
return f.err
}
f.closed = true
close(f.jobs)
f.wg.Wait()
return f.err
}
type flusherJob struct {
outputFile string
symbols map[string]struct{}
}
// symbolsBatcher keeps buffer of symbols in memory. Once the buffer reaches the size limit (number of symbols),
// batcher writes currently buffered symbols to file. At the end remaining symbols must be flushed. After writing
// all batches, symbolsBatcher has list of files that can be used together with newSymbolsIterator to iterate
@ -22,15 +130,18 @@ type symbolsBatcher struct {
dir string
limit int
buffer map[string]struct{} // using map to deduplicate
symbolsFiles []string // paths of symbol files that have been successfully written.
symbolsFiles []string // paths of symbol files, which were sent to flushers for flushing
buffer map[string]struct{} // using map to deduplicate
flushers *symbolFlushers
}
func newSymbolsBatcher(limit int, dir string) *symbolsBatcher {
func newSymbolsBatcher(limit int, dir string, flushers *symbolFlushers) *symbolsBatcher {
return &symbolsBatcher{
limit: limit,
dir: dir,
buffer: make(map[string]struct{}, limit),
limit: limit,
dir: dir,
buffer: make(map[string]struct{}, limit),
flushers: flushers,
}
}
@ -44,23 +155,21 @@ func (sw *symbolsBatcher) flushSymbols(force bool) error {
return nil
}
sortedSymbols := make([]string, 0, len(sw.buffer))
for s := range sw.buffer {
sortedSymbols = append(sortedSymbols, s)
if len(sw.buffer) == 0 {
return nil
}
sort.Strings(sortedSymbols)
symbolsFile := filepath.Join(sw.dir, fmt.Sprintf("symbols_%d", len(sw.symbolsFiles)))
err := writeSymbolsToFile(symbolsFile, sortedSymbols)
if err == nil {
sw.buffer = make(map[string]struct{}, sw.limit)
sw.symbolsFiles = append(sw.symbolsFiles, symbolsFile)
}
sw.symbolsFiles = append(sw.symbolsFiles, symbolsFile)
return err
buf := sw.buffer
sw.buffer = make(map[string]struct{}, sw.limit)
return sw.flushers.flushSymbols(symbolsFile, buf)
}
func (sw *symbolsBatcher) symbolFiles() []string {
// getSymbolFiles returns list of symbol files used to flush symbols to. These files are only valid if flushers
// finish successfully.
func (sw *symbolsBatcher) getSymbolFiles() []string {
return sw.symbolsFiles
}

@ -8,14 +8,25 @@ import (
"github.com/stretchr/testify/require"
)
func TestSymbolsBatchAndIteration(t *testing.T) {
func TestSymbolsBatchAndIteration1(t *testing.T) {
testSymbolsBatchAndIterationWithFlushersConcurrency(t, 1)
}
func TestSymbolsBatchAndIteration5(t *testing.T) {
testSymbolsBatchAndIterationWithFlushersConcurrency(t, 5)
}
func testSymbolsBatchAndIterationWithFlushersConcurrency(t *testing.T, flushersConcurrency int) {
flushers := newSymbolFlushers(flushersConcurrency)
defer func() { _ = flushers.close() }()
dir := t.TempDir()
b := newSymbolsBatcher(100, dir)
b := newSymbolsBatcher(100, dir, flushers)
allWords := map[string]struct{}{}
for i := 0; i < 10; i++ {
for i := 0; i < 10*flushersConcurrency; i++ {
require.NoError(t, b.addSymbol(""))
allWords[""] = struct{}{}
@ -29,8 +40,12 @@ func TestSymbolsBatchAndIteration(t *testing.T) {
}
require.NoError(t, b.flushSymbols(true))
require.NoError(t, b.flushSymbols(true)) // call again, this should do nothing, and not create new empty file.
require.NoError(t, flushers.close())
symbols := b.getSymbolFiles()
it, err := newSymbolsIterator(b.symbolFiles())
it, err := newSymbolsIterator(symbols)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, it.Close())

Loading…
Cancel
Save