|
|
|
@ -378,36 +378,9 @@ func (db *DB) compact() (changes bool, err error) {
|
|
|
|
|
return changes, errors.Wrapf(err, "compact %s", plan)
|
|
|
|
|
}
|
|
|
|
|
changes = true
|
|
|
|
|
|
|
|
|
|
// close blocks in plan so that we can remove files.
|
|
|
|
|
var blocks []*Block
|
|
|
|
|
db.mtx.Lock()
|
|
|
|
|
oldBlocks := db.blocks
|
|
|
|
|
for _, b := range oldBlocks {
|
|
|
|
|
keep := true
|
|
|
|
|
for _, pd := range plan {
|
|
|
|
|
if pd == b.Dir() {
|
|
|
|
|
keep = false
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if keep {
|
|
|
|
|
blocks = append(blocks, b)
|
|
|
|
|
} else {
|
|
|
|
|
b.Close()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
db.blocks = blocks
|
|
|
|
|
db.mtx.Unlock()
|
|
|
|
|
|
|
|
|
|
for _, pd := range plan {
|
|
|
|
|
if err := os.RemoveAll(pd); err != nil {
|
|
|
|
|
return changes, errors.Wrap(err, "delete compacted block")
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
runtime.GC()
|
|
|
|
|
|
|
|
|
|
if err := db.reload(); err != nil {
|
|
|
|
|
if err := db.reload(plan...); err != nil {
|
|
|
|
|
return changes, errors.Wrap(err, "reload blocks")
|
|
|
|
|
}
|
|
|
|
|
runtime.GC()
|
|
|
|
@ -461,7 +434,18 @@ func (db *DB) getBlock(id ulid.ULID) (*Block, bool) {
|
|
|
|
|
return nil, false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (db *DB) reload() (err error) {
|
|
|
|
|
func stringsContain(set []string, elem string) bool {
|
|
|
|
|
for _, e := range set {
|
|
|
|
|
if elem == e {
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// reload on-disk blocks and trigger head truncation if new blocks appeared. It takes
|
|
|
|
|
// a list of block directories which should be deleted during reload.
|
|
|
|
|
func (db *DB) reload(deleteable ...string) (err error) {
|
|
|
|
|
defer func() {
|
|
|
|
|
if err != nil {
|
|
|
|
|
db.metrics.reloadsFailed.Inc()
|
|
|
|
@ -483,6 +467,10 @@ func (db *DB) reload() (err error) {
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Wrapf(err, "read meta information %s", dir)
|
|
|
|
|
}
|
|
|
|
|
// If the block is pending for deletion, don't add it to the new block set.
|
|
|
|
|
if stringsContain(deleteable, dir) {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
b, ok := db.getBlock(meta.ULID)
|
|
|
|
|
if !ok {
|
|
|
|
@ -508,8 +496,14 @@ func (db *DB) reload() (err error) {
|
|
|
|
|
db.mtx.Unlock()
|
|
|
|
|
|
|
|
|
|
for _, b := range oldBlocks {
|
|
|
|
|
if _, ok := exist[b.Meta().ULID]; !ok {
|
|
|
|
|
b.Close()
|
|
|
|
|
if _, ok := exist[b.Meta().ULID]; ok {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if err := b.Close(); err != nil {
|
|
|
|
|
level.Warn(db.logger).Log("msg", "closing block failed", "err", err)
|
|
|
|
|
}
|
|
|
|
|
if err := os.RemoveAll(b.Dir()); err != nil {
|
|
|
|
|
level.Warn(db.logger).Log("msg", "deleting block failed", "err", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|