diff --git a/block.go b/block.go index 1351efae7..d25b5b4bf 100644 --- a/block.go +++ b/block.go @@ -1,4 +1,5 @@ // Copyright 2017 The Prometheus Authors + // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -52,6 +53,7 @@ type DiskBlock interface { type Block interface { DiskBlock Queryable + Snapshottable } // headBlock is a regular block that can still be appended to. @@ -60,6 +62,11 @@ type headBlock interface { Appendable } +// Snapshottable defines an entity that can be backedup online. +type Snapshottable interface { + Snapshot(dir string) error +} + // Appendable defines an entity to which data can be appended. type Appendable interface { // Appender returns a new Appender against an underlying store. @@ -272,6 +279,45 @@ Outer: return writeMetaFile(pb.dir, &pb.meta) } +func (pb *persistedBlock) Snapshot(dir string) error { + blockDir := filepath.Join(dir, pb.meta.ULID.String()) + if err := os.MkdirAll(blockDir, 0777); err != nil { + return errors.Wrap(err, "create snapshot block dir") + } + + chunksDir := chunkDir(blockDir) + if err := os.MkdirAll(chunksDir, 0777); err != nil { + return errors.Wrap(err, "create snapshot chunk dir") + } + + // Hardlink meta, index and tombstones + for _, fname := range []string{ + metaFilename, + indexFilename, + tombstoneFilename, + } { + if err := os.Link(filepath.Join(pb.dir, fname), filepath.Join(blockDir, fname)); err != nil { + return errors.Wrapf(err, "create snapshot %s", fname) + } + } + + // Hardlink the chunks + curChunkDir := chunkDir(pb.dir) + files, err := ioutil.ReadDir(curChunkDir) + if err != nil { + return errors.Wrap(err, "ReadDir the current chunk dir") + } + + for _, f := range files { + err := os.Link(filepath.Join(curChunkDir, f.Name()), filepath.Join(chunksDir, f.Name())) + if err != nil { + return errors.Wrap(err, "hardlink a chunk") + } + } + + return nil +} + func chunkDir(dir string) string { return filepath.Join(dir, "chunks") } func walDir(dir string) string { return filepath.Join(dir, "wal") } diff --git a/compact.go b/compact.go index ee54bc9ec..c236bf7a1 100644 --- a/compact.go +++ b/compact.go @@ -246,7 +246,7 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) { return errors.Wrap(err, "open index writer") } - meta, err := c.populate(blocks, indexw, chunkw) + meta, err := populateBlock(blocks, indexw, chunkw) if err != nil { return errors.Wrap(err, "write compaction") } @@ -289,9 +289,9 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) { return nil } -// populate fills the index and chunk writers with new data gathered as the union +// populateBlock fills the index and chunk writers with new data gathered as the union // of the provided blocks. It returns meta information for the new block. -func (c *compactor) populate(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*BlockMeta, error) { +func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*BlockMeta, error) { var set compactionSet for i, b := range blocks { diff --git a/db.go b/db.go index d50ff842c..015e29d17 100644 --- a/db.go +++ b/db.go @@ -121,7 +121,8 @@ type DB struct { stopc chan struct{} // cmtx is used to control compactions and deletions. - cmtx sync.Mutex + cmtx sync.Mutex + compacting bool } type dbMetrics struct { @@ -200,12 +201,13 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db } db = &DB{ - dir: dir, - logger: l, - opts: opts, - compactc: make(chan struct{}, 1), - donec: make(chan struct{}), - stopc: make(chan struct{}), + dir: dir, + logger: l, + opts: opts, + compactc: make(chan struct{}, 1), + donec: make(chan struct{}), + stopc: make(chan struct{}), + compacting: true, } db.metrics = newDBMetrics(db, r) @@ -528,6 +530,43 @@ func (db *DB) Close() error { return merr.Err() } +// DisableCompactions disables compactions. +func (db *DB) DisableCompactions() { + if db.compacting { + db.cmtx.Lock() + db.compacting = false + db.logger.Log("msg", "compactions disabled") + } +} + +// EnableCompactions enables compactions. +func (db *DB) EnableCompactions() { + if !db.compacting { + db.cmtx.Unlock() + db.compacting = true + db.logger.Log("msg", "compactions enabled") + } +} + +// Snapshot writes the current data to the directory. +func (db *DB) Snapshot(dir string) error { + db.mtx.Lock() // To block any appenders. + defer db.mtx.Unlock() + + db.cmtx.Lock() + defer db.cmtx.Unlock() + + blocks := db.blocks[:] + for _, b := range blocks { + db.logger.Log("msg", "snapshotting block", "block", b) + if err := b.Snapshot(dir); err != nil { + return errors.Wrap(err, "error snapshotting headblock") + } + } + + return nil +} + // Appender returns a new Appender on the database. func (db *DB) Appender() Appender { db.metrics.activeAppenders.Inc() diff --git a/head.go b/head.go index 7f93d1658..8bf0762bb 100644 --- a/head.go +++ b/head.go @@ -262,6 +262,70 @@ Outer: return nil } +// Snapshot persists the current state of the headblock to the given directory. +// TODO(gouthamve): Snapshot must be called when there are no active appenders. +// This has been ensured by acquiring a Lock on DB.mtx, but this limitation should +// be removed in the future. +func (h *HeadBlock) Snapshot(snapshotDir string) error { + if h.meta.Stats.NumSeries == 0 { + return nil + } + + entropy := rand.New(rand.NewSource(time.Now().UnixNano())) + uid := ulid.MustNew(ulid.Now(), entropy) + + dir := filepath.Join(snapshotDir, uid.String()) + tmp := dir + ".tmp" + + if err := os.RemoveAll(tmp); err != nil { + return err + } + + if err := os.MkdirAll(tmp, 0777); err != nil { + return err + } + + // Populate chunk and index files into temporary directory with + // data of all blocks. + chunkw, err := newChunkWriter(chunkDir(tmp)) + if err != nil { + return errors.Wrap(err, "open chunk writer") + } + indexw, err := newIndexWriter(tmp) + if err != nil { + return errors.Wrap(err, "open index writer") + } + + meta, err := populateBlock([]Block{h}, indexw, chunkw) + if err != nil { + return errors.Wrap(err, "write snapshot") + } + meta.ULID = uid + + if err = writeMetaFile(tmp, meta); err != nil { + return errors.Wrap(err, "write merged meta") + } + + if err = chunkw.Close(); err != nil { + return errors.Wrap(err, "close chunk writer") + } + if err = indexw.Close(); err != nil { + return errors.Wrap(err, "close index writer") + } + + // Create an empty tombstones file. + if err := writeTombstoneFile(tmp, newEmptyTombstoneReader()); err != nil { + return errors.Wrap(err, "write new tombstones file") + } + + // Block successfully written, make visible + if err := renameFile(tmp, dir); err != nil { + return errors.Wrap(err, "rename block dir") + } + + return nil +} + // Dir returns the directory of the block. func (h *HeadBlock) Dir() string { return h.dir } diff --git a/index.go b/index.go index b36eb49bd..3264d9263 100644 --- a/index.go +++ b/index.go @@ -39,6 +39,8 @@ const ( indexFormatV1 = 1 ) +const indexFilename = "index" + const compactionPageBytes = minSectorSize * 64 type indexWriterSeries struct { @@ -138,7 +140,7 @@ func newIndexWriter(dir string) (*indexWriter, error) { if err != nil { return nil, err } - f, err := os.OpenFile(filepath.Join(dir, "index"), os.O_CREATE|os.O_WRONLY, 0666) + f, err := os.OpenFile(filepath.Join(dir, indexFilename), os.O_CREATE|os.O_WRONLY, 0666) if err != nil { return nil, err }