review changes

Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com>
pull/5805/head
Krasi Georgiev 2019-02-06 14:07:35 +02:00
parent ce4a2083fb
commit 45acaadd81
2 changed files with 18 additions and 24 deletions

View File

@ -412,7 +412,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u
var merr MultiError
merr.Add(err)
if err != ErrCompactionCanceled {
if err != context.Canceled {
for _, b := range bs {
if err := b.setCompactionFailed(); err != nil {
merr.Add(errors.Wrapf(err, "setting compaction failed for block: %s", b.Dir()))
@ -481,20 +481,19 @@ func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error {
return w.ChunkWriter.WriteChunks(chunks...)
}
// ErrCompactionCanceled is returned when the compaction was canceled during shutdown.
var ErrCompactionCanceled = errors.New("compaction cancelled")
// write creates a new block that is the union of the provided blocks into dir.
// It cleans up all files of the old blocks after completing successfully.
func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) {
dir := filepath.Join(dest, meta.ULID.String())
tmp := dir + ".tmp"
var writers []io.Closer
var merr MultiError
defer func(t time.Time) {
merr.Add(err)
for _, w := range writers {
w.Close()
merr.Add(w.Close())
}
if err != nil {
if merr.Err() != nil {
c.metrics.failed.Inc()
// TODO(gouthamve): Handle error how?
if err := os.RemoveAll(tmp); err != nil {
@ -542,16 +541,9 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
return errors.Wrap(err, "write compaction")
}
// Compaction was canceled so remove tmp folders and return early.
select {
case <-c.ctx.Done():
for _, w := range writers {
w.Close()
}
if err := os.RemoveAll(tmp); err != nil {
level.Error(c.logger).Log("msg", "removed tmp folder after canceled compaction", "err", err.Error())
}
return ErrCompactionCanceled
return c.ctx.Err()
default:
}
@ -560,9 +552,11 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
// you cannot delete these unless they are closed and the defer is to
// make sure they are closed if the function exits due to an error above.
for _, w := range writers {
if err := w.Close(); err != nil {
return err
}
merr.Add(w.Close())
}
writers = writers[:0] // Avoid closing the writers twice in the defer.
if merr.Err() != nil {
return merr.Err()
}
// Populated block is empty, so cleanup and exit.
@ -632,7 +626,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
for i, b := range blocks {
select {
case <-c.ctx.Done():
return nil
return c.ctx.Err()
default:
}
@ -694,7 +688,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
for set.Next() {
select {
case <-c.ctx.Done():
return nil
return c.ctx.Err()
default:
}

10
db.go
View File

@ -129,7 +129,7 @@ type DB struct {
autoCompact bool
// Cancel a running compaction when a shutdown is initiated.
compactCnl func()
compactCancel context.CancelFunc
}
type dbMetrics struct {
@ -275,13 +275,13 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
db.lockf = lockf
}
ctx, cnl := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.Background())
db.compactor, err = NewLeveledCompactor(ctx, r, l, opts.BlockRanges, db.chunkPool)
if err != nil {
cnl()
cancel()
return nil, errors.Wrap(err, "create leveled compactor")
}
db.compactCnl = cnl
db.compactCancel = cancel
segmentSize := wal.DefaultSegmentSize
if opts.WALSegmentSize > 0 {
@ -819,7 +819,7 @@ func (db *DB) Head() *Head {
// Close the partition.
func (db *DB) Close() error {
close(db.stopc)
db.compactCnl()
db.compactCancel()
<-db.donec
db.mtx.Lock()