From a1c842535771168d89808f2f367b71128fb25ec5 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Mon, 5 Jun 2017 13:48:31 +0530 Subject: [PATCH 1/4] Initial implementation of HeadBlock Snapshots Signed-off-by: Goutham Veeramachaneni --- block.go | 6 +++++ compact.go | 6 ++--- db.go | 28 +++++++++++++++++++++++ head.go | 65 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 102 insertions(+), 3 deletions(-) diff --git a/block.go b/block.go index 1351efae7..0eb2b7c92 100644 --- a/block.go +++ b/block.go @@ -58,6 +58,12 @@ type Block interface { type headBlock interface { Block Appendable + Snapshottable +} + +// Snapshottable defines an entity that can be backedup online. +type Snapshottable interface { + Snapshot(dir string) error } // Appendable defines an entity to which data can be appended. diff --git a/compact.go b/compact.go index ee54bc9ec..c236bf7a1 100644 --- a/compact.go +++ b/compact.go @@ -246,7 +246,7 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) { return errors.Wrap(err, "open index writer") } - meta, err := c.populate(blocks, indexw, chunkw) + meta, err := populateBlock(blocks, indexw, chunkw) if err != nil { return errors.Wrap(err, "write compaction") } @@ -289,9 +289,9 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) { return nil } -// populate fills the index and chunk writers with new data gathered as the union +// populateBlock fills the index and chunk writers with new data gathered as the union // of the provided blocks. It returns meta information for the new block. -func (c *compactor) populate(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*BlockMeta, error) { +func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*BlockMeta, error) { var set compactionSet for i, b := range blocks { diff --git a/db.go b/db.go index d50ff842c..a7aea28e8 100644 --- a/db.go +++ b/db.go @@ -528,6 +528,34 @@ func (db *DB) Close() error { return merr.Err() } +// DisableCompactions disables compactions. +func (db *DB) DisableCompactions() error { + db.stopc <- struct{}{} // TODO: Can this block? + db.cmtx.Lock() + return nil +} + +// EnableCompactions enables compactions. +func (db *DB) EnableCompactions() error { + db.cmtx.Unlock() + return nil +} + +// Snapshot writes the current headBlock snapshots to snapshots directory. +func (db *DB) Snapshot(dir string) error { + db.headmtx.RLock() + heads := db.heads[:] + db.headmtx.RUnlock() + + for _, h := range heads { + if err := h.Snapshot(dir); err != nil { + return errors.Wrap(err, "error snapshotting headblock") + } + } + + return nil +} + // Appender returns a new Appender on the database. func (db *DB) Appender() Appender { db.metrics.activeAppenders.Inc() diff --git a/head.go b/head.go index 7f93d1658..426f1d96a 100644 --- a/head.go +++ b/head.go @@ -262,6 +262,71 @@ Outer: return nil } +// Snapshot persists the current state of the headblock to the given directory. +func (h *HeadBlock) Snapshot(snapshotDir string) error { + // Needed to stop any appenders. + h.mtx.Lock() + defer h.mtx.Unlock() + + if h.meta.Stats.NumSeries == 0 { + return nil + } + + entropy := rand.New(rand.NewSource(time.Now().UnixNano())) + uid := ulid.MustNew(ulid.Now(), entropy) + + dir := filepath.Join(snapshotDir, uid.String()) + tmp := dir + ".tmp" + + if err := os.RemoveAll(tmp); err != nil { + return err + } + + if err := os.MkdirAll(tmp, 0777); err != nil { + return err + } + + // Populate chunk and index files into temporary directory with + // data of all blocks. + chunkw, err := newChunkWriter(chunkDir(tmp)) + if err != nil { + return errors.Wrap(err, "open chunk writer") + } + indexw, err := newIndexWriter(tmp) + if err != nil { + return errors.Wrap(err, "open index writer") + } + + meta, err := populateBlock([]Block{h}, indexw, chunkw) + if err != nil { + return errors.Wrap(err, "write snapshot") + } + meta.ULID = uid + + if err = writeMetaFile(tmp, meta); err != nil { + return errors.Wrap(err, "write merged meta") + } + + if err = chunkw.Close(); err != nil { + return errors.Wrap(err, "close chunk writer") + } + if err = indexw.Close(); err != nil { + return errors.Wrap(err, "close index writer") + } + + // Create an empty tombstones file. + if err := writeTombstoneFile(tmp, newEmptyTombstoneReader()); err != nil { + return errors.Wrap(err, "write new tombstones file") + } + + // Block successfully written, make visible + if err := renameFile(tmp, dir); err != nil { + return errors.Wrap(err, "rename block dir") + } + + return nil +} + // Dir returns the directory of the block. func (h *HeadBlock) Dir() string { return h.dir } From a110a64abdcd21314dcf3829b09eee8b74379fbe Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Tue, 6 Jun 2017 18:15:54 +0530 Subject: [PATCH 2/4] Add full Snapshot support Signed-off-by: Goutham Veeramachaneni --- block.go | 39 ++++++++++++++++++++++++++++++++++++++- db.go | 48 +++++++++++++++++++++++++++--------------------- head.go | 7 +++---- index.go | 4 +++- 4 files changed, 71 insertions(+), 27 deletions(-) diff --git a/block.go b/block.go index 0eb2b7c92..792a96e9f 100644 --- a/block.go +++ b/block.go @@ -1,4 +1,5 @@ // Copyright 2017 The Prometheus Authors + // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -52,13 +53,13 @@ type DiskBlock interface { type Block interface { DiskBlock Queryable + Snapshottable } // headBlock is a regular block that can still be appended to. type headBlock interface { Block Appendable - Snapshottable } // Snapshottable defines an entity that can be backedup online. @@ -278,6 +279,42 @@ Outer: return writeMetaFile(pb.dir, &pb.meta) } +func (pb *persistedBlock) Snapshot(dir string) error { + blockDir := filepath.Join(dir, pb.meta.ULID.String()) + if err := os.MkdirAll(blockDir, 0777); err != nil { + return errors.Wrap(err, "create snapshot block dir") + } + + chunksDir := chunkDir(blockDir) + if err := os.MkdirAll(chunksDir, 0777); err != nil { + return errors.Wrap(err, "create snapshot chunk dir") + } + + // Hardlink meta, index and tombstones + filenames := []string{metaFilename, indexFilename, tombstoneFilename} + for _, fname := range filenames { + if err := os.Link(filepath.Join(pb.dir, fname), filepath.Join(blockDir, fname)); err != nil { + return errors.Wrapf(err, "create snapshot %s", fname) + } + } + + // Hardlink the chunks + curChunkDir := chunkDir(pb.dir) + files, err := ioutil.ReadDir(curChunkDir) + if err != nil { + return errors.Wrap(err, "ReadDir the current chunk dir") + } + + for _, f := range files { + err := os.Link(filepath.Join(curChunkDir, f.Name()), filepath.Join(chunksDir, f.Name())) + if err != nil { + return errors.Wrap(err, "hardlink a chunk") + } + } + + return nil +} + func chunkDir(dir string) string { return filepath.Join(dir, "chunks") } func walDir(dir string) string { return filepath.Join(dir, "wal") } diff --git a/db.go b/db.go index a7aea28e8..f7a646372 100644 --- a/db.go +++ b/db.go @@ -121,7 +121,8 @@ type DB struct { stopc chan struct{} // cmtx is used to control compactions and deletions. - cmtx sync.Mutex + cmtx sync.Mutex + compacting bool } type dbMetrics struct { @@ -200,12 +201,13 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db } db = &DB{ - dir: dir, - logger: l, - opts: opts, - compactc: make(chan struct{}, 1), - donec: make(chan struct{}), - stopc: make(chan struct{}), + dir: dir, + logger: l, + opts: opts, + compactc: make(chan struct{}, 1), + donec: make(chan struct{}), + stopc: make(chan struct{}), + compacting: true, } db.metrics = newDBMetrics(db, r) @@ -528,27 +530,31 @@ func (db *DB) Close() error { return merr.Err() } -// DisableCompactions disables compactions. -func (db *DB) DisableCompactions() error { - db.stopc <- struct{}{} // TODO: Can this block? - db.cmtx.Lock() - return nil -} +// ToggleCompactions toggles compactions and returns if compactions are on or not. +func (db *DB) ToggleCompactions() bool { + if db.compacting { + db.cmtx.Lock() + db.compacting = false + return false + } -// EnableCompactions enables compactions. -func (db *DB) EnableCompactions() error { db.cmtx.Unlock() - return nil + db.compacting = true + return true } // Snapshot writes the current headBlock snapshots to snapshots directory. func (db *DB) Snapshot(dir string) error { - db.headmtx.RLock() - heads := db.heads[:] - db.headmtx.RUnlock() + db.mtx.Lock() // To block any appenders. + defer db.mtx.Unlock() - for _, h := range heads { - if err := h.Snapshot(dir); err != nil { + db.cmtx.Lock() + defer db.cmtx.Unlock() + + blocks := db.blocks[:] + for _, b := range blocks { + db.logger.Log("msg", "compacting block", "block", b.Dir()) + if err := b.Snapshot(dir); err != nil { return errors.Wrap(err, "error snapshotting headblock") } } diff --git a/head.go b/head.go index 426f1d96a..8bf0762bb 100644 --- a/head.go +++ b/head.go @@ -263,11 +263,10 @@ Outer: } // Snapshot persists the current state of the headblock to the given directory. +// TODO(gouthamve): Snapshot must be called when there are no active appenders. +// This has been ensured by acquiring a Lock on DB.mtx, but this limitation should +// be removed in the future. func (h *HeadBlock) Snapshot(snapshotDir string) error { - // Needed to stop any appenders. - h.mtx.Lock() - defer h.mtx.Unlock() - if h.meta.Stats.NumSeries == 0 { return nil } diff --git a/index.go b/index.go index b36eb49bd..3264d9263 100644 --- a/index.go +++ b/index.go @@ -39,6 +39,8 @@ const ( indexFormatV1 = 1 ) +const indexFilename = "index" + const compactionPageBytes = minSectorSize * 64 type indexWriterSeries struct { @@ -138,7 +140,7 @@ func newIndexWriter(dir string) (*indexWriter, error) { if err != nil { return nil, err } - f, err := os.OpenFile(filepath.Join(dir, "index"), os.O_CREATE|os.O_WRONLY, 0666) + f, err := os.OpenFile(filepath.Join(dir, indexFilename), os.O_CREATE|os.O_WRONLY, 0666) if err != nil { return nil, err } From 261cd9f39355888fe0811b09624a397ddc0f3efd Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Tue, 6 Jun 2017 20:23:20 +0530 Subject: [PATCH 3/4] Incorporate feedback. Move back to {Enable, Disable}Compactions. Signed-off-by: Goutham Veeramachaneni --- block.go | 7 +++++-- db.go | 25 +++++++++++++++++-------- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/block.go b/block.go index 792a96e9f..d25b5b4bf 100644 --- a/block.go +++ b/block.go @@ -291,8 +291,11 @@ func (pb *persistedBlock) Snapshot(dir string) error { } // Hardlink meta, index and tombstones - filenames := []string{metaFilename, indexFilename, tombstoneFilename} - for _, fname := range filenames { + for _, fname := range []string{ + metaFilename, + indexFilename, + tombstoneFilename, + } { if err := os.Link(filepath.Join(pb.dir, fname), filepath.Join(blockDir, fname)); err != nil { return errors.Wrapf(err, "create snapshot %s", fname) } diff --git a/db.go b/db.go index f7a646372..462a3060e 100644 --- a/db.go +++ b/db.go @@ -530,20 +530,29 @@ func (db *DB) Close() error { return merr.Err() } -// ToggleCompactions toggles compactions and returns if compactions are on or not. -func (db *DB) ToggleCompactions() bool { +// DisableCompactions disables compactions. +func (db *DB) DisableCompactions() error { if db.compacting { db.cmtx.Lock() db.compacting = false - return false + db.logger.Log("msg", "compactions disabled") } - db.cmtx.Unlock() - db.compacting = true - return true + return nil +} + +// EnableCompactions enables compactions. +func (db *DB) EnableCompactions() error { + if !db.compacting { + db.cmtx.Unlock() + db.compacting = true + db.logger.Log("msg", "compactions enabled") + } + + return nil } -// Snapshot writes the current headBlock snapshots to snapshots directory. +// Snapshot writes the current data to the directory. func (db *DB) Snapshot(dir string) error { db.mtx.Lock() // To block any appenders. defer db.mtx.Unlock() @@ -553,7 +562,7 @@ func (db *DB) Snapshot(dir string) error { blocks := db.blocks[:] for _, b := range blocks { - db.logger.Log("msg", "compacting block", "block", b.Dir()) + db.logger.Log("msg", "snapshotting block", "block", b) if err := b.Snapshot(dir); err != nil { return errors.Wrap(err, "error snapshotting headblock") } From ff4ccb6eb0be9f37c795978ba7d416fb2ccdfdbd Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Tue, 6 Jun 2017 23:45:23 +0530 Subject: [PATCH 4/4] Remove unnecessary error from ToggleCompaction fns Signed-off-by: Goutham Veeramachaneni --- db.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/db.go b/db.go index 462a3060e..015e29d17 100644 --- a/db.go +++ b/db.go @@ -531,25 +531,21 @@ func (db *DB) Close() error { } // DisableCompactions disables compactions. -func (db *DB) DisableCompactions() error { +func (db *DB) DisableCompactions() { if db.compacting { db.cmtx.Lock() db.compacting = false db.logger.Log("msg", "compactions disabled") } - - return nil } // EnableCompactions enables compactions. -func (db *DB) EnableCompactions() error { +func (db *DB) EnableCompactions() { if !db.compacting { db.cmtx.Unlock() db.compacting = true db.logger.Log("msg", "compactions enabled") } - - return nil } // Snapshot writes the current data to the directory.