|
|
@ -82,7 +82,7 @@ type LeveledCompactor struct { |
|
|
|
|
|
|
|
|
|
|
|
type compactorMetrics struct { |
|
|
|
type compactorMetrics struct { |
|
|
|
ran prometheus.Counter |
|
|
|
ran prometheus.Counter |
|
|
|
populatingBlocks prometheus.Counter |
|
|
|
populatingBlocks prometheus.Gauge |
|
|
|
failed prometheus.Counter |
|
|
|
failed prometheus.Counter |
|
|
|
duration prometheus.Histogram |
|
|
|
duration prometheus.Histogram |
|
|
|
chunkSize prometheus.Histogram |
|
|
|
chunkSize prometheus.Histogram |
|
|
@ -486,12 +486,12 @@ func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error { |
|
|
|
func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) { |
|
|
|
func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) { |
|
|
|
dir := filepath.Join(dest, meta.ULID.String()) |
|
|
|
dir := filepath.Join(dest, meta.ULID.String()) |
|
|
|
tmp := dir + ".tmp" |
|
|
|
tmp := dir + ".tmp" |
|
|
|
var writers []io.Closer |
|
|
|
var closers []io.Closer |
|
|
|
defer func(t time.Time) { |
|
|
|
defer func(t time.Time) { |
|
|
|
var merr MultiError |
|
|
|
var merr MultiError |
|
|
|
merr.Add(err) |
|
|
|
merr.Add(err) |
|
|
|
err = merr.Err() |
|
|
|
err = merr.Err() |
|
|
|
for _, w := range writers { |
|
|
|
for _, w := range closers { |
|
|
|
merr.Add(w.Close()) |
|
|
|
merr.Add(w.Close()) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -522,7 +522,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe |
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
return errors.Wrap(err, "open chunk writer") |
|
|
|
return errors.Wrap(err, "open chunk writer") |
|
|
|
} |
|
|
|
} |
|
|
|
writers = append(writers, chunkw) |
|
|
|
closers = append(closers, chunkw) |
|
|
|
// Record written chunk sizes on level 1 compactions.
|
|
|
|
// Record written chunk sizes on level 1 compactions.
|
|
|
|
if meta.Compaction.Level == 1 { |
|
|
|
if meta.Compaction.Level == 1 { |
|
|
|
chunkw = &instrumentedChunkWriter{ |
|
|
|
chunkw = &instrumentedChunkWriter{ |
|
|
@ -537,7 +537,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe |
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
return errors.Wrap(err, "open index writer") |
|
|
|
return errors.Wrap(err, "open index writer") |
|
|
|
} |
|
|
|
} |
|
|
|
writers = append(writers, indexw) |
|
|
|
closers = append(closers, indexw) |
|
|
|
|
|
|
|
|
|
|
|
if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil { |
|
|
|
if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil { |
|
|
|
return errors.Wrap(err, "write compaction") |
|
|
|
return errors.Wrap(err, "write compaction") |
|
|
@ -554,10 +554,10 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe |
|
|
|
// you cannot delete these unless they are closed and the defer is to
|
|
|
|
// you cannot delete these unless they are closed and the defer is to
|
|
|
|
// make sure they are closed if the function exits due to an error above.
|
|
|
|
// make sure they are closed if the function exits due to an error above.
|
|
|
|
var merr MultiError |
|
|
|
var merr MultiError |
|
|
|
for _, w := range writers { |
|
|
|
for _, w := range closers { |
|
|
|
merr.Add(w.Close()) |
|
|
|
merr.Add(w.Close()) |
|
|
|
} |
|
|
|
} |
|
|
|
writers = writers[:0] // Avoid closing the writers twice in the defer.
|
|
|
|
closers = closers[:0] // Avoid closing the writers twice in the defer.
|
|
|
|
if merr.Err() != nil { |
|
|
|
if merr.Err() != nil { |
|
|
|
return merr.Err() |
|
|
|
return merr.Err() |
|
|
|
} |
|
|
|
} |
|
|
@ -618,10 +618,10 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, |
|
|
|
) |
|
|
|
) |
|
|
|
defer func() { |
|
|
|
defer func() { |
|
|
|
closeAll(closers...) |
|
|
|
closeAll(closers...) |
|
|
|
c.metrics.populatingBlocks.Add(-1) |
|
|
|
c.metrics.populatingBlocks.Set(0) |
|
|
|
}() |
|
|
|
}() |
|
|
|
|
|
|
|
|
|
|
|
c.metrics.populatingBlocks.Inc() |
|
|
|
c.metrics.populatingBlocks.Set(1) |
|
|
|
|
|
|
|
|
|
|
|
for i, b := range blocks { |
|
|
|
for i, b := range blocks { |
|
|
|
select { |
|
|
|
select { |
|
|
|