Open db in Read only mode (#588)

* Added db read only open mode and use it for the tsdb cli.

Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com>
pull/5805/head
Krasi Georgiev 5 years ago committed by GitHub
parent 3df36b4fe0
commit 6f9bbc7253
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,4 +1,9 @@
## Master / unreleased
## master / unreleased
- [FEATURE] Added `DBReadOnly` to allow opening a database in read only mode.
- `DBReadOnly.Blocks()` exposes a slice of `BlockReader`s.
- `BlockReader` interface - removed MinTime/MaxTime methods and now exposes the full block meta via `Meta()`.
- [FEATURE] `chunckenc.Chunk.Iterator` method now takes a `chunckenc.Iterator` interface as an argument for reuse.

@ -138,11 +138,8 @@ type BlockReader interface {
// Tombstones returns a TombstoneReader over the block's deleted data.
Tombstones() (TombstoneReader, error)
// MinTime returns the min time of the block.
MinTime() int64
// MaxTime returns the max time of the block.
MaxTime() int64
// Meta provides meta information about the block reader.
Meta() BlockMeta
}
// Appendable defines an entity to which data can be appended.

@ -175,8 +175,7 @@ func TestBlockSize(t *testing.T) {
testutil.Ok(t, blockInit.Close())
}()
expSizeInit = blockInit.Size()
actSizeInit, err := testutil.DirSize(blockInit.Dir())
testutil.Ok(t, err)
actSizeInit := testutil.DirSize(t, blockInit.Dir())
testutil.Equals(t, expSizeInit, actSizeInit)
}
@ -185,7 +184,7 @@ func TestBlockSize(t *testing.T) {
testutil.Ok(t, blockInit.Delete(1, 10, labels.NewMustRegexpMatcher("", ".*")))
expAfterDelete := blockInit.Size()
testutil.Assert(t, expAfterDelete > expSizeInit, "after a delete the block size should be bigger as the tombstone file should grow %v > %v", expAfterDelete, expSizeInit)
actAfterDelete, err := testutil.DirSize(blockDirInit)
actAfterDelete := testutil.DirSize(t, blockDirInit)
testutil.Ok(t, err)
testutil.Equals(t, expAfterDelete, actAfterDelete, "after a delete reported block size doesn't match actual disk size")
@ -199,8 +198,7 @@ func TestBlockSize(t *testing.T) {
testutil.Ok(t, blockAfterCompact.Close())
}()
expAfterCompact := blockAfterCompact.Size()
actAfterCompact, err := testutil.DirSize(blockAfterCompact.Dir())
testutil.Ok(t, err)
actAfterCompact := testutil.DirSize(t, blockAfterCompact.Dir())
testutil.Assert(t, actAfterDelete > actAfterCompact, "after a delete and compaction the block size should be smaller %v,%v", actAfterDelete, actAfterCompact)
testutil.Equals(t, expAfterCompact, actAfterCompact, "after a delete and compaction reported block size doesn't match actual disk size")
}

