|
|
|
@ -250,6 +250,178 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
|
|
|
|
|
return m |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// ErrClosed is returned when the db is closed.
|
|
|
|
|
var ErrClosed = errors.New("db already closed") |
|
|
|
|
|
|
|
|
|
// DBReadOnly provides APIs for read only operations on a database.
|
|
|
|
|
// Current implementation doesn't support concurency so
|
|
|
|
|
// all API calls should happen in the same go routine.
|
|
|
|
|
type DBReadOnly struct { |
|
|
|
|
logger log.Logger |
|
|
|
|
dir string |
|
|
|
|
closers []io.Closer |
|
|
|
|
closed chan struct{} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// OpenDBReadOnly opens DB in the given directory for read only operations.
|
|
|
|
|
func OpenDBReadOnly(dir string, l log.Logger) (*DBReadOnly, error) { |
|
|
|
|
if _, err := os.Stat(dir); err != nil { |
|
|
|
|
return nil, errors.Wrap(err, "openning the db dir") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if l == nil { |
|
|
|
|
l = log.NewNopLogger() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return &DBReadOnly{ |
|
|
|
|
logger: l, |
|
|
|
|
dir: dir, |
|
|
|
|
closed: make(chan struct{}), |
|
|
|
|
}, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Querier loads the wal and returns a new querier over the data partition for the given time range.
|
|
|
|
|
// Current implementation doesn't support multiple Queriers.
|
|
|
|
|
func (db *DBReadOnly) Querier(mint, maxt int64) (Querier, error) { |
|
|
|
|
select { |
|
|
|
|
case <-db.closed: |
|
|
|
|
return nil, ErrClosed |
|
|
|
|
default: |
|
|
|
|
} |
|
|
|
|
blocksReaders, err := db.Blocks() |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
blocks := make([]*Block, len(blocksReaders)) |
|
|
|
|
for i, b := range blocksReaders { |
|
|
|
|
b, ok := b.(*Block) |
|
|
|
|
if !ok { |
|
|
|
|
return nil, errors.New("unable to convert a read only block to a normal block") |
|
|
|
|
} |
|
|
|
|
blocks[i] = b |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
head, err := NewHead(nil, db.logger, nil, 1) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
maxBlockTime := int64(math.MinInt64) |
|
|
|
|
if len(blocks) > 0 { |
|
|
|
|
maxBlockTime = blocks[len(blocks)-1].Meta().MaxTime |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Also add the WAL if the current blocks don't cover the requestes time range.
|
|
|
|
|
if maxBlockTime <= maxt { |
|
|
|
|
w, err := wal.Open(db.logger, nil, filepath.Join(db.dir, "wal")) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
head, err = NewHead(nil, db.logger, w, 1) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
// Set the min valid time for the ingested wal samples
|
|
|
|
|
// to be no lower than the maxt of the last block.
|
|
|
|
|
if err := head.Init(maxBlockTime); err != nil { |
|
|
|
|
return nil, errors.Wrap(err, "read WAL") |
|
|
|
|
} |
|
|
|
|
// Set the wal to nil to disable all wal operations.
|
|
|
|
|
// This is mainly to avoid blocking when closing the head.
|
|
|
|
|
head.wal = nil |
|
|
|
|
|
|
|
|
|
db.closers = append(db.closers, head) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// TODO: Refactor so that it is possible to obtain a Querier without initializing a writable DB instance.
|
|
|
|
|
// Option 1: refactor DB to have the Querier implementation using the DBReadOnly.Querier implementation not the opposite.
|
|
|
|
|
// Option 2: refactor Querier to use another independent func which
|
|
|
|
|
// can than be used by a read only and writable db instances without any code duplication.
|
|
|
|
|
dbWritable := &DB{ |
|
|
|
|
dir: db.dir, |
|
|
|
|
logger: db.logger, |
|
|
|
|
blocks: blocks, |
|
|
|
|
head: head, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return dbWritable.Querier(mint, maxt) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Blocks returns a slice of block readers for persisted blocks.
|
|
|
|
|
func (db *DBReadOnly) Blocks() ([]BlockReader, error) { |
|
|
|
|
select { |
|
|
|
|
case <-db.closed: |
|
|
|
|
return nil, ErrClosed |
|
|
|
|
default: |
|
|
|
|
} |
|
|
|
|
loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Corrupted blocks that have been superseded by a loadable block can be safely ignored.
|
|
|
|
|
for _, block := range loadable { |
|
|
|
|
for _, b := range block.Meta().Compaction.Parents { |
|
|
|
|
delete(corrupted, b.ULID) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if len(corrupted) > 0 { |
|
|
|
|
for _, b := range loadable { |
|
|
|
|
if err := b.Close(); err != nil { |
|
|
|
|
level.Warn(db.logger).Log("msg", "closing a block", err) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return nil, errors.Errorf("unexpected corrupted block:%v", corrupted) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if len(loadable) == 0 { |
|
|
|
|
return nil, errors.New("no blocks found") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
sort.Slice(loadable, func(i, j int) bool { |
|
|
|
|
return loadable[i].Meta().MinTime < loadable[j].Meta().MinTime |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
blockMetas := make([]BlockMeta, 0, len(loadable)) |
|
|
|
|
for _, b := range loadable { |
|
|
|
|
blockMetas = append(blockMetas, b.Meta()) |
|
|
|
|
} |
|
|
|
|
if overlaps := OverlappingBlocks(blockMetas); len(overlaps) > 0 { |
|
|
|
|
level.Warn(db.logger).Log("msg", "overlapping blocks found during opening", "detail", overlaps.String()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Close all previously open readers and add the new ones to the cache.
|
|
|
|
|
for _, closer := range db.closers { |
|
|
|
|
closer.Close() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
blockClosers := make([]io.Closer, len(loadable)) |
|
|
|
|
blockReaders := make([]BlockReader, len(loadable)) |
|
|
|
|
for i, b := range loadable { |
|
|
|
|
blockClosers[i] = b |
|
|
|
|
blockReaders[i] = b |
|
|
|
|
} |
|
|
|
|
db.closers = blockClosers |
|
|
|
|
|
|
|
|
|
return blockReaders, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Close all block readers.
|
|
|
|
|
func (db *DBReadOnly) Close() error { |
|
|
|
|
select { |
|
|
|
|
case <-db.closed: |
|
|
|
|
return ErrClosed |
|
|
|
|
default: |
|
|
|
|
} |
|
|
|
|
close(db.closed) |
|
|
|
|
|
|
|
|
|
var merr tsdb_errors.MultiError |
|
|
|
|
|
|
|
|
|
for _, b := range db.closers { |
|
|
|
|
merr.Add(b.Close()) |
|
|
|
|
} |
|
|
|
|
return merr.Err() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Open returns a new DB in the given directory.
|
|
|
|
|
func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db *DB, err error) { |
|
|
|
|
if err := os.MkdirAll(dir, 0777); err != nil { |
|
|
|
@ -514,8 +686,10 @@ func (db *DB) compact() (err error) {
|
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (db *DB) getBlock(id ulid.ULID) (*Block, bool) { |
|
|
|
|
for _, b := range db.blocks { |
|
|
|
|
// getBlock iterates a given block range to find a block by a given id.
|
|
|
|
|
// If found it returns the block itself and a boolean to indicate that it was found.
|
|
|
|
|
func getBlock(allBlocks []*Block, id ulid.ULID) (*Block, bool) { |
|
|
|
|
for _, b := range allBlocks { |
|
|
|
|
if b.Meta().ULID == id { |
|
|
|
|
return b, true |
|
|
|
|
} |
|
|
|
@ -533,14 +707,14 @@ func (db *DB) reload() (err error) {
|
|
|
|
|
db.metrics.reloads.Inc() |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
loadable, corrupted, err := db.openBlocks() |
|
|
|
|
loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
deletable := db.deletableBlocks(loadable) |
|
|
|
|
|
|
|
|
|
// Corrupted blocks that have been replaced by parents can be safely ignored and deleted.
|
|
|
|
|
// Corrupted blocks that have been superseded by a loadable block can be safely ignored.
|
|
|
|
|
// This makes it resilient against the process crashing towards the end of a compaction.
|
|
|
|
|
// Creation of a new block and deletion of its parents cannot happen atomically.
|
|
|
|
|
// By creating blocks with their parents, we can pick up the deletion where it left off during a crash.
|
|
|
|
@ -553,7 +727,7 @@ func (db *DB) reload() (err error) {
|
|
|
|
|
if len(corrupted) > 0 { |
|
|
|
|
// Close all new blocks to release the lock for windows.
|
|
|
|
|
for _, block := range loadable { |
|
|
|
|
if _, loaded := db.getBlock(block.Meta().ULID); !loaded { |
|
|
|
|
if _, open := getBlock(db.blocks, block.Meta().ULID); !open { |
|
|
|
|
block.Close() |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -621,24 +795,24 @@ func (db *DB) reload() (err error) {
|
|
|
|
|
return errors.Wrap(db.head.Truncate(maxt), "head truncate failed") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (db *DB) openBlocks() (blocks []*Block, corrupted map[ulid.ULID]error, err error) { |
|
|
|
|
dirs, err := blockDirs(db.dir) |
|
|
|
|
func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool) (blocks []*Block, corrupted map[ulid.ULID]error, err error) { |
|
|
|
|
bDirs, err := blockDirs(dir) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, nil, errors.Wrap(err, "find blocks") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
corrupted = make(map[ulid.ULID]error) |
|
|
|
|
for _, dir := range dirs { |
|
|
|
|
meta, _, err := readMetaFile(dir) |
|
|
|
|
for _, bDir := range bDirs { |
|
|
|
|
meta, _, err := readMetaFile(bDir) |
|
|
|
|
if err != nil { |
|
|
|
|
level.Error(db.logger).Log("msg", "not a block dir", "dir", dir) |
|
|
|
|
level.Error(l).Log("msg", "not a block dir", "dir", bDir) |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// See if we already have the block in memory or open it otherwise.
|
|
|
|
|
block, ok := db.getBlock(meta.ULID) |
|
|
|
|
if !ok { |
|
|
|
|
block, err = OpenBlock(db.logger, dir, db.chunkPool) |
|
|
|
|
block, open := getBlock(loaded, meta.ULID) |
|
|
|
|
if !open { |
|
|
|
|
block, err = OpenBlock(l, bDir, chunkPool) |
|
|
|
|
if err != nil { |
|
|
|
|
corrupted[meta.ULID] = err |
|
|
|
|
continue |
|
|
|
|