Merge pull request #186 from prometheus/closeallblocks

Ensure readers are closed on setup failure.
pull/5805/head
Fabian Reinartz 7 years ago committed by GitHub
commit d109149d17

49
db.go

@ -448,9 +448,6 @@ func (db *DB) reload() (err error) {
db.metrics.reloads.Inc()
}()
var cs []io.Closer
defer func() { closeAll(cs...) }()
dirs, err := blockDirs(db.dir)
if err != nil {
return errors.Wrap(err, "find blocks")
@ -482,25 +479,25 @@ func (db *DB) reload() (err error) {
return errors.Wrap(err, "invalid block sequence")
}
// Close all opened blocks that no longer exist after we returned all locks.
// TODO(fabxc: probably races with querier still reading from them. Can
// we just abandon them and have the open FDs be GC'd automatically eventually?
for _, b := range db.blocks {
if _, ok := exist[b.Meta().ULID]; !ok {
cs = append(cs, b)
}
}
// Swap in new blocks first for subsequently created readers to be seen.
// Then close previous blocks, which may block for pending readers to complete.
db.mtx.Lock()
oldBlocks := db.blocks
db.blocks = blocks
db.mtx.Unlock()
for _, b := range oldBlocks {
if _, ok := exist[b.Meta().ULID]; !ok {
b.Close()
}
}
// Garbage collect data in the head if the most recent persisted block
// covers data of its current time range.
if len(blocks) == 0 {
return nil
}
maxt := blocks[len(db.blocks)-1].Meta().MaxTime
maxt := blocks[len(blocks)-1].Meta().MaxTime
return errors.Wrap(db.head.Truncate(maxt), "head truncate failed")
}
@ -593,7 +590,10 @@ func (db *DB) Snapshot(dir string) error {
db.cmtx.Lock()
defer db.cmtx.Unlock()
for _, b := range db.Blocks() {
db.mtx.RLock()
defer db.mtx.RUnlock()
for _, b := range db.blocks {
level.Info(db.logger).Log("msg", "snapshotting block", "block", b)
if err := b.Snapshot(dir); err != nil {
@ -608,7 +608,10 @@ func (db *DB) Snapshot(dir string) error {
func (db *DB) Querier(mint, maxt int64) (Querier, error) {
var blocks []BlockReader
for _, b := range db.Blocks() {
db.mtx.RLock()
defer db.mtx.RUnlock()
for _, b := range db.blocks {
m := b.Meta()
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
blocks = append(blocks, b)
@ -623,10 +626,15 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) {
}
for _, b := range blocks {
q, err := NewBlockQuerier(b, mint, maxt)
if err != nil {
return nil, errors.Wrapf(err, "open querier for block %s", b)
if err == nil {
sq.blocks = append(sq.blocks, q)
continue
}
// If we fail, all previously opened queriers must be closed.
for _, q := range sq.blocks {
q.Close()
}
sq.blocks = append(sq.blocks, q)
return nil, errors.Wrapf(err, "open querier for block %s", b)
}
return sq, nil
}
@ -643,7 +651,10 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
var g errgroup.Group
for _, b := range db.Blocks() {
db.mtx.RLock()
defer db.mtx.RUnlock()
for _, b := range db.blocks {
m := b.Meta()
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
g.Go(func(b *Block) func() error {

@ -114,10 +114,13 @@ func NewBlockQuerier(b BlockReader, mint, maxt int64) (Querier, error) {
}
chunkr, err := b.Chunks()
if err != nil {
indexr.Close()
return nil, errors.Wrapf(err, "open chunk reader")
}
tombsr, err := b.Tombstones()
if err != nil {
indexr.Close()
chunkr.Close()
return nil, errors.Wrapf(err, "open tombstone reader")
}
return &blockQuerier{

Loading…
Cancel
Save