Add full Snapshot support

Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>
pull/5805/head
Goutham Veeramachaneni 8 years ago
parent a1c8425357
commit a110a64abd
No known key found for this signature in database
GPG Key ID: F1C217E8E9023CAD

@ -1,4 +1,5 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@ -52,13 +53,13 @@ type DiskBlock interface {
type Block interface {
DiskBlock
Queryable
Snapshottable
}
// headBlock is a regular block that can still be appended to.
type headBlock interface {
Block
Appendable
Snapshottable
}
// Snapshottable defines an entity that can be backedup online.
@ -278,6 +279,42 @@ Outer:
return writeMetaFile(pb.dir, &pb.meta)
}
func (pb *persistedBlock) Snapshot(dir string) error {
blockDir := filepath.Join(dir, pb.meta.ULID.String())
if err := os.MkdirAll(blockDir, 0777); err != nil {
return errors.Wrap(err, "create snapshot block dir")
}
chunksDir := chunkDir(blockDir)
if err := os.MkdirAll(chunksDir, 0777); err != nil {
return errors.Wrap(err, "create snapshot chunk dir")
}
// Hardlink meta, index and tombstones
filenames := []string{metaFilename, indexFilename, tombstoneFilename}
for _, fname := range filenames {
if err := os.Link(filepath.Join(pb.dir, fname), filepath.Join(blockDir, fname)); err != nil {
return errors.Wrapf(err, "create snapshot %s", fname)
}
}
// Hardlink the chunks
curChunkDir := chunkDir(pb.dir)
files, err := ioutil.ReadDir(curChunkDir)
if err != nil {
return errors.Wrap(err, "ReadDir the current chunk dir")
}
for _, f := range files {
err := os.Link(filepath.Join(curChunkDir, f.Name()), filepath.Join(chunksDir, f.Name()))
if err != nil {
return errors.Wrap(err, "hardlink a chunk")
}
}
return nil
}
func chunkDir(dir string) string { return filepath.Join(dir, "chunks") }
func walDir(dir string) string { return filepath.Join(dir, "wal") }

48
db.go

@ -121,7 +121,8 @@ type DB struct {
stopc chan struct{}
// cmtx is used to control compactions and deletions.
cmtx sync.Mutex
cmtx sync.Mutex
compacting bool
}
type dbMetrics struct {
@ -200,12 +201,13 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
}
db = &DB{
dir: dir,
logger: l,
opts: opts,
compactc: make(chan struct{}, 1),
donec: make(chan struct{}),
stopc: make(chan struct{}),
dir: dir,
logger: l,
opts: opts,
compactc: make(chan struct{}, 1),
donec: make(chan struct{}),
stopc: make(chan struct{}),
compacting: true,
}
db.metrics = newDBMetrics(db, r)
@ -528,27 +530,31 @@ func (db *DB) Close() error {
return merr.Err()
}
// DisableCompactions disables compactions.
func (db *DB) DisableCompactions() error {
db.stopc <- struct{}{} // TODO: Can this block?
db.cmtx.Lock()
return nil
}
// ToggleCompactions toggles compactions and returns if compactions are on or not.
func (db *DB) ToggleCompactions() bool {
if db.compacting {
db.cmtx.Lock()
db.compacting = false
return false
}
// EnableCompactions enables compactions.
func (db *DB) EnableCompactions() error {
db.cmtx.Unlock()
return nil
db.compacting = true
return true
}
// Snapshot writes the current headBlock snapshots to snapshots directory.
func (db *DB) Snapshot(dir string) error {
db.headmtx.RLock()
heads := db.heads[:]
db.headmtx.RUnlock()
db.mtx.Lock() // To block any appenders.
defer db.mtx.Unlock()
for _, h := range heads {
if err := h.Snapshot(dir); err != nil {
db.cmtx.Lock()
defer db.cmtx.Unlock()
blocks := db.blocks[:]
for _, b := range blocks {
db.logger.Log("msg", "compacting block", "block", b.Dir())
if err := b.Snapshot(dir); err != nil {
return errors.Wrap(err, "error snapshotting headblock")
}
}

@ -263,11 +263,10 @@ Outer:
}
// Snapshot persists the current state of the headblock to the given directory.
// TODO(gouthamve): Snapshot must be called when there are no active appenders.
// This has been ensured by acquiring a Lock on DB.mtx, but this limitation should
// be removed in the future.
func (h *HeadBlock) Snapshot(snapshotDir string) error {
// Needed to stop any appenders.
h.mtx.Lock()
defer h.mtx.Unlock()
if h.meta.Stats.NumSeries == 0 {
return nil
}

@ -39,6 +39,8 @@ const (
indexFormatV1 = 1
)
const indexFilename = "index"
const compactionPageBytes = minSectorSize * 64
type indexWriterSeries struct {
@ -138,7 +140,7 @@ func newIndexWriter(dir string) (*indexWriter, error) {
if err != nil {
return nil, err
}
f, err := os.OpenFile(filepath.Join(dir, "index"), os.O_CREATE|os.O_WRONLY, 0666)
f, err := os.OpenFile(filepath.Join(dir, indexFilename), os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
return nil, err
}

Loading…
Cancel
Save