@ -34,11 +34,19 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/tsdb"
"github.com/prometheus/tsdb/chunks"
tsdb_errors "github.com/prometheus/tsdb/errors"
"github.com/prometheus/tsdb/labels"
"gopkg.in/alecthomas/kingpin.v2"
)
func main() {
if err := execute(); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
}
func execute() (err error) {
var (
defaultDBPath = filepath.Join("benchout", "storage")
@ -61,8 +69,8 @@ func main() {
dumpMaxTime = dumpCmd.Flag("max-time", "maximum timestamp to dump").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64()
)
safeDBOptions := *tsdb.DefaultOptions
safeDBOptions.RetentionDuration = 0
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
var merr tsdb_errors.MultiError
switch kingpin.MustParse(cli.Parse(os.Args[1:])) {
case benchWriteCmd.FullCommand():
@ -70,21 +78,39 @@ func main() {
outPath: *benchWriteOutPath,
numMetrics: *benchWriteNumMetrics,
samplesFile: *benchSamplesFile,
logger: logger,
}
wb.run()
return wb.run()
case listCmd.FullCommand():
db, err := tsdb.Open(*listPath, nil, nil, &safeDBOptions)
db, err := tsdb.OpenDBReadOnly(*listPath, nil)
if err != nil {
exitWithError(err)
return err
}
printBlocks(db.Blocks(), listCmdHumanReadable)
defer func() {
merr.Add(err)
merr.Add(db.Close())
err = merr.Err()
}()
blocks, err := db.Blocks()
if err != nil {
return err
}
printBlocks(blocks, listCmdHumanReadable)
case analyzeCmd.FullCommand():
db, err := tsdb.Open(*analyzePath, nil, nil, &safeDBOptions)
db, err := tsdb.OpenDBReadOnly(*analyzePath, nil)
if err != nil {
return err
}
defer func() {
merr.Add(err)
merr.Add(db.Close())
err = merr.Err()
}()
blocks, err := db.Blocks()
if err != nil {
exitWithError(err)
return err
}
blocks := db.Blocks()
var block *tsdb.Block
var block tsdb.BlockReader
if *analyzeBlockID != "" {
for _, b := range blocks {
if b.Meta().ULID.String() == *analyzeBlockID {
@ -96,16 +122,22 @@ func main() {
block = blocks[len(blocks)-1]
}
if block == nil {
exitWithError(fmt.Errorf("block not found"))
return fmt.Errorf("block not found")
}
analyzeBlock(block, *analyzeLimit)
return analyzeBlock(block, *analyzeLimit)
case dumpCmd.FullCommand():
db, err := tsdb.Open(*dumpPath, nil, nil, &safeDBOptions)
db, err := tsdb.OpenDBReadOnly(*dumpPath, nil)
if err != nil {
exitWithError(err)
return err
}
dumpSamples(db, *dumpMinTime, *dumpMaxTime)
defer func() {
merr.Add(err)
merr.Add(db.Close())
err = merr.Err()
}()
return dumpSamples(db, *dumpMinTime, *dumpMaxTime)
}
return nil
}
type writeBenchmark struct {
@ -120,74 +152,87 @@ type writeBenchmark struct {
memprof *os.File
blockprof *os.File
mtxprof *os.File
logger log.Logger
}
func (b *writeBenchmark) run() {
func (b *writeBenchmark) run() error {
if b.outPath == "" {
dir, err := ioutil.TempDir("", "tsdb_bench")
if err != nil {
exitWithError(err)
return err
}
b.outPath = dir
b.cleanup = true
}
if err := os.RemoveAll(b.outPath); err != nil {
exitWithError(err)
return err
}
if err := os.MkdirAll(b.outPath, 0777); err != nil {
exitWithError(err)
return err
}
dir := filepath.Join(b.outPath, "storage")
l := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
l = log.With(l, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller)
l := log.With(b.logger, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller)
st, err := tsdb.Open(dir, l, nil, &tsdb.Options{
RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds
BlockRanges: tsdb.ExponentialBlockRanges(2*60*60*1000, 5, 3),
})
if err != nil {
exitWithError(err)
return err
}
b.storage = st
var labels []labels.Labels
measureTime("readData", func() {
_, err = measureTime("readData", func() error {
f, err := os.Open(b.samplesFile)
if err != nil {
exitWithError(err)
return err
}
defer f.Close()
labels, err = readPrometheusLabels(f, b.numMetrics)
if err != nil {
exitWithError(err)
return err
}
return nil
})
if err != nil {
return err
}
var total uint64
dur := measureTime("ingestScrapes", func() {
dur, err := measureTime("ingestScrapes", func() error {
b.startProfiling()
total, err = b.ingestScrapes(labels, 3000)
if err != nil {
exitWithError(err)
return err
}
return nil
})
if err != nil {
return err
}
fmt.Println(" > total samples:", total)
fmt.Println(" > samples/sec:", float64(total)/dur.Seconds())
measureTime("stopStorage", func() {
_, err = measureTime("stopStorage", func() error {
if err := b.storage.Close(); err != nil {
exitWithError(err)
return err
}
if err := b.stopProfiling(); err != nil {
exitWithError(err)
return err
}
return nil
})
if err != nil {
return err
}
return nil
}
const timeDelta = 30000
@ -281,37 +326,38 @@ func (b *writeBenchmark) ingestScrapesShard(lbls []labels.Labels, scrapeCount in
return total, nil
}
func (b *writeBenchmark) startProfiling() {
func (b *writeBenchmark) startProfiling() error {
var err error
// Start CPU profiling.
b.cpuprof, err = os.Create(filepath.Join(b.outPath, "cpu.prof"))
if err != nil {
exitWithError(fmt.Errorf("bench: could not create cpu profile: %v", err))
return fmt.Errorf("bench: could not create cpu profile: %v", err)
}
if err := pprof.StartCPUProfile(b.cpuprof); err != nil {
exitWithError(fmt.Errorf("bench: could not start CPU profile: %v", err))
return fmt.Errorf("bench: could not start CPU profile: %v", err)
}
// Start memory profiling.
b.memprof, err = os.Create(filepath.Join(b.outPath, "mem.prof"))
if err != nil {
exitWithError(fmt.Errorf("bench: could not create memory profile: %v", err))
return fmt.Errorf("bench: could not create memory profile: %v", err)
}
runtime.MemProfileRate = 64 * 1024
// Start fatal profiling.
b.blockprof, err = os.Create(filepath.Join(b.outPath, "block.prof"))
if err != nil {
exitWithError(fmt.Errorf("bench: could not create block profile: %v", err))
return fmt.Errorf("bench: could not create block profile: %v", err)
}
runtime.SetBlockProfileRate(20)
b.mtxprof, err = os.Create(filepath.Join(b.outPath, "mutex.prof"))
if err != nil {
exitWithError(fmt.Errorf("bench: could not create mutex profile: %v", err))
return fmt.Errorf("bench: could not create mutex profile: %v", err)
}
runtime.SetMutexProfileFraction(20)
return nil
}
func (b *writeBenchmark) stopProfiling() error {
@ -346,12 +392,15 @@ func (b *writeBenchmark) stopProfiling() error {
return nil
}
func measureTime(stage string, f func()) time.Duration {
func measureTime(stage string, f func() error) (time.Duration, error) {
fmt.Printf(">> start stage=%s\n", stage)
start := time.Now()
f()
err := f()
if err != nil {
return 0, err
}
fmt.Printf(">> completed stage=%s duration=%s\n", stage, time.Since(start))
return time.Since(start)
return time.Since(start), nil
}
func readPrometheusLabels(r io.Reader, n int) ([]labels.Labels, error) {
@ -385,12 +434,7 @@ func readPrometheusLabels(r io.Reader, n int) ([]labels.Labels, error) {
return mets, nil
}
func exitWithError(err error) {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
func printBlocks(blocks []*tsdb.Block, humanReadable *bool) {
func printBlocks(blocks []tsdb.BlockReader, humanReadable *bool) {
tw := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
defer tw.Flush()
@ -417,21 +461,21 @@ func getFormatedTime(timestamp int64, humanReadable *bool) string {
return strconv.FormatInt(timestamp, 10)
}
func analyzeBlock(b *tsdb.Block, limit int) {
fmt.Printf("Block path: %s\n", b.Dir())
func analyzeBlock(b tsdb.BlockReader, limit int) error {
meta := b.Meta()
fmt.Printf("Block ID: %s\n", meta.ULID)
// Presume 1ms resolution that Prometheus uses.
fmt.Printf("Duration: %s\n", (time.Duration(meta.MaxTime-meta.MinTime) * 1e6).String())
fmt.Printf("Series: %d\n", meta.Stats.NumSeries)
ir, err := b.Index()
if err != nil {
exitWithError(err)
return err
}
defer ir.Close()
allLabelNames, err := ir.LabelNames()
if err != nil {
exitWithError(err)
return err
}
fmt.Printf("Label names: %d\n", len(allLabelNames))
@ -458,13 +502,13 @@ func analyzeBlock(b *tsdb.Block, limit int) {
entries := 0
p, err := ir.Postings("", "") // The special all key.
if err != nil {
exitWithError(err)
return err
}
lbls := labels.Labels{}
chks := []chunks.Meta{}
for p.Next() {
if err = ir.Series(p.At(), &lbls, &chks); err != nil {
exitWithError(err)
return err
}
// Amount of the block time range not covered by this series.
uncovered := uint64(meta.MaxTime-meta.MinTime) - uint64(chks[len(chks)-1].MaxTime-chks[0].MinTime)
@ -477,7 +521,7 @@ func analyzeBlock(b *tsdb.Block, limit int) {
}
}
if p.Err() != nil {
exitWithError(p.Err())
return p.Err()
}
fmt.Printf("Postings (unique label pairs): %d\n", len(labelpairsUncovered))
fmt.Printf("Postings entries (total label pairs): %d\n", entries)
@ -510,14 +554,14 @@ func analyzeBlock(b *tsdb.Block, limit int) {
for _, n := range allLabelNames {
values, err := ir.LabelValues(n)
if err != nil {
exitWithError(err)
return err
}
var cumulativeLength uint64
for i := 0; i < values.Len(); i++ {
value, _ := values.At(i)
if err != nil {
exitWithError(err)
return err
}
for _, str := range value {
cumulativeLength += uint64(len(str))
@ -534,7 +578,7 @@ func analyzeBlock(b *tsdb.Block, limit int) {
for _, n := range allLabelNames {
lv, err := ir.LabelValues(n)
if err != nil {
exitWithError(err)
return err
}
postingInfos = append(postingInfos, postingInfo{n, uint64(lv.Len())})
}
@ -544,41 +588,49 @@ func analyzeBlock(b *tsdb.Block, limit int) {
postingInfos = postingInfos[:0]
lv, err := ir.LabelValues("__name__")
if err != nil {
exitWithError(err)
return err
}
for i := 0; i < lv.Len(); i++ {
names, err := lv.At(i)
if err != nil {
exitWithError(err)
return err
}
for _, n := range names {
postings, err := ir.Postings("__name__", n)
if err != nil {
exitWithError(err)
return err
}
count := 0
for postings.Next() {
count++
}
if postings.Err() != nil {
exitWithError(postings.Err())
return postings.Err()
}
postingInfos = append(postingInfos, postingInfo{n, uint64(count)})
}
}
fmt.Printf("\nHighest cardinality metric names:\n")
printInfo(postingInfos)
return nil
}
func dumpSamples(db *tsdb.DB, mint, maxt int64) {
func dumpSamples(db *tsdb.DBReadOnly, mint, maxt int64) (err error) {
q, err := db.Querier(mint, maxt)
if err != nil {
exitWithError(err)
return err
}
defer func() {
var merr tsdb_errors.MultiError
merr.Add(err)
merr.Add(q.Close())
err = merr.Err()
}()
ss, err := q.Select(labels.NewMustRegexpMatcher("", ".*"))
if err != nil {
exitWithError(err)
return err
}
for ss.Next() {
@ -590,11 +642,12 @@ func dumpSamples(db *tsdb.DB, mint, maxt int64) {
fmt.Printf("%s %g %d\n", labels, val, ts)
}
if it.Err() != nil {
exitWithError(ss.Err())
return ss.Err()
}
}
if ss.Err() != nil {
exitWithError(ss.Err())
return ss.Err()
}
return nil
}

@ -662,7 +662,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
}()
c.metrics.populatingBlocks.Set(1)
globalMaxt := blocks[0].MaxTime()
globalMaxt := blocks[0].Meta().MaxTime
for i, b := range blocks {
select {
case <-c.ctx.Done():
@ -671,13 +671,13 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
}
if !overlapping {
if i > 0 && b.MinTime() < globalMaxt {
if i > 0 && b.Meta().MinTime < globalMaxt {
c.metrics.overlappingBlocks.Inc()
overlapping = true
level.Warn(c.logger).Log("msg", "found overlapping blocks during compaction", "ulid", meta.ULID)
}
if b.MaxTime() > globalMaxt {
globalMaxt = b.MaxTime()
if b.Meta().MaxTime > globalMaxt {
globalMaxt = b.Meta().MaxTime
}
}

@ -458,8 +458,7 @@ type erringBReader struct{}
func (erringBReader) Index() (IndexReader, error) { return nil, errors.New("index") }
func (erringBReader) Chunks() (ChunkReader, error) { return nil, errors.New("chunks") }
func (erringBReader) Tombstones() (TombstoneReader, error) { return nil, errors.New("tombstones") }
func (erringBReader) MinTime() int64 { return 0 }
func (erringBReader) MaxTime() int64 { return 0 }
func (erringBReader) Meta() BlockMeta { return BlockMeta{} }
type nopChunkWriter struct{}

200
db.go

@ -250,6 +250,178 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
return m
}
// ErrClosed is returned when the db is closed.
var ErrClosed = errors.New("db already closed")
// DBReadOnly provides APIs for read only operations on a database.
// Current implementation doesn't support concurency so
// all API calls should happen in the same go routine.
type DBReadOnly struct {
logger log.Logger
dir string
closers []io.Closer
closed chan struct{}
}
// OpenDBReadOnly opens DB in the given directory for read only operations.
func OpenDBReadOnly(dir string, l log.Logger) (*DBReadOnly, error) {
if _, err := os.Stat(dir); err != nil {
return nil, errors.Wrap(err, "openning the db dir")
}
if l == nil {
l = log.NewNopLogger()
}
return &DBReadOnly{
logger: l,
dir: dir,
closed: make(chan struct{}),
}, nil
}
// Querier loads the wal and returns a new querier over the data partition for the given time range.
// Current implementation doesn't support multiple Queriers.
func (db *DBReadOnly) Querier(mint, maxt int64) (Querier, error) {
select {
case <-db.closed:
return nil, ErrClosed
default:
}
blocksReaders, err := db.Blocks()
if err != nil {
return nil, err
}
blocks := make([]*Block, len(blocksReaders))
for i, b := range blocksReaders {
b, ok := b.(*Block)
if !ok {
return nil, errors.New("unable to convert a read only block to a normal block")
}
blocks[i] = b
}
head, err := NewHead(nil, db.logger, nil, 1)
if err != nil {
return nil, err
}
maxBlockTime := int64(math.MinInt64)
if len(blocks) > 0 {
maxBlockTime = blocks[len(blocks)-1].Meta().MaxTime
}
// Also add the WAL if the current blocks don't cover the requestes time range.
if maxBlockTime <= maxt {
w, err := wal.Open(db.logger, nil, filepath.Join(db.dir, "wal"))
if err != nil {
return nil, err
}
head, err = NewHead(nil, db.logger, w, 1)
if err != nil {
return nil, err
}
// Set the min valid time for the ingested wal samples
// to be no lower than the maxt of the last block.
if err := head.Init(maxBlockTime); err != nil {
return nil, errors.Wrap(err, "read WAL")
}
// Set the wal to nil to disable all wal operations.
// This is mainly to avoid blocking when closing the head.
head.wal = nil
db.closers = append(db.closers, head)
}
// TODO: Refactor so that it is possible to obtain a Querier without initializing a writable DB instance.
// Option 1: refactor DB to have the Querier implementation using the DBReadOnly.Querier implementation not the opposite.
// Option 2: refactor Querier to use another independent func which
// can than be used by a read only and writable db instances without any code duplication.
dbWritable := &DB{
dir: db.dir,
logger: db.logger,
blocks: blocks,
head: head,
}
return dbWritable.Querier(mint, maxt)
}
// Blocks returns a slice of block readers for persisted blocks.
func (db *DBReadOnly) Blocks() ([]BlockReader, error) {
select {
case <-db.closed:
return nil, ErrClosed
default:
}
loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil)
if err != nil {
return nil, err
}
// Corrupted blocks that have been superseded by a loadable block can be safely ignored.
for _, block := range loadable {
for _, b := range block.Meta().Compaction.Parents {
delete(corrupted, b.ULID)
}
}
if len(corrupted) > 0 {
for _, b := range loadable {
if err := b.Close(); err != nil {
level.Warn(db.logger).Log("msg", "closing a block", err)
}
}
return nil, errors.Errorf("unexpected corrupted block:%v", corrupted)
}
if len(loadable) == 0 {
return nil, errors.New("no blocks found")
}
sort.Slice(loadable, func(i, j int) bool {
return loadable[i].Meta().MinTime < loadable[j].Meta().MinTime
})
blockMetas := make([]BlockMeta, 0, len(loadable))
for _, b := range loadable {
blockMetas = append(blockMetas, b.Meta())
}
if overlaps := OverlappingBlocks(blockMetas); len(overlaps) > 0 {
level.Warn(db.logger).Log("msg", "overlapping blocks found during opening", "detail", overlaps.String())
}
// Close all previously open readers and add the new ones to the cache.
for _, closer := range db.closers {
closer.Close()
}
blockClosers := make([]io.Closer, len(loadable))
blockReaders := make([]BlockReader, len(loadable))
for i, b := range loadable {
blockClosers[i] = b
blockReaders[i] = b
}
db.closers = blockClosers
return blockReaders, nil
}
// Close all block readers.
func (db *DBReadOnly) Close() error {
select {
case <-db.closed:
return ErrClosed
default:
}
close(db.closed)
var merr tsdb_errors.MultiError
for _, b := range db.closers {
merr.Add(b.Close())
}
return merr.Err()
}
// Open returns a new DB in the given directory.
func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db *DB, err error) {
if err := os.MkdirAll(dir, 0777); err != nil {
@ -514,8 +686,10 @@ func (db *DB) compact() (err error) {
return nil
}
func (db *DB) getBlock(id ulid.ULID) (*Block, bool) {
for _, b := range db.blocks {
// getBlock iterates a given block range to find a block by a given id.
// If found it returns the block itself and a boolean to indicate that it was found.
func getBlock(allBlocks []*Block, id ulid.ULID) (*Block, bool) {
for _, b := range allBlocks {
if b.Meta().ULID == id {
return b, true
}
@ -533,14 +707,14 @@ func (db *DB) reload() (err error) {
db.metrics.reloads.Inc()
}()
loadable, corrupted, err := db.openBlocks()
loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool)
if err != nil {
return err
}
deletable := db.deletableBlocks(loadable)
// Corrupted blocks that have been replaced by parents can be safely ignored and deleted.
// Corrupted blocks that have been superseded by a loadable block can be safely ignored.
// This makes it resilient against the process crashing towards the end of a compaction.
// Creation of a new block and deletion of its parents cannot happen atomically.
// By creating blocks with their parents, we can pick up the deletion where it left off during a crash.
@ -553,7 +727,7 @@ func (db *DB) reload() (err error) {
if len(corrupted) > 0 {
// Close all new blocks to release the lock for windows.
for _, block := range loadable {
if _, loaded := db.getBlock(block.Meta().ULID); !loaded {
if _, open := getBlock(db.blocks, block.Meta().ULID); !open {
block.Close()
}
}
@ -621,24 +795,24 @@ func (db *DB) reload() (err error) {
return errors.Wrap(db.head.Truncate(maxt), "head truncate failed")
}
func (db *DB) openBlocks() (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
dirs, err := blockDirs(db.dir)
func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool) (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
bDirs, err := blockDirs(dir)
if err != nil {
return nil, nil, errors.Wrap(err, "find blocks")
}
corrupted = make(map[ulid.ULID]error)
for _, dir := range dirs {
meta, _, err := readMetaFile(dir)
for _, bDir := range bDirs {
meta, _, err := readMetaFile(bDir)
if err != nil {
level.Error(db.logger).Log("msg", "not a block dir", "dir", dir)
level.Error(l).Log("msg", "not a block dir", "dir", bDir)
continue
}
// See if we already have the block in memory or open it otherwise.
block, ok := db.getBlock(meta.ULID)
if !ok {
block, err = OpenBlock(db.logger, dir, db.chunkPool)
block, open := getBlock(loaded, meta.ULID)
if !open {
block, err = OpenBlock(l, bDir, chunkPool)
if err != nil {
corrupted[meta.ULID] = err
continue

@ -1113,8 +1113,7 @@ func TestSizeRetention(t *testing.T) {
testutil.Ok(t, db.reload()) // Reload the db to register the new db size.
testutil.Equals(t, len(blocks), len(db.Blocks())) // Ensure all blocks are registered.
expSize := int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) // Use the the actual internal metrics.
actSize, err := testutil.DirSize(db.Dir())
testutil.Ok(t, err)
actSize := testutil.DirSize(t, db.Dir())
testutil.Equals(t, expSize, actSize, "registered size doesn't match actual disk size")
// Decrease the max bytes limit so that a delete is triggered.
@ -1128,8 +1127,7 @@ func TestSizeRetention(t *testing.T) {
actBlocks := db.Blocks()
expSize = int64(prom_testutil.ToFloat64(db.metrics.blocksBytes))
actRetentCount := int(prom_testutil.ToFloat64(db.metrics.sizeRetentionCount))
actSize, err = testutil.DirSize(db.Dir())
testutil.Ok(t, err)
actSize = testutil.DirSize(t, db.Dir())
testutil.Equals(t, 1, actRetentCount, "metric retention count mismatch")
testutil.Equals(t, actSize, expSize, "metric db size doesn't match actual disk size")
@ -2232,3 +2230,108 @@ func TestBlockRanges(t *testing.T) {
t.Fatalf("new block overlaps old:%v,new:%v", db.Blocks()[2].Meta(), db.Blocks()[3].Meta())
}
}
// TestDBReadOnly ensures that opening a DB in readonly mode doesn't modify any files on the disk.
// It also checks that the API calls return equivalent results as a normal db.Open() mode.
func TestDBReadOnly(t *testing.T) {
var (
dbDir string
logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
expBlocks []*Block
expSeries map[string][]tsdbutil.Sample
expSeriesCount int
expDBHash []byte
matchAll = labels.NewEqualMatcher("", "")
err error
)
// Boostrap the db.
{
dbDir, err = ioutil.TempDir("", "test")
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, os.RemoveAll(dbDir))
}()
dbBlocks := []*BlockMeta{
{MinTime: 10, MaxTime: 11},
{MinTime: 11, MaxTime: 12},
{MinTime: 12, MaxTime: 13},
}
for _, m := range dbBlocks {
createBlock(t, dbDir, genSeries(1, 1, m.MinTime, m.MaxTime))
}
expSeriesCount++
}
// Open a normal db to use for a comparison.
{
dbWritable, err := Open(dbDir, logger, nil, nil)
testutil.Ok(t, err)
dbWritable.DisableCompactions()
dbSizeBeforeAppend := testutil.DirSize(t, dbWritable.Dir())
app := dbWritable.Appender()
_, err = app.Add(labels.FromStrings("foo", "bar"), dbWritable.Head().MaxTime()+1, 0)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
expSeriesCount++
expBlocks = dbWritable.Blocks()
expDbSize := testutil.DirSize(t, dbWritable.Dir())
testutil.Assert(t, expDbSize > dbSizeBeforeAppend, "db size didn't increase after an append")
q, err := dbWritable.Querier(math.MinInt64, math.MaxInt64)
testutil.Ok(t, err)
expSeries = query(t, q, matchAll)
testutil.Ok(t, dbWritable.Close()) // Close here to allow getting the dir hash for windows.
expDBHash = testutil.DirHash(t, dbWritable.Dir())
}
// Open a read only db and ensure that the API returns the same result as the normal DB.
{
dbReadOnly, err := OpenDBReadOnly(dbDir, logger)
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, dbReadOnly.Close())
}()
blocks, err := dbReadOnly.Blocks()
testutil.Ok(t, err)
testutil.Equals(t, len(expBlocks), len(blocks))
for i, expBlock := range expBlocks {
testutil.Equals(t, expBlock.Meta(), blocks[i].Meta(), "block meta mismatch")
}
q, err := dbReadOnly.Querier(math.MinInt64, math.MaxInt64)
testutil.Ok(t, err)
readOnlySeries := query(t, q, matchAll)
readOnlyDBHash := testutil.DirHash(t, dbDir)
testutil.Equals(t, expSeriesCount, len(readOnlySeries), "total series mismatch")
testutil.Equals(t, expSeries, readOnlySeries, "series mismatch")
testutil.Equals(t, expDBHash, readOnlyDBHash, "after all read operations the db hash should remain the same")
}
}
// TestDBReadOnlyClosing ensures that after closing the db
// all api methods return an ErrClosed.
func TestDBReadOnlyClosing(t *testing.T) {
dbDir, err := ioutil.TempDir("", "test")
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, os.RemoveAll(dbDir))
}()
db, err := OpenDBReadOnly(dbDir, log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)))
testutil.Ok(t, err)
testutil.Ok(t, db.Close())
testutil.Equals(t, db.Close(), ErrClosed)
_, err = db.Blocks()
testutil.Equals(t, err, ErrClosed)
_, err = db.Querier(0, 1)
testutil.Equals(t, err, ErrClosed)
}

@ -25,6 +25,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb/chunkenc"
@ -64,6 +65,7 @@ type Head struct {
logger log.Logger
appendPool sync.Pool
bytesPool sync.Pool
numSeries uint64
minTime, maxTime int64 // Current min and max of the samples included in the head.
minValidTime int64 // Mint allowed to be added to the head. It shouldn't be lower than the maxt of the last persisted block.
@ -84,7 +86,7 @@ type Head struct {
type headMetrics struct {
activeAppenders prometheus.Gauge
series prometheus.Gauge
series prometheus.GaugeFunc
seriesCreated prometheus.Counter
seriesRemoved prometheus.Counter
seriesNotFound prometheus.Counter
@ -112,9 +114,11 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
Name: "prometheus_tsdb_head_active_appenders",
Help: "Number of currently active appender transactions",
})
m.series = prometheus.NewGauge(prometheus.GaugeOpts{
m.series = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "prometheus_tsdb_head_series",
Help: "Total number of series in the head block.",
}, func() float64 {
return float64(h.NumSeries())
})
m.seriesCreated = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_head_series_created_total",
@ -701,6 +705,21 @@ func (h *rangeHead) MaxTime() int64 {
return h.maxt
}
func (h *rangeHead) NumSeries() uint64 {
return h.head.NumSeries()
}
func (h *rangeHead) Meta() BlockMeta {
return BlockMeta{
MinTime: h.MinTime(),
MaxTime: h.MaxTime(),
ULID: h.head.Meta().ULID,
Stats: BlockStats{
NumSeries: h.NumSeries(),
},
}
}
// initAppender is a helper to initialize the time bounds of the head
// upon the first sample it receives.
type initAppender struct {
@ -1025,9 +1044,11 @@ func (h *Head) gc() {
seriesRemoved := len(deleted)
h.metrics.seriesRemoved.Add(float64(seriesRemoved))
h.metrics.series.Sub(float64(seriesRemoved))
h.metrics.chunksRemoved.Add(float64(chunksRemoved))
h.metrics.chunks.Sub(float64(chunksRemoved))
// Using AddUint64 to substract series removed.
// See: https://golang.org/pkg/sync/atomic/#AddUint64.
atomic.AddUint64(&h.numSeries, ^uint64(seriesRemoved-1))
// Remove deleted series IDs from the postings lists.
h.postings.Delete(deleted)
@ -1104,6 +1125,26 @@ func (h *Head) chunksRange(mint, maxt int64) *headChunkReader {
return &headChunkReader{head: h, mint: mint, maxt: maxt}
}
// NumSeries returns the number of active series in the head.
func (h *Head) NumSeries() uint64 {
return atomic.LoadUint64(&h.numSeries)
}
// Meta returns meta information about the head.
// The head is dynamic so will return dynamic results.
func (h *Head) Meta() BlockMeta {
var id [16]byte
copy(id[:], "______head______")
return BlockMeta{
MinTime: h.MinTime(),
MaxTime: h.MaxTime(),
ULID: ulid.ULID(id),
Stats: BlockStats{
NumSeries: h.NumSeries(),
},
}
}
// MinTime returns the lowest time bound on visible data in the head.
func (h *Head) MinTime() int64 {
return atomic.LoadInt64(&h.minTime)
@ -1350,8 +1391,8 @@ func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSerie
return s, false
}
h.metrics.series.Inc()
h.metrics.seriesCreated.Inc()
atomic.AddUint64(&h.numSeries, 1)
h.postings.Add(id, lset)

@ -75,5 +75,4 @@ type mockBReader struct {
func (r *mockBReader) Index() (IndexReader, error) { return r.ir, nil }
func (r *mockBReader) Chunks() (ChunkReader, error) { return r.cr, nil }
func (r *mockBReader) Tombstones() (TombstoneReader, error) { return newMemTombstones(), nil }
func (r *mockBReader) MinTime() int64 { return r.mint }
func (r *mockBReader) MaxTime() int64 { return r.maxt }
func (r *mockBReader) Meta() BlockMeta { return BlockMeta{MinTime: r.mint, MaxTime: r.maxt} }

@ -14,9 +14,13 @@
package testutil
import (
"crypto/sha256"
"io"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"testing"
)
const (
@ -130,16 +134,49 @@ func NewTemporaryDirectory(name string, t T) (handler TemporaryDirectory) {
}
// DirSize returns the size in bytes of all files in a directory.
func DirSize(path string) (int64, error) {
func DirSize(t *testing.T, path string) int64 {
var size int64
err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
if err != nil {
return err
}
Ok(t, err)
if !info.IsDir() {
size += info.Size()
}
return nil
})
return size, err
Ok(t, err)
return size
}
// DirHash returns a hash of all files attribites and their content within a directory.
func DirHash(t *testing.T, path string) []byte {
hash := sha256.New()
err := filepath.Walk(path, func(path string, info os.FileInfo, err error) error {
Ok(t, err)
if info.IsDir() {
return nil
}
f, err := os.Open(path)
Ok(t, err)
defer f.Close()
_, err = io.Copy(hash, f)
Ok(t, err)
_, err = io.WriteString(hash, strconv.Itoa(int(info.Size())))
Ok(t, err)
_, err = io.WriteString(hash, info.Name())
Ok(t, err)
modTime, err := info.ModTime().GobEncode()
Ok(t, err)
_, err = io.WriteString(hash, string(modTime))
Ok(t, err)
return nil
})
Ok(t, err)
return hash.Sum(nil)
}

@ -203,6 +203,48 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi
stopc: make(chan chan struct{}),
compress: compress,
}
registerMetrics(reg, w)
_, j, err := w.Segments()
// Index of the Segment we want to open and write to.
writeSegmentIndex := 0
if err != nil {
return nil, errors.Wrap(err, "get segment range")
}
// If some segments already exist create one with a higher index than the last segment.
if j != -1 {
writeSegmentIndex = j + 1
}
segment, err := CreateSegment(w.dir, writeSegmentIndex)
if err != nil {
return nil, err
}
if err := w.setSegment(segment); err != nil {
return nil, err
}
go w.run()
return w, nil
}
// Open an existing WAL.
func Open(logger log.Logger, reg prometheus.Registerer, dir string) (*WAL, error) {
if logger == nil {
logger = log.NewNopLogger()
}
w := &WAL{
dir: dir,
logger: logger,
}
registerMetrics(reg, w)
return w, nil
}
func registerMetrics(reg prometheus.Registerer, w *WAL) {
w.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{
Name: "prometheus_tsdb_wal_fsync_duration_seconds",
Help: "Duration of WAL fsync.",
@ -231,30 +273,6 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi
if reg != nil {
reg.MustRegister(w.fsyncDuration, w.pageFlushes, w.pageCompletions, w.truncateFail, w.truncateTotal, w.currentSegment)
}
_, j, err := w.Segments()
// Index of the Segment we want to open and write to.
writeSegmentIndex := 0
if err != nil {
return nil, errors.Wrap(err, "get segment range")
}
// If some segments already exist create one with a higher index than the last segment.
if j != -1 {
writeSegmentIndex = j + 1
}
segment, err := CreateSegment(w.dir, writeSegmentIndex)
if err != nil {
return nil, err
}
if err := w.setSegment(segment); err != nil {
return nil, err
}
go w.run()
return w, nil
}
// CompressionEnabled returns if compression is enabled on this WAL.
@ -302,7 +320,6 @@ func (w *WAL) Repair(origErr error) error {
if cerr.Segment < 0 {
return errors.New("corruption error does not specify position")
}
level.Warn(w.logger).Log("msg", "starting corruption repair",
"segment", cerr.Segment, "offset", cerr.Offset)
@ -487,7 +504,6 @@ func (w *WAL) flushPage(clear bool) error {
// First Byte of header format:
// [ 4 bits unallocated] [1 bit snappy compression flag] [ 3 bit record type ]
const (
snappyMask = 1 << 3
recTypeMask = snappyMask - 1

@ -408,10 +408,8 @@ func TestCompression(t *testing.T) {
testutil.Ok(t, os.RemoveAll(dirUnCompressed))
}()
uncompressedSize, err := testutil.DirSize(dirUnCompressed)
testutil.Ok(t, err)
compressedSize, err := testutil.DirSize(dirCompressed)
testutil.Ok(t, err)
uncompressedSize := testutil.DirSize(t, dirUnCompressed)
compressedSize := testutil.DirSize(t, dirCompressed)
testutil.Assert(t, float64(uncompressedSize)*0.75 > float64(compressedSize), "Compressing zeroes should save at least 25%% space - uncompressedSize: %d, compressedSize: %d", uncompressedSize, compressedSize)
}

Loading…
Cancel
Save