Handle compaction trigger and reinitializing in DB

pull/5805/head
Fabian Reinartz 2017-01-06 12:37:28 +01:00
parent 3ed2c2a14b
commit 96c2bd249f
4 changed files with 231 additions and 231 deletions

View File

@ -1,15 +1,12 @@
package tsdb
import (
"fmt"
"os"
"path/filepath"
"sync"
"time"
"github.com/coreos/etcd/pkg/fileutil"
"github.com/fabxc/tsdb/labels"
"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
)
@ -17,14 +14,9 @@ import (
type compactor struct {
metrics *compactorMetrics
blocks compactableBlocks
logger log.Logger
triggerc chan struct{}
donec chan struct{}
}
type compactorMetrics struct {
triggered prometheus.Counter
ran prometheus.Counter
failed prometheus.Counter
duration prometheus.Histogram
@ -33,10 +25,6 @@ type compactorMetrics struct {
func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
m := &compactorMetrics{}
m.triggered = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_compactions_triggered_total",
Help: "Total number of triggered compactions for the partition.",
})
m.ran = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_compactions_total",
Help: "Total number of compactions that were executed for the partition.",
@ -52,7 +40,6 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
if r != nil {
r.MustRegister(
m.triggered,
m.ran,
m.failed,
m.duration,
@ -62,71 +49,18 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
}
type compactableBlocks interface {
lock() sync.Locker
compactable() []block
reinit(dir string) error
}
func newCompactor(blocks compactableBlocks, l log.Logger) (*compactor, error) {
func newCompactor(blocks compactableBlocks) (*compactor, error) {
c := &compactor{
triggerc: make(chan struct{}, 1),
donec: make(chan struct{}),
logger: l,
blocks: blocks,
metrics: newCompactorMetrics(nil),
}
go c.run()
return c, nil
}
func (c *compactor) trigger() {
select {
case c.triggerc <- struct{}{}:
default:
}
}
func (c *compactor) run() {
for range c.triggerc {
c.metrics.triggered.Inc()
// Compact as long as there are candidate blocks.
for {
rev := c.pick()
var bs []block
for _, b := range rev {
bs = append([]block{b}, bs...)
}
c.logger.Log("msg", "picked for compaction", "candidates", fmt.Sprintf("%v", bs))
if len(bs) == 0 {
break
}
start := time.Now()
err := c.compact(bs...)
c.metrics.ran.Inc()
c.metrics.duration.Observe(time.Since(start).Seconds())
if err != nil {
c.logger.Log("msg", "compaction failed", "err", err)
c.metrics.failed.Inc()
break
}
}
// Drain channel of signals triggered during compaction.
select {
case <-c.triggerc:
default:
}
}
close(c.donec)
}
const (
compactionMaxSize = 1 << 30 // 1GB
compactionBlocks = 2
@ -158,12 +92,6 @@ func compactionMatch(blocks []block) bool {
return true
}
func (c *compactor) Close() error {
close(c.triggerc)
<-c.donec
return nil
}
func mergeStats(blocks ...block) (res BlockStats) {
res.MinTime = blocks[0].stats().MinTime
res.MaxTime = blocks[len(blocks)-1].stats().MaxTime
@ -174,24 +102,30 @@ func mergeStats(blocks ...block) (res BlockStats) {
return res
}
func (c *compactor) compact(blocks ...block) error {
tmpdir := blocks[0].dir() + ".tmp"
func (c *compactor) compact(dir string, blocks ...block) (err error) {
start := time.Now()
defer func() {
if err != nil {
c.metrics.failed.Inc()
}
c.metrics.duration.Observe(time.Since(start).Seconds())
}()
// Write to temporary directory to make persistence appear atomic.
if fileutil.Exist(tmpdir) {
if err := os.RemoveAll(tmpdir); err != nil {
if fileutil.Exist(dir) {
if err = os.RemoveAll(dir); err != nil {
return err
}
}
if err := fileutil.CreateDirAll(tmpdir); err != nil {
if err = fileutil.CreateDirAll(dir); err != nil {
return err
}
chunkf, err := fileutil.LockFile(chunksFileName(tmpdir), os.O_WRONLY|os.O_CREATE, 0666)
chunkf, err := fileutil.LockFile(chunksFileName(dir), os.O_WRONLY|os.O_CREATE, 0666)
if err != nil {
return errors.Wrap(err, "create chunk file")
}
indexf, err := fileutil.LockFile(indexFileName(tmpdir), os.O_WRONLY|os.O_CREATE, 0666)
indexf, err := fileutil.LockFile(indexFileName(dir), os.O_WRONLY|os.O_CREATE, 0666)
if err != nil {
return errors.Wrap(err, "create index file")
}
@ -199,47 +133,29 @@ func (c *compactor) compact(blocks ...block) error {
indexw := newIndexWriter(indexf)
chunkw := newSeriesWriter(chunkf, indexw)
if err := c.write(blocks, indexw, chunkw); err != nil {
if err = c.write(blocks, indexw, chunkw); err != nil {
return errors.Wrap(err, "write compaction")
}
if err := chunkw.Close(); err != nil {
if err = chunkw.Close(); err != nil {
return errors.Wrap(err, "close chunk writer")
}
if err := indexw.Close(); err != nil {
if err = indexw.Close(); err != nil {
return errors.Wrap(err, "close index writer")
}
if err := fileutil.Fsync(chunkf.File); err != nil {
if err = fileutil.Fsync(chunkf.File); err != nil {
return errors.Wrap(err, "fsync chunk file")
}
if err := fileutil.Fsync(indexf.File); err != nil {
if err = fileutil.Fsync(indexf.File); err != nil {
return errors.Wrap(err, "fsync index file")
}
if err := chunkf.Close(); err != nil {
if err = chunkf.Close(); err != nil {
return errors.Wrap(err, "close chunk file")
}
if err := indexf.Close(); err != nil {
if err = indexf.Close(); err != nil {
return errors.Wrap(err, "close index file")
}
c.blocks.lock().Lock()
defer c.blocks.lock().Unlock()
if err := renameDir(tmpdir, blocks[0].dir()); err != nil {
return errors.Wrap(err, "rename dir")
}
for _, b := range blocks[1:] {
if err := os.RemoveAll(b.dir()); err != nil {
return errors.Wrap(err, "delete dir")
}
}
var merr MultiError
for _, b := range blocks {
merr.Add(errors.Wrapf(c.blocks.reinit(b.dir()), "reinit block at %q", b.dir()))
}
return merr.Err()
return nil
}
func (c *compactor) write(blocks []block, indexw IndexWriter, chunkw SeriesWriter) error {

103
db.go
View File

@ -73,12 +73,17 @@ type DB struct {
persisted []*persistedBlock
heads []*HeadBlock
compactor *compactor
compactc chan struct{}
donec chan struct{}
stopc chan struct{}
}
type dbMetrics struct {
persistences prometheus.Counter
persistenceDuration prometheus.Histogram
samplesAppended prometheus.Counter
compactionsTriggered prometheus.Counter
}
func newDBMetrics(r prometheus.Registerer) *dbMetrics {
@ -97,6 +102,10 @@ func newDBMetrics(r prometheus.Registerer) *dbMetrics {
Name: "tsdb_samples_appended_total",
Help: "Total number of appended sampledb.",
})
m.compactionsTriggered = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_compactions_triggered_total",
Help: "Total number of triggered compactions for the partition.",
})
if r != nil {
r.MustRegister(
@ -109,7 +118,7 @@ func newDBMetrics(r prometheus.Registerer) *dbMetrics {
}
// Open returns a new DB in the given directory.
func Open(dir string, logger log.Logger) (p *DB, err error) {
func Open(dir string, logger log.Logger) (db *DB, err error) {
// Create directory if partition is new.
if !fileutil.Exist(dir) {
if err := os.MkdirAll(dir, 0777); err != nil {
@ -117,19 +126,90 @@ func Open(dir string, logger log.Logger) (p *DB, err error) {
}
}
p = &DB{
db = &DB{
dir: dir,
logger: logger,
metrics: newDBMetrics(nil),
compactc: make(chan struct{}, 1),
donec: make(chan struct{}),
stopc: make(chan struct{}),
}
if err := p.initBlocks(); err != nil {
if err := db.initBlocks(); err != nil {
return nil, err
}
if p.compactor, err = newCompactor(p, logger); err != nil {
if db.compactor, err = newCompactor(db); err != nil {
return nil, err
}
return p, nil
go db.run()
return db, nil
}
func (db *DB) run() {
defer close(db.donec)
for {
select {
case <-db.compactc:
db.metrics.compactionsTriggered.Inc()
for {
blocks := db.compactor.pick()
if len(blocks) == 0 {
break
}
// TODO(fabxc): pick emits blocks in order. compact acts on
// inverted order. Put inversion into compactor?
var bs []block
for _, b := range blocks {
bs = append([]block{b}, bs...)
}
select {
case <-db.stopc:
return
default:
}
if err := db.compact(bs); err != nil {
db.logger.Log("msg", "compaction failed", "err", err)
}
}
case <-db.stopc:
return
}
}
}
func (db *DB) compact(blocks []block) error {
if len(blocks) == 0 {
return nil
}
tmpdir := blocks[0].dir() + ".tmp"
if err := db.compactor.compact(tmpdir, blocks...); err != nil {
return err
}
db.mtx.Lock()
defer db.mtx.Unlock()
if err := renameDir(tmpdir, blocks[0].dir()); err != nil {
return errors.Wrap(err, "rename dir")
}
for _, b := range blocks[1:] {
if err := os.RemoveAll(b.dir()); err != nil {
return errors.Wrap(err, "delete dir")
}
}
var merr MultiError
for _, b := range blocks {
merr.Add(errors.Wrapf(db.reinit(b.dir()), "reinit block at %q", b.dir()))
}
return merr.Err()
}
func isBlockDir(fi os.FileInfo) bool {
@ -202,8 +282,10 @@ func (db *DB) initBlocks() error {
// Close the partition.
func (db *DB) Close() error {
close(db.stopc)
<-db.donec
var merr MultiError
merr.Add(db.compactor.Close())
db.mtx.Lock()
defer db.mtx.Unlock()
@ -240,17 +322,16 @@ func (db *DB) appendBatch(samples []hashedSample) error {
if err := db.cut(); err != nil {
db.logger.Log("msg", "cut failed", "err", err)
} else {
db.compactor.trigger()
select {
case db.compactc <- struct{}{}:
default:
}
}
}
return err
}
func (db *DB) lock() sync.Locker {
return &db.mtx
}
func (db *DB) headForDir(dir string) (int, bool) {
for i, b := range db.heads {
if b.dir() == dir {

View File

@ -50,6 +50,9 @@ func OpenHeadBlock(dir string) (*HeadBlock, error) {
wal: wal,
}
b.bstats.MinTime = math.MaxInt64
b.bstats.MaxTime = math.MinInt64
err = wal.ReadAll(&walHandler{
series: func(lset labels.Labels) {
b.create(lset.Hash(), lset)

View File

@ -35,101 +35,10 @@ type Series interface {
Iterator() SeriesIterator
}
// querier merges query results from a set of partition querieres.
type querier struct {
mint, maxt int64
partitions []Querier
}
// Querier returns a new querier over the database for the given
// time range.
func (db *PartitionedDB) Querier(mint, maxt int64) Querier {
q := &querier{
mint: mint,
maxt: maxt,
}
for _, s := range db.Partitions {
q.partitions = append(q.partitions, s.Querier(mint, maxt))
}
return q
}
func (q *querier) Select(ms ...labels.Matcher) SeriesSet {
// We gather the non-overlapping series from every partition and simply
// return their union.
r := &mergedSeriesSet{}
for _, s := range q.partitions {
r.sets = append(r.sets, s.Select(ms...))
}
if len(r.sets) == 0 {
return nopSeriesSet{}
}
return r
}
func (q *querier) LabelValues(n string) ([]string, error) {
res, err := q.partitions[0].LabelValues(n)
if err != nil {
return nil, err
}
for _, sq := range q.partitions[1:] {
pr, err := sq.LabelValues(n)
if err != nil {
return nil, err
}
// Merge new values into deduplicated result.
res = mergeStrings(res, pr)
}
return res, nil
}
func mergeStrings(a, b []string) []string {
maxl := len(a)
if len(b) > len(a) {
maxl = len(b)
}
res := make([]string, 0, maxl*10/9)
for len(a) > 0 && len(b) > 0 {
d := strings.Compare(a[0], b[0])
if d == 0 {
res = append(res, a[0])
a, b = a[1:], b[1:]
} else if d < 0 {
res = append(res, a[0])
a = a[1:]
} else if d > 0 {
res = append(res, b[0])
b = b[1:]
}
}
// Append all remaining elements.
res = append(res, a...)
res = append(res, b...)
return res
}
func (q *querier) LabelValuesFor(string, labels.Label) ([]string, error) {
return nil, fmt.Errorf("not implemented")
}
func (q *querier) Close() error {
var merr MultiError
for _, sq := range q.partitions {
merr.Add(sq.Close())
}
return merr.Err()
}
// partitionQuerier aggregates querying results from time blocks within
// querier aggregates querying results from time blocks within
// a single partition.
type partitionQuerier struct {
partition *DB
type querier struct {
db *DB
blocks []Querier
}
@ -140,9 +49,9 @@ func (s *DB) Querier(mint, maxt int64) Querier {
blocks := s.blocksForInterval(mint, maxt)
sq := &partitionQuerier{
sq := &querier{
blocks: make([]Querier, 0, len(blocks)),
partition: s,
db: s,
}
for _, b := range blocks {
@ -163,7 +72,7 @@ func (s *DB) Querier(mint, maxt int64) Querier {
return sq
}
func (q *partitionQuerier) LabelValues(n string) ([]string, error) {
func (q *querier) LabelValues(n string) ([]string, error) {
res, err := q.blocks[0].LabelValues(n)
if err != nil {
return nil, err
@ -179,11 +88,11 @@ func (q *partitionQuerier) LabelValues(n string) ([]string, error) {
return res, nil
}
func (q *partitionQuerier) LabelValuesFor(string, labels.Label) ([]string, error) {
func (q *querier) LabelValuesFor(string, labels.Label) ([]string, error) {
return nil, fmt.Errorf("not implemented")
}
func (q *partitionQuerier) Select(ms ...labels.Matcher) SeriesSet {
func (q *querier) Select(ms ...labels.Matcher) SeriesSet {
// Sets from different blocks have no time overlap. The reference numbers
// they emit point to series sorted in lexicographic order.
// We can fully connect partial series by simply comparing with the previous
@ -199,13 +108,13 @@ func (q *partitionQuerier) Select(ms ...labels.Matcher) SeriesSet {
return r
}
func (q *partitionQuerier) Close() error {
func (q *querier) Close() error {
var merr MultiError
for _, bq := range q.blocks {
merr.Add(bq.Close())
}
q.partition.mtx.RUnlock()
q.db.mtx.RUnlock()
return merr.Err()
}
@ -321,6 +230,97 @@ func (q *blockQuerier) Close() error {
return nil
}
// partitionedQuerier merges query results from a set of partition querieres.
type partitionedQuerier struct {
mint, maxt int64
partitions []Querier
}
// Querier returns a new querier over the database for the given
// time range.
func (db *PartitionedDB) Querier(mint, maxt int64) Querier {
q := &partitionedQuerier{
mint: mint,
maxt: maxt,
}
for _, s := range db.Partitions {
q.partitions = append(q.partitions, s.Querier(mint, maxt))
}
return q
}
func (q *partitionedQuerier) Select(ms ...labels.Matcher) SeriesSet {
// We gather the non-overlapping series from every partition and simply
// return their union.
r := &mergedSeriesSet{}
for _, s := range q.partitions {
r.sets = append(r.sets, s.Select(ms...))
}
if len(r.sets) == 0 {
return nopSeriesSet{}
}
return r
}
func (q *partitionedQuerier) LabelValues(n string) ([]string, error) {
res, err := q.partitions[0].LabelValues(n)
if err != nil {
return nil, err
}
for _, sq := range q.partitions[1:] {
pr, err := sq.LabelValues(n)
if err != nil {
return nil, err
}
// Merge new values into deduplicated result.
res = mergeStrings(res, pr)
}
return res, nil
}
func (q *partitionedQuerier) LabelValuesFor(string, labels.Label) ([]string, error) {
return nil, fmt.Errorf("not implemented")
}
func (q *partitionedQuerier) Close() error {
var merr MultiError
for _, sq := range q.partitions {
merr.Add(sq.Close())
}
return merr.Err()
}
func mergeStrings(a, b []string) []string {
maxl := len(a)
if len(b) > len(a) {
maxl = len(b)
}
res := make([]string, 0, maxl*10/9)
for len(a) > 0 && len(b) > 0 {
d := strings.Compare(a[0], b[0])
if d == 0 {
res = append(res, a[0])
a, b = a[1:], b[1:]
} else if d < 0 {
res = append(res, a[0])
a = a[1:]
} else if d > 0 {
res = append(res, b[0])
b = b[1:]
}
}
// Append all remaining elements.
res = append(res, a...)
res = append(res, b...)
return res
}
// SeriesSet contains a set of series.
type SeriesSet interface {
Next() bool