|
|
|
@ -412,10 +412,11 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u
|
|
|
|
|
|
|
|
|
|
var merr MultiError
|
|
|
|
|
merr.Add(err)
|
|
|
|
|
|
|
|
|
|
for _, b := range bs {
|
|
|
|
|
if err := b.setCompactionFailed(); err != nil {
|
|
|
|
|
merr.Add(errors.Wrapf(err, "setting compaction failed for block: %s", b.Dir()))
|
|
|
|
|
if err != ErrCompactionCanceled {
|
|
|
|
|
for _, b := range bs {
|
|
|
|
|
if err := b.setCompactionFailed(); err != nil {
|
|
|
|
|
merr.Add(errors.Wrapf(err, "setting compaction failed for block: %s", b.Dir()))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -472,6 +473,9 @@ func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error {
|
|
|
|
|
return w.ChunkWriter.WriteChunks(chunks...)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ErrCompactionCanceled is returned when the compaction was canceled during shutdown.
|
|
|
|
|
var ErrCompactionCanceled = errors.New("compaction cancelled")
|
|
|
|
|
|
|
|
|
|
// write creates a new block that is the union of the provided blocks into dir.
|
|
|
|
|
// It cleans up all files of the old blocks after completing successfully.
|
|
|
|
|
func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) {
|
|
|
|
@ -539,7 +543,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
|
|
|
|
|
if err := os.RemoveAll(tmp); err != nil {
|
|
|
|
|
level.Error(c.logger).Log("msg", "removed tmp folder after canceled compaction", "err", err.Error())
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
return ErrCompactionCanceled
|
|
|
|
|
default:
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|