From 96c2bd249f1af0937ff65471fd76d7b4cf1970b7 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 6 Jan 2017 12:37:28 +0100 Subject: [PATCH] Handle compaction trigger and reinitializing in DB --- compact.go | 138 +++++++---------------------------- db.go | 115 +++++++++++++++++++++++++----- head.go | 3 + querier.go | 206 ++++++++++++++++++++++++++--------------------------- 4 files changed, 231 insertions(+), 231 deletions(-) diff --git a/compact.go b/compact.go index 4f3838c02..8db87bcf0 100644 --- a/compact.go +++ b/compact.go @@ -1,15 +1,12 @@ package tsdb import ( - "fmt" "os" "path/filepath" - "sync" "time" "github.com/coreos/etcd/pkg/fileutil" "github.com/fabxc/tsdb/labels" - "github.com/go-kit/kit/log" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" ) @@ -17,26 +14,17 @@ import ( type compactor struct { metrics *compactorMetrics blocks compactableBlocks - logger log.Logger - - triggerc chan struct{} - donec chan struct{} } type compactorMetrics struct { - triggered prometheus.Counter - ran prometheus.Counter - failed prometheus.Counter - duration prometheus.Histogram + ran prometheus.Counter + failed prometheus.Counter + duration prometheus.Histogram } func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { m := &compactorMetrics{} - m.triggered = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "tsdb_compactions_triggered_total", - Help: "Total number of triggered compactions for the partition.", - }) m.ran = prometheus.NewCounter(prometheus.CounterOpts{ Name: "tsdb_compactions_total", Help: "Total number of compactions that were executed for the partition.", @@ -52,7 +40,6 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { if r != nil { r.MustRegister( - m.triggered, m.ran, m.failed, m.duration, @@ -62,71 +49,18 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { } type compactableBlocks interface { - lock() sync.Locker compactable() []block - reinit(dir string) error } -func newCompactor(blocks compactableBlocks, l log.Logger) (*compactor, error) { +func newCompactor(blocks compactableBlocks) (*compactor, error) { c := &compactor{ - triggerc: make(chan struct{}, 1), - donec: make(chan struct{}), - logger: l, - blocks: blocks, - metrics: newCompactorMetrics(nil), + blocks: blocks, + metrics: newCompactorMetrics(nil), } - go c.run() return c, nil } -func (c *compactor) trigger() { - select { - case c.triggerc <- struct{}{}: - default: - } -} - -func (c *compactor) run() { - for range c.triggerc { - c.metrics.triggered.Inc() - - // Compact as long as there are candidate blocks. - for { - rev := c.pick() - var bs []block - for _, b := range rev { - bs = append([]block{b}, bs...) - } - - c.logger.Log("msg", "picked for compaction", "candidates", fmt.Sprintf("%v", bs)) - - if len(bs) == 0 { - break - } - - start := time.Now() - err := c.compact(bs...) - - c.metrics.ran.Inc() - c.metrics.duration.Observe(time.Since(start).Seconds()) - - if err != nil { - c.logger.Log("msg", "compaction failed", "err", err) - c.metrics.failed.Inc() - break - } - } - - // Drain channel of signals triggered during compaction. 
- select { - case <-c.triggerc: - default: - } - } - close(c.donec) -} - const ( compactionMaxSize = 1 << 30 // 1GB compactionBlocks = 2 @@ -158,12 +92,6 @@ func compactionMatch(blocks []block) bool { return true } -func (c *compactor) Close() error { - close(c.triggerc) - <-c.donec - return nil -} - func mergeStats(blocks ...block) (res BlockStats) { res.MinTime = blocks[0].stats().MinTime res.MaxTime = blocks[len(blocks)-1].stats().MaxTime @@ -174,24 +102,30 @@ func mergeStats(blocks ...block) (res BlockStats) { return res } -func (c *compactor) compact(blocks ...block) error { - tmpdir := blocks[0].dir() + ".tmp" +func (c *compactor) compact(dir string, blocks ...block) (err error) { + start := time.Now() + defer func() { + if err != nil { + c.metrics.failed.Inc() + } + c.metrics.duration.Observe(time.Since(start).Seconds()) + }() // Write to temporary directory to make persistence appear atomic. - if fileutil.Exist(tmpdir) { - if err := os.RemoveAll(tmpdir); err != nil { + if fileutil.Exist(dir) { + if err = os.RemoveAll(dir); err != nil { return err } } - if err := fileutil.CreateDirAll(tmpdir); err != nil { + if err = fileutil.CreateDirAll(dir); err != nil { return err } - chunkf, err := fileutil.LockFile(chunksFileName(tmpdir), os.O_WRONLY|os.O_CREATE, 0666) + chunkf, err := fileutil.LockFile(chunksFileName(dir), os.O_WRONLY|os.O_CREATE, 0666) if err != nil { return errors.Wrap(err, "create chunk file") } - indexf, err := fileutil.LockFile(indexFileName(tmpdir), os.O_WRONLY|os.O_CREATE, 0666) + indexf, err := fileutil.LockFile(indexFileName(dir), os.O_WRONLY|os.O_CREATE, 0666) if err != nil { return errors.Wrap(err, "create index file") } @@ -199,47 +133,29 @@ func (c *compactor) compact(blocks ...block) error { indexw := newIndexWriter(indexf) chunkw := newSeriesWriter(chunkf, indexw) - if err := c.write(blocks, indexw, chunkw); err != nil { + if err = c.write(blocks, indexw, chunkw); err != nil { return errors.Wrap(err, "write compaction") } - if err := chunkw.Close(); err != nil { + if err = chunkw.Close(); err != nil { return errors.Wrap(err, "close chunk writer") } - if err := indexw.Close(); err != nil { + if err = indexw.Close(); err != nil { return errors.Wrap(err, "close index writer") } - if err := fileutil.Fsync(chunkf.File); err != nil { + if err = fileutil.Fsync(chunkf.File); err != nil { return errors.Wrap(err, "fsync chunk file") } - if err := fileutil.Fsync(indexf.File); err != nil { + if err = fileutil.Fsync(indexf.File); err != nil { return errors.Wrap(err, "fsync index file") } - if err := chunkf.Close(); err != nil { + if err = chunkf.Close(); err != nil { return errors.Wrap(err, "close chunk file") } - if err := indexf.Close(); err != nil { + if err = indexf.Close(); err != nil { return errors.Wrap(err, "close index file") } - - c.blocks.lock().Lock() - defer c.blocks.lock().Unlock() - - if err := renameDir(tmpdir, blocks[0].dir()); err != nil { - return errors.Wrap(err, "rename dir") - } - for _, b := range blocks[1:] { - if err := os.RemoveAll(b.dir()); err != nil { - return errors.Wrap(err, "delete dir") - } - } - - var merr MultiError - - for _, b := range blocks { - merr.Add(errors.Wrapf(c.blocks.reinit(b.dir()), "reinit block at %q", b.dir())) - } - return merr.Err() + return nil } func (c *compactor) write(blocks []block, indexw IndexWriter, chunkw SeriesWriter) error { diff --git a/db.go b/db.go index eff94456d..e0ed8e804 100644 --- a/db.go +++ b/db.go @@ -73,12 +73,17 @@ type DB struct { persisted []*persistedBlock heads []*HeadBlock compactor 
*compactor + + compactc chan struct{} + donec chan struct{} + stopc chan struct{} } type dbMetrics struct { - persistences prometheus.Counter - persistenceDuration prometheus.Histogram - samplesAppended prometheus.Counter + persistences prometheus.Counter + persistenceDuration prometheus.Histogram + samplesAppended prometheus.Counter + compactionsTriggered prometheus.Counter } func newDBMetrics(r prometheus.Registerer) *dbMetrics { @@ -97,6 +102,10 @@ func newDBMetrics(r prometheus.Registerer) *dbMetrics { Name: "tsdb_samples_appended_total", Help: "Total number of appended sampledb.", }) + m.compactionsTriggered = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "tsdb_compactions_triggered_total", + Help: "Total number of triggered compactions for the partition.", + }) if r != nil { r.MustRegister( @@ -109,7 +118,7 @@ func newDBMetrics(r prometheus.Registerer) *dbMetrics { } // Open returns a new DB in the given directory. -func Open(dir string, logger log.Logger) (p *DB, err error) { +func Open(dir string, logger log.Logger) (db *DB, err error) { // Create directory if partition is new. if !fileutil.Exist(dir) { if err := os.MkdirAll(dir, 0777); err != nil { @@ -117,19 +126,90 @@ func Open(dir string, logger log.Logger) (p *DB, err error) { } } - p = &DB{ - dir: dir, - logger: logger, - metrics: newDBMetrics(nil), + db = &DB{ + dir: dir, + logger: logger, + metrics: newDBMetrics(nil), + compactc: make(chan struct{}, 1), + donec: make(chan struct{}), + stopc: make(chan struct{}), } - if err := p.initBlocks(); err != nil { + + if err := db.initBlocks(); err != nil { return nil, err } - if p.compactor, err = newCompactor(p, logger); err != nil { + if db.compactor, err = newCompactor(db); err != nil { return nil, err } - return p, nil + go db.run() + + return db, nil +} + +func (db *DB) run() { + defer close(db.donec) + + for { + select { + case <-db.compactc: + db.metrics.compactionsTriggered.Inc() + + for { + blocks := db.compactor.pick() + if len(blocks) == 0 { + break + } + // TODO(fabxc): pick emits blocks in order. compact acts on + // inverted order. Put inversion into compactor? + var bs []block + for _, b := range blocks { + bs = append([]block{b}, bs...) + } + + select { + case <-db.stopc: + return + default: + } + if err := db.compact(bs); err != nil { + db.logger.Log("msg", "compaction failed", "err", err) + } + } + case <-db.stopc: + return + } + } +} + +func (db *DB) compact(blocks []block) error { + if len(blocks) == 0 { + return nil + } + tmpdir := blocks[0].dir() + ".tmp" + + if err := db.compactor.compact(tmpdir, blocks...); err != nil { + return err + } + + db.mtx.Lock() + defer db.mtx.Unlock() + + if err := renameDir(tmpdir, blocks[0].dir()); err != nil { + return errors.Wrap(err, "rename dir") + } + for _, b := range blocks[1:] { + if err := os.RemoveAll(b.dir()); err != nil { + return errors.Wrap(err, "delete dir") + } + } + + var merr MultiError + + for _, b := range blocks { + merr.Add(errors.Wrapf(db.reinit(b.dir()), "reinit block at %q", b.dir())) + } + return merr.Err() } func isBlockDir(fi os.FileInfo) bool { @@ -202,8 +282,10 @@ func (db *DB) initBlocks() error { // Close the partition. 
func (db *DB) Close() error { + close(db.stopc) + <-db.donec + var merr MultiError - merr.Add(db.compactor.Close()) db.mtx.Lock() defer db.mtx.Unlock() @@ -240,17 +322,16 @@ func (db *DB) appendBatch(samples []hashedSample) error { if err := db.cut(); err != nil { db.logger.Log("msg", "cut failed", "err", err) } else { - db.compactor.trigger() + select { + case db.compactc <- struct{}{}: + default: + } } } return err } -func (db *DB) lock() sync.Locker { - return &db.mtx -} - func (db *DB) headForDir(dir string) (int, bool) { for i, b := range db.heads { if b.dir() == dir { diff --git a/head.go b/head.go index a01c6556d..b09c55872 100644 --- a/head.go +++ b/head.go @@ -50,6 +50,9 @@ func OpenHeadBlock(dir string) (*HeadBlock, error) { wal: wal, } + b.bstats.MinTime = math.MaxInt64 + b.bstats.MaxTime = math.MinInt64 + err = wal.ReadAll(&walHandler{ series: func(lset labels.Labels) { b.create(lset.Hash(), lset) diff --git a/querier.go b/querier.go index 4c0cca9a1..0bcd336c5 100644 --- a/querier.go +++ b/querier.go @@ -35,102 +35,11 @@ type Series interface { Iterator() SeriesIterator } -// querier merges query results from a set of partition querieres. -type querier struct { - mint, maxt int64 - partitions []Querier -} - -// Querier returns a new querier over the database for the given -// time range. -func (db *PartitionedDB) Querier(mint, maxt int64) Querier { - q := &querier{ - mint: mint, - maxt: maxt, - } - for _, s := range db.Partitions { - q.partitions = append(q.partitions, s.Querier(mint, maxt)) - } - - return q -} - -func (q *querier) Select(ms ...labels.Matcher) SeriesSet { - // We gather the non-overlapping series from every partition and simply - // return their union. - r := &mergedSeriesSet{} - - for _, s := range q.partitions { - r.sets = append(r.sets, s.Select(ms...)) - } - if len(r.sets) == 0 { - return nopSeriesSet{} - } - return r -} - -func (q *querier) LabelValues(n string) ([]string, error) { - res, err := q.partitions[0].LabelValues(n) - if err != nil { - return nil, err - } - for _, sq := range q.partitions[1:] { - pr, err := sq.LabelValues(n) - if err != nil { - return nil, err - } - // Merge new values into deduplicated result. - res = mergeStrings(res, pr) - } - return res, nil -} - -func mergeStrings(a, b []string) []string { - maxl := len(a) - if len(b) > len(a) { - maxl = len(b) - } - res := make([]string, 0, maxl*10/9) - - for len(a) > 0 && len(b) > 0 { - d := strings.Compare(a[0], b[0]) - - if d == 0 { - res = append(res, a[0]) - a, b = a[1:], b[1:] - } else if d < 0 { - res = append(res, a[0]) - a = a[1:] - } else if d > 0 { - res = append(res, b[0]) - b = b[1:] - } - } - - // Append all remaining elements. - res = append(res, a...) - res = append(res, b...) - return res -} - -func (q *querier) LabelValuesFor(string, labels.Label) ([]string, error) { - return nil, fmt.Errorf("not implemented") -} - -func (q *querier) Close() error { - var merr MultiError - - for _, sq := range q.partitions { - merr.Add(sq.Close()) - } - return merr.Err() -} - -// partitionQuerier aggregates querying results from time blocks within +// querier aggregates querying results from time blocks within // a single partition. 
-type partitionQuerier struct {
-	partition *DB
-	blocks    []Querier
+type querier struct {
+	db     *DB
+	blocks []Querier
 }
 
 // Querier returns a new querier over the data partition for the given
@@ -140,9 +49,9 @@ func (s *DB) Querier(mint, maxt int64) Querier {
 
 	blocks := s.blocksForInterval(mint, maxt)
 
-	sq := &partitionQuerier{
-		blocks:    make([]Querier, 0, len(blocks)),
-		partition: s,
+	sq := &querier{
+		blocks: make([]Querier, 0, len(blocks)),
+		db:     s,
 	}
 
 	for _, b := range blocks {
@@ -163,7 +72,7 @@ func (s *DB) Querier(mint, maxt int64) Querier {
 	return sq
 }
 
-func (q *partitionQuerier) LabelValues(n string) ([]string, error) {
+func (q *querier) LabelValues(n string) ([]string, error) {
 	res, err := q.blocks[0].LabelValues(n)
 	if err != nil {
 		return nil, err
@@ -179,11 +88,11 @@ func (q *partitionQuerier) LabelValues(n string) ([]string, error) {
 	return res, nil
 }
 
-func (q *partitionQuerier) LabelValuesFor(string, labels.Label) ([]string, error) {
+func (q *querier) LabelValuesFor(string, labels.Label) ([]string, error) {
 	return nil, fmt.Errorf("not implemented")
 }
 
-func (q *partitionQuerier) Select(ms ...labels.Matcher) SeriesSet {
+func (q *querier) Select(ms ...labels.Matcher) SeriesSet {
 	// Sets from different blocks have no time overlap. The reference numbers
 	// they emit point to series sorted in lexicographic order.
 	// We can fully connect partial series by simply comparing with the previous
@@ -199,13 +108,13 @@ func (q *partitionQuerier) Select(ms ...labels.Matcher) SeriesSet {
 	return r
 }
 
-func (q *partitionQuerier) Close() error {
+func (q *querier) Close() error {
 	var merr MultiError
 
 	for _, bq := range q.blocks {
 		merr.Add(bq.Close())
 	}
-	q.partition.mtx.RUnlock()
+	q.db.mtx.RUnlock()
 
 	return merr.Err()
 }
@@ -321,6 +230,97 @@ func (q *blockQuerier) Close() error {
 	return nil
 }
 
+// partitionedQuerier merges query results from a set of partition queriers.
+type partitionedQuerier struct {
+	mint, maxt int64
+	partitions []Querier
+}
+
+// Querier returns a new querier over the database for the given
+// time range.
+func (db *PartitionedDB) Querier(mint, maxt int64) Querier {
+	q := &partitionedQuerier{
+		mint: mint,
+		maxt: maxt,
+	}
+	for _, s := range db.Partitions {
+		q.partitions = append(q.partitions, s.Querier(mint, maxt))
+	}
+
+	return q
+}
+
+func (q *partitionedQuerier) Select(ms ...labels.Matcher) SeriesSet {
+	// We gather the non-overlapping series from every partition and simply
+	// return their union.
+	r := &mergedSeriesSet{}
+
+	for _, s := range q.partitions {
+		r.sets = append(r.sets, s.Select(ms...))
+	}
+	if len(r.sets) == 0 {
+		return nopSeriesSet{}
+	}
+	return r
+}
+
+func (q *partitionedQuerier) LabelValues(n string) ([]string, error) {
+	res, err := q.partitions[0].LabelValues(n)
+	if err != nil {
+		return nil, err
+	}
+	for _, sq := range q.partitions[1:] {
+		pr, err := sq.LabelValues(n)
+		if err != nil {
+			return nil, err
+		}
+		// Merge new values into deduplicated result.
+ res = mergeStrings(res, pr) + } + return res, nil +} + +func (q *partitionedQuerier) LabelValuesFor(string, labels.Label) ([]string, error) { + return nil, fmt.Errorf("not implemented") +} + +func (q *partitionedQuerier) Close() error { + var merr MultiError + + for _, sq := range q.partitions { + merr.Add(sq.Close()) + } + return merr.Err() +} + +func mergeStrings(a, b []string) []string { + maxl := len(a) + if len(b) > len(a) { + maxl = len(b) + } + res := make([]string, 0, maxl*10/9) + + for len(a) > 0 && len(b) > 0 { + d := strings.Compare(a[0], b[0]) + + if d == 0 { + res = append(res, a[0]) + a, b = a[1:], b[1:] + } else if d < 0 { + res = append(res, a[0]) + a = a[1:] + } else if d > 0 { + res = append(res, b[0]) + b = b[1:] + } + } + + // Append all remaining elements. + res = append(res, a...) + res = append(res, b...) + return res +} + // SeriesSet contains a set of series. type SeriesSet interface { Next() bool
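
A minimal, self-contained sketch of the trigger/run/Close lifecycle this patch moves into DB, assuming nothing beyond the standard library; the db type, newDB, and main below are illustrative stand-ins, not code from the repository. A one-slot buffered channel coalesces bursts of compaction requests into a single pending signal, and the stopc/donec pair lets Close shut the loop down and wait for it to exit:

package main

import (
	"fmt"
	"time"
)

type db struct {
	compactc chan struct{}
	donec    chan struct{}
	stopc    chan struct{}
}

func newDB() *db {
	d := &db{
		compactc: make(chan struct{}, 1), // capacity 1: at most one pending trigger
		donec:    make(chan struct{}),
		stopc:    make(chan struct{}),
	}
	go d.run()
	return d
}

// trigger requests a compaction without blocking the caller; if a signal
// is already pending, the new one is dropped (requests coalesce).
func (d *db) trigger() {
	select {
	case d.compactc <- struct{}{}:
	default:
	}
}

// run is the background loop that owns compaction, mirroring DB.run above.
func (d *db) run() {
	defer close(d.donec)
	for {
		select {
		case <-d.compactc:
			fmt.Println("compacting")
			time.Sleep(10 * time.Millisecond) // stand-in for the real work
		case <-d.stopc:
			return
		}
	}
}

// Close mirrors DB.Close in the patch: signal shutdown, then wait for run to exit.
func (d *db) Close() {
	close(d.stopc)
	<-d.donec
}

func main() {
	d := newDB()
	for i := 0; i < 3; i++ {
		d.trigger() // a burst of requests leaves at most one queued compaction
	}
	time.Sleep(50 * time.Millisecond)
	d.Close()
}

The same pattern appears in appendBatch after a head cut: the writer never blocks on compaction, and triggers fired while a compaction is already running collapse into a single follow-up signal.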