|
|
|
// Copyright 2017 The Prometheus Authors
|
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
// you may not use this file except in compliance with the License.
|
|
|
|
// You may obtain a copy of the License at
|
|
|
|
//
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
//
|
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
// limitations under the License.
|
|
|
|
|
|
|
|
package tsdb
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"crypto/rand"
|
|
|
|
"fmt"
|
|
|
|
"io"
|
|
|
|
"os"
|
|
|
|
"path/filepath"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/go-kit/log"
|
|
|
|
"github.com/go-kit/log/level"
|
|
|
|
"github.com/oklog/ulid"
|
|
|
|
"github.com/pkg/errors"
|
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
|
|
"golang.org/x/exp/slices"
|
|
|
|
|
|
|
|
"github.com/prometheus/prometheus/storage"
|
|
|
|
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
|
|
|
"github.com/prometheus/prometheus/tsdb/chunks"
|
|
|
|
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
|
|
|
"github.com/prometheus/prometheus/tsdb/fileutil"
|
|
|
|
"github.com/prometheus/prometheus/tsdb/index"
|
|
|
|
"github.com/prometheus/prometheus/tsdb/tombstones"
|
|
|
|
)
|
|
|
|
|
|
|
|
// ExponentialBlockRanges returns steps block time ranges that grow
// geometrically: the first range is minSize and every following range is the
// previous one multiplied by stepSize.
//
// A steps value of zero or less yields an empty, non-nil slice.
func ExponentialBlockRanges(minSize int64, steps, stepSize int) []int64 {
	// A negative capacity would make the allocation below panic.
	if steps < 0 {
		steps = 0
	}

	ranges := make([]int64, 0, steps)
	curRange := minSize
	for i := 0; i < steps; i++ {
		ranges = append(ranges, curRange)
		curRange *= int64(stepSize)
	}

	return ranges
}
|
|
|
|
|
|
|
|
// Compactor provides compaction against an underlying storage
|
|
|
|
// of time series data.
|
|
|
|
type Compactor interface {
|
|
|
|
// Plan returns a set of directories that can be compacted concurrently.
|
|
|
|
// The directories can be overlapping.
|
|
|
|
// Results returned when compactions are in progress are undefined.
|
|
|
|
Plan(dir string) ([]string, error)
|
|
|
|
|
|
|
|
// Write persists a Block into a directory.
|
|
|
|
// No Block is written when resulting Block has 0 samples, and returns empty ulid.ULID{}.
|
|
|
|
Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error)
|
|
|
|
|
|
|
|
// Compact runs compaction against the provided directories. Must
|
|
|
|
// only be called concurrently with results of Plan().
|
|
|
|
// Can optionally pass a list of already open blocks,
|
|
|
|
// to avoid having to reopen them.
|
|
|
|
// When resulting Block has 0 samples
|
|
|
|
// * No block is written.
|
|
|
|
// * The source dirs are marked Deletable.
|
|
|
|
// * Returns empty ulid.ULID{}.
|
|
|
|
Compact(dest string, dirs []string, open []*Block) (ulid.ULID, error)
|
|
|
|
}
|
|
|
|
|
|
|
|
// LeveledCompactor implements the Compactor interface.
|
|
|
|
type LeveledCompactor struct {
|
|
|
|
metrics *CompactorMetrics
|
|
|
|
logger log.Logger
|
|
|
|
ranges []int64
|
|
|
|
chunkPool chunkenc.Pool
|
|
|
|
ctx context.Context
|
|
|
|
maxBlockChunkSegmentSize int64
|
|
|
|
mergeFunc storage.VerticalChunkSeriesMergeFunc
|
|
|
|
}
|
|
|
|
|
|
|
|
type CompactorMetrics struct {
|
|
|
|
Ran prometheus.Counter
|
|
|
|
PopulatingBlocks prometheus.Gauge
|
|
|
|
OverlappingBlocks prometheus.Counter
|
|
|
|
Duration prometheus.Histogram
|
|
|
|
ChunkSize prometheus.Histogram
|
|
|
|
ChunkSamples prometheus.Histogram
|
|
|
|
ChunkRange prometheus.Histogram
|
|
|
|
}
|
|
|
|
|
|
|
|
func newCompactorMetrics(r prometheus.Registerer) *CompactorMetrics {
|
|
|
|
m := &CompactorMetrics{}
|
|
|
|
|
|
|
|
m.Ran = prometheus.NewCounter(prometheus.CounterOpts{
|
|
|
|
Name: "prometheus_tsdb_compactions_total",
|
|
|
|
Help: "Total number of compactions that were executed for the partition.",
|
|
|
|
})
|
|
|
|
m.PopulatingBlocks = prometheus.NewGauge(prometheus.GaugeOpts{
|
|
|
|
Name: "prometheus_tsdb_compaction_populating_block",
|
|
|
|
Help: "Set to 1 when a block is currently being written to the disk.",
|
|
|
|
})
|
|
|
|
m.OverlappingBlocks = prometheus.NewCounter(prometheus.CounterOpts{
|
|
|
|
Name: "prometheus_tsdb_vertical_compactions_total",
|
|
|
|
Help: "Total number of compactions done on overlapping blocks.",
|
|
|
|
})
|
|
|
|
m.Duration = prometheus.NewHistogram(prometheus.HistogramOpts{
|
|
|
|
Name: "prometheus_tsdb_compaction_duration_seconds",
|
|
|
|
Help: "Duration of compaction runs",
|
|
|
|
Buckets: prometheus.ExponentialBuckets(1, 2, 14),
|
|
|
|
})
|
|
|
|
m.ChunkSize = prometheus.NewHistogram(prometheus.HistogramOpts{
|
|
|
|
Name: "prometheus_tsdb_compaction_chunk_size_bytes",
|
|
|
|
Help: "Final size of chunks on their first compaction",
|
|
|
|
Buckets: prometheus.ExponentialBuckets(32, 1.5, 12),
|
|
|
|
})
|
|
|
|
m.ChunkSamples = prometheus.NewHistogram(prometheus.HistogramOpts{
|
|
|
|
Name: "prometheus_tsdb_compaction_chunk_samples",
|
|
|
|
Help: "Final number of samples on their first compaction",
|
|
|
|
Buckets: prometheus.ExponentialBuckets(4, 1.5, 12),
|
|
|
|
})
|
|
|
|
m.ChunkRange = prometheus.NewHistogram(prometheus.HistogramOpts{
|
|
|
|
Name: "prometheus_tsdb_compaction_chunk_range_seconds",
|
|
|
|
Help: "Final time range of chunks on their first compaction",
|
|
|
|
Buckets: prometheus.ExponentialBuckets(100, 4, 10),
|
|
|
|
})
|
|
|
|
|
|
|
|
if r != nil {
|
|
|
|
r.MustRegister(
|
|
|
|
m.Ran,
|
|
|
|
m.PopulatingBlocks,
|
|
|
|
m.OverlappingBlocks,
|
|
|
|
m.Duration,
|
|
|
|
m.ChunkRange,
|
|
|
|
m.ChunkSamples,
|
|
|
|
m.ChunkSize,
|
|
|
|
)
|
|
|
|
}
|
|
|
|
return m
|
|
|
|
}
|
|
|
|
|
|
|
|
// NewLeveledCompactor returns a LeveledCompactor.
|
|
|
|
func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) {
|
|
|
|
return NewLeveledCompactorWithChunkSize(ctx, r, l, ranges, pool, chunks.DefaultChunkSegmentSize, mergeFunc)
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) {
|
|
|
|
if len(ranges) == 0 {
|
|
|
|
return nil, errors.Errorf("at least one range must be provided")
|
|
|
|
}
|
|
|
|
if pool == nil {
|
|
|
|
pool = chunkenc.NewPool()
|
|
|
|
}
|
|
|
|
if l == nil {
|
|
|
|
l = log.NewNopLogger()
|
|
|
|
}
|
|
|
|
if mergeFunc == nil {
|
|
|
|
mergeFunc = storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge)
|
|
|
|
}
|
|
|
|
return &LeveledCompactor{
|
|
|
|
ranges: ranges,
|
|
|
|
chunkPool: pool,
|
|
|
|
logger: l,
|
|
|
|
metrics: newCompactorMetrics(r),
|
|
|
|
ctx: ctx,
|
|
|
|
maxBlockChunkSegmentSize: maxBlockChunkSegmentSize,
|
|
|
|
mergeFunc: mergeFunc,
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
type dirMeta struct {
|
|
|
|
dir string
|
|
|
|
meta *BlockMeta
|
|
|
|
}
|
|
|
|
|
|
|
|
// Plan returns a list of compactable blocks in the provided directory.
|
|
|
|
func (c *LeveledCompactor) Plan(dir string) ([]string, error) {
|
|
|
|
dirs, err := blockDirs(dir)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
if len(dirs) < 1 {
|
|
|
|
return nil, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
var dms []dirMeta
|
|
|
|
for _, dir := range dirs {
|
|
|
|
meta, _, err := readMetaFile(dir)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
dms = append(dms, dirMeta{dir, meta})
|
|
|
|
}
|
|
|
|
return c.plan(dms)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) {
|
|
|
|
slices.SortFunc(dms, func(a, b dirMeta) int {
|
|
|
|
switch {
|
|
|
|
case a.meta.MinTime < b.meta.MinTime:
|
|
|
|
return -1
|
|
|
|
case a.meta.MinTime > b.meta.MinTime:
|
|
|
|
return 1
|
|
|
|
default:
|
|
|
|
return 0
|
|
|
|
}
|
|
|
|
})
|
|
|
|
|
|
|
|
res := c.selectOverlappingDirs(dms)
|
|
|
|
if len(res) > 0 {
|
|
|
|
return res, nil
|
|
|
|
}
|
|
|
|
// No overlapping blocks, do compaction the usual way.
|
|
|
|
// We do not include a recently created block with max(minTime), so the block which was just created from WAL.
|
|
|
|
// This gives users a window of a full block size to piece-wise backup new data without having to care about data overlap.
|
|
|
|
dms = dms[:len(dms)-1]
|
|
|
|
|
|
|
|
for _, dm := range c.selectDirs(dms) {
|
|
|
|
res = append(res, dm.dir)
|
|
|
|
}
|
|
|
|
if len(res) > 0 {
|
|
|
|
return res, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Compact any blocks with big enough time range that have >5% tombstones.
|
|
|
|
for i := len(dms) - 1; i >= 0; i-- {
|
|
|
|
meta := dms[i].meta
|
|
|
|
if meta.MaxTime-meta.MinTime < c.ranges[len(c.ranges)/2] {
|
|
|
|
// If the block is entirely deleted, then we don't care about the block being big enough.
|
|
|
|
// TODO: This is assuming single tombstone is for distinct series, which might be no true.
|
|
|
|
if meta.Stats.NumTombstones > 0 && meta.Stats.NumTombstones >= meta.Stats.NumSeries {
|
|
|
|
return []string{dms[i].dir}, nil
|
|
|
|
}
|
|
|
|
break
|
|
|
|
}
|
|
|
|
if float64(meta.Stats.NumTombstones)/float64(meta.Stats.NumSeries+1) > 0.05 {
|
|
|
|
return []string{dms[i].dir}, nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// selectDirs returns the dir metas that should be compacted into a single new block.
|
|
|
|
// If only a single block range is configured, the result is always nil.
|
|
|
|
func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta {
|
|
|
|
if len(c.ranges) < 2 || len(ds) < 1 {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
highTime := ds[len(ds)-1].meta.MinTime
|
|
|
|
|
|
|
|
for _, iv := range c.ranges[1:] {
|
|
|
|
parts := splitByRange(ds, iv)
|
|
|
|
if len(parts) == 0 {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
Outer:
|
|
|
|
for _, p := range parts {
|
|
|
|
// Do not select the range if it has a block whose compaction failed.
|
|
|
|
for _, dm := range p {
|
|
|
|
if dm.meta.Compaction.Failed {
|
|
|
|
continue Outer
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
mint := p[0].meta.MinTime
|
|
|
|
maxt := p[len(p)-1].meta.MaxTime
|
|
|
|
// Pick the range of blocks if it spans the full range (potentially with gaps)
|
|
|
|
// or is before the most recent block.
|
|
|
|
// This ensures we don't compact blocks prematurely when another one of the same
|
|
|
|
// size still fits in the range.
|
|
|
|
if (maxt-mint == iv || maxt <= highTime) && len(p) > 1 {
|
|
|
|
return p
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// selectOverlappingDirs returns all dirs with overlapping time ranges.
|
|
|
|
// It expects sorted input by mint and returns the overlapping dirs in the same order as received.
|
|
|
|
func (c *LeveledCompactor) selectOverlappingDirs(ds []dirMeta) []string {
|
|
|
|
if len(ds) < 2 {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
var overlappingDirs []string
|
|
|
|
globalMaxt := ds[0].meta.MaxTime
|
|
|
|
for i, d := range ds[1:] {
|
|
|
|
if d.meta.MinTime < globalMaxt {
|
|
|
|
if len(overlappingDirs) == 0 { // When it is the first overlap, need to add the last one as well.
|
|
|
|
overlappingDirs = append(overlappingDirs, ds[i].dir)
|
|
|
|
}
|
|
|
|
overlappingDirs = append(overlappingDirs, d.dir)
|
|
|
|
} else if len(overlappingDirs) > 0 {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
if d.meta.MaxTime > globalMaxt {
|
|
|
|
globalMaxt = d.meta.MaxTime
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return overlappingDirs
|
|
|
|
}
|
|
|
|
|
|
|
|
// splitByRange splits the directories by the time range. The range sequence starts at 0.
|
|
|
|
//
|
|
|
|
// For example, if we have blocks [0-10, 10-20, 50-60, 90-100] and the split range tr is 30
|
|
|
|
// it returns [0-10, 10-20], [50-60], [90-100].
|
|
|
|
func splitByRange(ds []dirMeta, tr int64) [][]dirMeta {
|
|
|
|
var splitDirs [][]dirMeta
|
|
|
|
|
|
|
|
for i := 0; i < len(ds); {
|
|
|
|
var (
|
|
|
|
group []dirMeta
|
|
|
|
t0 int64
|
|
|
|
m = ds[i].meta
|
|
|
|
)
|
|
|
|
// Compute start of aligned time range of size tr closest to the current block's start.
|
|
|
|
if m.MinTime >= 0 {
|
|
|
|
t0 = tr * (m.MinTime / tr)
|
|
|
|
} else {
|
|
|
|
t0 = tr * ((m.MinTime - tr + 1) / tr)
|
|
|
|
}
|
|
|
|
// Skip blocks that don't fall into the range. This can happen via mis-alignment or
|
|
|
|
// by being the multiple of the intended range.
|
|
|
|
if m.MaxTime > t0+tr {
|
|
|
|
i++
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
// Add all dirs to the current group that are within [t0, t0+tr].
|
|
|
|
for ; i < len(ds); i++ {
|
|
|
|
// Either the block falls into the next range or doesn't fit at all (checked above).
|
|
|
|
if ds[i].meta.MaxTime > t0+tr {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
group = append(group, ds[i])
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(group) > 0 {
|
|
|
|
splitDirs = append(splitDirs, group)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return splitDirs
|
|
|
|
}
|
|
|
|
|
|
|
|
// CompactBlockMetas merges many block metas into one, combining it's source blocks together
|
|
|
|
// and adjusting compaction level. Min/Max time of result block meta covers all input blocks.
|
|
|
|
func CompactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta {
|
|
|
|
res := &BlockMeta{
|
|
|
|
ULID: uid,
|
|
|
|
}
|
|
|
|
|
|
|
|
sources := map[ulid.ULID]struct{}{}
|
|
|
|
mint := blocks[0].MinTime
|
|
|
|
maxt := blocks[0].MaxTime
|
|
|
|
|
|
|
|
for _, b := range blocks {
|
|
|
|
if b.MinTime < mint {
|
|
|
|
mint = b.MinTime
|
|
|
|
}
|
|
|
|
if b.MaxTime > maxt {
|
|
|
|
maxt = b.MaxTime
|
|
|
|
}
|
|
|
|
if b.Compaction.Level > res.Compaction.Level {
|
|
|
|
res.Compaction.Level = b.Compaction.Level
|
|
|
|
}
|
|
|
|
for _, s := range b.Compaction.Sources {
|
|
|
|
sources[s] = struct{}{}
|
|
|
|
}
|
|
|
|
res.Compaction.Parents = append(res.Compaction.Parents, BlockDesc{
|
|
|
|
ULID: b.ULID,
|
|
|
|
MinTime: b.MinTime,
|
|
|
|
MaxTime: b.MaxTime,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
res.Compaction.Level++
|
|
|
|
|
|
|
|
for s := range sources {
|
|
|
|
res.Compaction.Sources = append(res.Compaction.Sources, s)
|
|
|
|
}
|
|
|
|
slices.SortFunc(res.Compaction.Sources, func(a, b ulid.ULID) int {
|
|
|
|
return a.Compare(b)
|
|
|
|
})
|
|
|
|
|
|
|
|
res.MinTime = mint
|
|
|
|
res.MaxTime = maxt
|
|
|
|
return res
|
|
|
|
}
|
|
|
|
|
|
|
|
// Compact creates a new block in the compactor's directory from the blocks in the
|
|
|
|
// provided directories.
|
|
|
|
func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error) {
|
|
|
|
return c.CompactWithBlockPopulator(dest, dirs, open, DefaultBlockPopulator{})
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *LeveledCompactor) CompactWithBlockPopulator(dest string, dirs []string, open []*Block, blockPopulator BlockPopulator) (uid ulid.ULID, err error) {
|
|
|
|
var (
|
|
|
|
blocks []BlockReader
|
|
|
|
bs []*Block
|
|
|
|
metas []*BlockMeta
|
|
|
|
uids []string
|
|
|
|
)
|
|
|
|
start := time.Now()
|
|
|
|
|
|
|
|
for _, d := range dirs {
|
|
|
|
meta, _, err := readMetaFile(d)
|
|
|
|
if err != nil {
|
|
|
|
return uid, err
|
|
|
|
}
|
|
|
|
|
|
|
|
var b *Block
|
|
|
|
|
|
|
|
// Use already open blocks if we can, to avoid
|
|
|
|
// having the index data in memory twice.
|
|
|
|
for _, o := range open {
|
|
|
|
if meta.ULID == o.Meta().ULID {
|
|
|
|
b = o
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if b == nil {
|
|
|
|
var err error
|
|
|
|
b, err = OpenBlock(c.logger, d, c.chunkPool)
|
|
|
|
if err != nil {
|
|
|
|
return uid, err
|
|
|
|
}
|
|
|
|
defer b.Close()
|
|
|
|
}
|
|
|
|
|
|
|
|
metas = append(metas, meta)
|
|
|
|
blocks = append(blocks, b)
|
|
|
|
bs = append(bs, b)
|
|
|
|
uids = append(uids, meta.ULID.String())
|
|
|
|
}
|
|
|
|
|
|
|
|
uid = ulid.MustNew(ulid.Now(), rand.Reader)
|
|
|
|
|
|
|
|
meta := CompactBlockMetas(uid, metas...)
|
|
|
|
err = c.write(dest, meta, blockPopulator, blocks...)
|
|
|
|
if err == nil {
|
|
|
|
if meta.Stats.NumSamples == 0 {
|
|
|
|
for _, b := range bs {
|
|
|
|
b.meta.Compaction.Deletable = true
|
|
|
|
n, err := writeMetaFile(c.logger, b.dir, &b.meta)
|
|
|
|
if err != nil {
|
|
|
|
level.Error(c.logger).Log(
|
|
|
|
"msg", "Failed to write 'Deletable' to meta file after compaction",
|
|
|
|
"ulid", b.meta.ULID,
|
|
|
|
)
|
|
|
|
}
|
|
|
|
b.numBytesMeta = n
|
|
|
|
}
|
|
|
|
uid = ulid.ULID{}
|
|
|
|
level.Info(c.logger).Log(
|
|
|
|
"msg", "compact blocks resulted in empty block",
|
|
|
|
"count", len(blocks),
|
|
|
|
"sources", fmt.Sprintf("%v", uids),
|
|
|
|
"duration", time.Since(start),
|
|
|
|
)
|
|
|
|
} else {
|
|
|
|
level.Info(c.logger).Log(
|
|
|
|
"msg", "compact blocks",
|
|
|
|
"count", len(blocks),
|
|
|
|
"mint", meta.MinTime,
|
|
|
|
"maxt", meta.MaxTime,
|
|
|
|
"ulid", meta.ULID,
|
|
|
|
"sources", fmt.Sprintf("%v", uids),
|
|
|
|
"duration", time.Since(start),
|
|
|
|
)
|
|
|
|
}
|
|
|
|
return uid, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
errs := tsdb_errors.NewMulti(err)
|
|
|
|
if !errors.Is(err, context.Canceled) {
|
|
|
|
for _, b := range bs {
|
|
|
|
if err := b.setCompactionFailed(); err != nil {
|
|
|
|
errs.Add(errors.Wrapf(err, "setting compaction failed for block: %s", b.Dir()))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return uid, errs.Err()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) {
|
|
|
|
start := time.Now()
|
|
|
|
|
|
|
|
uid := ulid.MustNew(ulid.Now(), rand.Reader)
|
|
|
|
|
|
|
|
meta := &BlockMeta{
|
|
|
|
ULID: uid,
|
|
|
|
MinTime: mint,
|
|
|
|
MaxTime: maxt,
|
|
|
|
}
|
|
|
|
meta.Compaction.Level = 1
|
|
|
|
meta.Compaction.Sources = []ulid.ULID{uid}
|
|
|
|
|
|
|
|
if parent != nil {
|
|
|
|
meta.Compaction.Parents = []BlockDesc{
|
|
|
|
{ULID: parent.ULID, MinTime: parent.MinTime, MaxTime: parent.MaxTime},
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
err := c.write(dest, meta, DefaultBlockPopulator{}, b)
|
|
|
|
if err != nil {
|
|
|
|
return uid, err
|
|
|
|
}
|
|
|
|
|
|
|
|
if meta.Stats.NumSamples == 0 {
|
|
|
|
level.Info(c.logger).Log(
|
|
|
|
"msg", "write block resulted in empty block",
|
|
|
|
"mint", meta.MinTime,
|
|
|
|
"maxt", meta.MaxTime,
|
|
|
|
"duration", time.Since(start),
|
|
|
|
)
|
|
|
|
return ulid.ULID{}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
level.Info(c.logger).Log(
|
|
|
|
"msg", "write block",
|
|
|
|
"mint", meta.MinTime,
|
|
|
|
"maxt", meta.MaxTime,
|
|
|
|
"ulid", meta.ULID,
|
|
|
|
"duration", time.Since(start),
|
|
|
|
)
|
|
|
|
return uid, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// instrumentedChunkWriter is used for level 1 compactions to record statistics
|
|
|
|
// about compacted chunks.
|
|
|
|
type instrumentedChunkWriter struct {
|
|
|
|
ChunkWriter
|
|
|
|
|
|
|
|
size prometheus.Histogram
|
|
|
|
samples prometheus.Histogram
|
|
|
|
trange prometheus.Histogram
|
|
|
|
}
|
|
|
|
|
|
|
|
func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error {
|
|
|
|
for _, c := range chunks {
|
|
|
|
w.size.Observe(float64(len(c.Chunk.Bytes())))
|
|
|
|
w.samples.Observe(float64(c.Chunk.NumSamples()))
|
|
|
|
w.trange.Observe(float64(c.MaxTime - c.MinTime))
|
|
|
|
}
|
|
|
|
return w.ChunkWriter.WriteChunks(chunks...)
|
|
|
|
}
|
|
|
|
|
|
|
|
// write creates a new block that is the union of the provided blocks into dir.
|
|
|
|
func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blockPopulator BlockPopulator, blocks ...BlockReader) (err error) {
|
|
|
|
dir := filepath.Join(dest, meta.ULID.String())
|
|
|
|
tmp := dir + tmpForCreationBlockDirSuffix
|
|
|
|
var closers []io.Closer
|
|
|
|
defer func(t time.Time) {
|
|
|
|
err = tsdb_errors.NewMulti(err, tsdb_errors.CloseAll(closers)).Err()
|
|
|
|
|
|
|
|
// RemoveAll returns no error when tmp doesn't exist so it is safe to always run it.
|
|
|
|
if err := os.RemoveAll(tmp); err != nil {
|
|
|
|
level.Error(c.logger).Log("msg", "removed tmp folder after failed compaction", "err", err.Error())
|
|
|
|
}
|
|
|
|
c.metrics.Ran.Inc()
|
|
|
|
c.metrics.Duration.Observe(time.Since(t).Seconds())
|
|
|
|
}(time.Now())
|
|
|
|
|
|
|
|
if err = os.RemoveAll(tmp); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
if err = os.MkdirAll(tmp, 0o777); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Populate chunk and index files into temporary directory with
|
|
|
|
// data of all blocks.
|
|
|
|
var chunkw ChunkWriter
|
|
|
|
|
|
|
|
chunkw, err = chunks.NewWriterWithSegSize(chunkDir(tmp), c.maxBlockChunkSegmentSize)
|
|
|
|
if err != nil {
|
|
|
|
return errors.Wrap(err, "open chunk writer")
|
|
|
|
}
|
|
|
|
closers = append(closers, chunkw)
|
|
|
|
// Record written chunk sizes on level 1 compactions.
|
|
|
|
if meta.Compaction.Level == 1 {
|
|
|
|
chunkw = &instrumentedChunkWriter{
|
|
|
|
ChunkWriter: chunkw,
|
|
|
|
size: c.metrics.ChunkSize,
|
|
|
|
samples: c.metrics.ChunkSamples,
|
|
|
|
trange: c.metrics.ChunkRange,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
indexw, err := index.NewWriter(c.ctx, filepath.Join(tmp, indexFilename))
|
|
|
|
if err != nil {
|
|
|
|
return errors.Wrap(err, "open index writer")
|
|
|
|
}
|
|
|
|
closers = append(closers, indexw)
|
|
|
|
|
|
|
|
if err := blockPopulator.PopulateBlock(c.ctx, c.metrics, c.logger, c.chunkPool, c.mergeFunc, blocks, meta, indexw, chunkw); err != nil {
|
|
|
|
return errors.Wrap(err, "populate block")
|
|
|
|
}
|
|
|
|
|
|
|
|
select {
|
|
|
|
case <-c.ctx.Done():
|
|
|
|
return c.ctx.Err()
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
|
|
|
|
// We are explicitly closing them here to check for error even
|
|
|
|
// though these are covered under defer. This is because in Windows,
|
|
|
|
// you cannot delete these unless they are closed and the defer is to
|
|
|
|
// make sure they are closed if the function exits due to an error above.
|
|
|
|
errs := tsdb_errors.NewMulti()
|
|
|
|
for _, w := range closers {
|
|
|
|
errs.Add(w.Close())
|
|
|
|
}
|
|
|
|
closers = closers[:0] // Avoid closing the writers twice in the defer.
|
|
|
|
if errs.Err() != nil {
|
|
|
|
return errs.Err()
|
|
|
|
}
|
|
|
|
|
|
|
|
// Populated block is empty, so exit early.
|
|
|
|
if meta.Stats.NumSamples == 0 {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
if _, err = writeMetaFile(c.logger, tmp, meta); err != nil {
|
|
|
|
return errors.Wrap(err, "write merged meta")
|
|
|
|
}
|
|
|
|
|
|
|
|
// Create an empty tombstones file.
|
|
|
|
if _, err := tombstones.WriteFile(c.logger, tmp, tombstones.NewMemTombstones()); err != nil {
|
|
|
|
return errors.Wrap(err, "write new tombstones file")
|
|
|
|
}
|
|
|
|
|
|
|
|
df, err := fileutil.OpenDir(tmp)
|
|
|
|
if err != nil {
|
|
|
|
return errors.Wrap(err, "open temporary block dir")
|
|
|
|
}
|
|
|
|
defer func() {
|
|
|
|
if df != nil {
|
|
|
|
df.Close()
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
if err := df.Sync(); err != nil {
|
|
|
|
return errors.Wrap(err, "sync temporary dir file")
|
|
|
|
}
|
|
|
|
|
|
|
|
// Close temp dir before rename block dir (for windows platform).
|
|
|
|
if err = df.Close(); err != nil {
|
|
|
|
return errors.Wrap(err, "close temporary dir")
|
|
|
|
}
|
|
|
|
df = nil
|
|
|
|
|
|
|
|
// Block successfully written, make it visible in destination dir by moving it from tmp one.
|
|
|
|
if err := fileutil.Replace(tmp, dir); err != nil {
|
|
|
|
return errors.Wrap(err, "rename block dir")
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
type BlockPopulator interface {
|
|
|
|
PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) error
|
|
|
|
}
|
|
|
|
|
|
|
|
// DefaultBlockPopulator is the BlockPopulator that LeveledCompactor's Compact
// and Write methods use. It carries no state.
type DefaultBlockPopulator struct{}
|
|
|
|
|
|
|
|
// PopulateBlock fills the index and chunk writers with new data gathered as the union
|
|
|
|
// of the provided blocks. It returns meta information for the new block.
|
|
|
|
// It expects sorted blocks input by mint.
|
|
|
|
func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) (err error) {
|
|
|
|
if len(blocks) == 0 {
|
|
|
|
return errors.New("cannot populate block from no readers")
|
|
|
|
}
|
|
|
|
|
|
|
|
var (
|
|
|
|
sets []storage.ChunkSeriesSet
|
Stream symbols during compaction. (#6468)
Rather than buffer up symbols in RAM, do it one by one
during compaction. Then use the reader's symbol handling
for symbol lookups during the rest of the index write.
There is some slowdown in compaction, due to having to look through a file
rather than a hash lookup. This is noise to the overall cost of compacting
series with thousands of samples though.
benchmark old ns/op new ns/op delta
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 539917175 675341565 +25.08%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 2441815993 2477453524 +1.46%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 3978543559 3922909687 -1.40%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 8430219716 8586610007 +1.86%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 1786424591 1909552782 +6.89%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 5328998202 6020839950 +12.98%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 10085059958 11085278690 +9.92%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 25497010155 27018079806 +5.97%
BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4 2427391406 2817217987 +16.06%
BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4 2592965497 2538805050 -2.09%
BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4 2437388343 2668012858 +9.46%
BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4 2317095324 2787423966 +20.30%
BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4 2600239857 2096973860 -19.35%
benchmark old allocs new allocs delta
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 500851 470794 -6.00%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 821527 791451 -3.66%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 1141562 1111508 -2.63%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 2141576 2111504 -1.40%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 871466 841424 -3.45%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 1941428 1911415 -1.55%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 3071573 3041510 -0.98%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 6771648 6741509 -0.45%
BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4 731493 824888 +12.77%
BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4 793918 887311 +11.76%
BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4 811842 905204 +11.50%
BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4 832244 925081 +11.16%
BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4 921553 1019162 +10.59%
benchmark old bytes new bytes delta
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 40532648 35698276 -11.93%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 60340216 53409568 -11.49%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 81087336 72065552 -11.13%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 142485576 120878544 -15.16%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 208661368 203831136 -2.31%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 347345904 340484696 -1.98%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 585185856 576244648 -1.53%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 1357641792 1358966528 +0.10%
BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4 126486664 119666744 -5.39%
BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4 122323192 115117224 -5.89%
BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4 126404504 119469864 -5.49%
BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4 119047832 112230408 -5.73%
BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4 136576016 116634800 -14.60%
Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
5 years ago
|
|
|
symbols index.StringIter
|
|
|
|
closers []io.Closer
|
|
|
|
overlapping bool
|
|
|
|
)
|
|
|
|
defer func() {
|
|
|
|
errs := tsdb_errors.NewMulti(err)
|
|
|
|
if cerr := tsdb_errors.CloseAll(closers); cerr != nil {
|
|
|
|
errs.Add(errors.Wrap(cerr, "close"))
|
|
|
|
}
|
|
|
|
err = errs.Err()
|
|
|
|
metrics.PopulatingBlocks.Set(0)
|
|
|
|
}()
|
|
|
|
metrics.PopulatingBlocks.Set(1)
|
|
|
|
|
|
|
|
globalMaxt := blocks[0].Meta().MaxTime
|
|
|
|
for i, b := range blocks {
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return ctx.Err()
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
|
|
|
|
if !overlapping {
|
|
|
|
if i > 0 && b.Meta().MinTime < globalMaxt {
|
|
|
|
metrics.OverlappingBlocks.Inc()
|
|
|
|
overlapping = true
|
|
|
|
level.Info(logger).Log("msg", "Found overlapping blocks during compaction", "ulid", meta.ULID)
|
|
|
|
}
|
|
|
|
if b.Meta().MaxTime > globalMaxt {
|
|
|
|
globalMaxt = b.Meta().MaxTime
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
indexr, err := b.Index()
|
|
|
|
if err != nil {
|
|
|
|
return errors.Wrapf(err, "open index reader for block %+v", b.Meta())
|
|
|
|
}
|
|
|
|
closers = append(closers, indexr)
|
|
|
|
|
|
|
|
chunkr, err := b.Chunks()
|
|
|
|
if err != nil {
|
|
|
|
return errors.Wrapf(err, "open chunk reader for block %+v", b.Meta())
|
|
|
|
}
|
|
|
|
closers = append(closers, chunkr)
|
|
|
|
|
|
|
|
tombsr, err := b.Tombstones()
|
|
|
|
if err != nil {
|
|
|
|
return errors.Wrapf(err, "open tombstone reader for block %+v", b.Meta())
|
|
|
|
}
|
|
|
|
closers = append(closers, tombsr)
|
|
|
|
|
Reduce memory used by postings offset table.
Rather than keeping the offset of each postings list, instead
keep the nth offset of the offset of the posting list. As postings
list offsets have always been sorted, we can then get to the closest
entry before the one we want an iterate forwards.
I haven't done much tuning on the 32 number, it was chosen to try
not to read through more than a 4k page of data.
Switch to a bulk interface for fetching postings. Use it to avoid having
to re-read parts of the posting offset table when querying lots of it.
For a index with what BenchmarkHeadPostingForMatchers uses RAM
for r.postings drops from 3.79MB to 80.19kB or about 48x.
Bytes allocated go down by 30%, and suprisingly CPU usage drops by
4-6% for typical queries too.
benchmark old ns/op new ns/op delta
BenchmarkPostingsForMatchers/Block/n="1"-4 35231 36673 +4.09%
BenchmarkPostingsForMatchers/Block/n="1",j="foo"-4 563380 540627 -4.04%
BenchmarkPostingsForMatchers/Block/j="foo",n="1"-4 536782 534186 -0.48%
BenchmarkPostingsForMatchers/Block/n="1",j!="foo"-4 533990 541550 +1.42%
BenchmarkPostingsForMatchers/Block/i=~".*"-4 113374598 117969608 +4.05%
BenchmarkPostingsForMatchers/Block/i=~".+"-4 146329884 139651442 -4.56%
BenchmarkPostingsForMatchers/Block/i=~""-4 50346510 44961127 -10.70%
BenchmarkPostingsForMatchers/Block/i!=""-4 41261550 35356165 -14.31%
BenchmarkPostingsForMatchers/Block/n="1",i=~".*",j="foo"-4 112544418 116904010 +3.87%
BenchmarkPostingsForMatchers/Block/n="1",i=~".*",i!="2",j="foo"-4 112487086 116864918 +3.89%
BenchmarkPostingsForMatchers/Block/n="1",i!=""-4 41094758 35457904 -13.72%
BenchmarkPostingsForMatchers/Block/n="1",i!="",j="foo"-4 41906372 36151473 -13.73%
BenchmarkPostingsForMatchers/Block/n="1",i=~".+",j="foo"-4 147262414 140424800 -4.64%
BenchmarkPostingsForMatchers/Block/n="1",i=~"1.+",j="foo"-4 28615629 27872072 -2.60%
BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!="2",j="foo"-4 147117177 140462403 -4.52%
BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!~"2.*",j="foo"-4 175096826 167902298 -4.11%
benchmark old allocs new allocs delta
BenchmarkPostingsForMatchers/Block/n="1"-4 4 6 +50.00%
BenchmarkPostingsForMatchers/Block/n="1",j="foo"-4 7 11 +57.14%
BenchmarkPostingsForMatchers/Block/j="foo",n="1"-4 7 11 +57.14%
BenchmarkPostingsForMatchers/Block/n="1",j!="foo"-4 15 17 +13.33%
BenchmarkPostingsForMatchers/Block/i=~".*"-4 100010 100012 +0.00%
BenchmarkPostingsForMatchers/Block/i=~".+"-4 200069 200040 -0.01%
BenchmarkPostingsForMatchers/Block/i=~""-4 200072 200045 -0.01%
BenchmarkPostingsForMatchers/Block/i!=""-4 200070 200041 -0.01%
BenchmarkPostingsForMatchers/Block/n="1",i=~".*",j="foo"-4 100013 100017 +0.00%
BenchmarkPostingsForMatchers/Block/n="1",i=~".*",i!="2",j="foo"-4 100017 100023 +0.01%
BenchmarkPostingsForMatchers/Block/n="1",i!=""-4 200073 200046 -0.01%
BenchmarkPostingsForMatchers/Block/n="1",i!="",j="foo"-4 200075 200050 -0.01%
BenchmarkPostingsForMatchers/Block/n="1",i=~".+",j="foo"-4 200074 200049 -0.01%
BenchmarkPostingsForMatchers/Block/n="1",i=~"1.+",j="foo"-4 111165 111150 -0.01%
BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!="2",j="foo"-4 200078 200055 -0.01%
BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!~"2.*",j="foo"-4 311282 311238 -0.01%
benchmark old bytes new bytes delta
BenchmarkPostingsForMatchers/Block/n="1"-4 264 296 +12.12%
BenchmarkPostingsForMatchers/Block/n="1",j="foo"-4 360 424 +17.78%
BenchmarkPostingsForMatchers/Block/j="foo",n="1"-4 360 424 +17.78%
BenchmarkPostingsForMatchers/Block/n="1",j!="foo"-4 520 552 +6.15%
BenchmarkPostingsForMatchers/Block/i=~".*"-4 1600461 1600482 +0.00%
BenchmarkPostingsForMatchers/Block/i=~".+"-4 24900801 17259077 -30.69%
BenchmarkPostingsForMatchers/Block/i=~""-4 24900836 17259151 -30.69%
BenchmarkPostingsForMatchers/Block/i!=""-4 24900760 17259048 -30.69%
BenchmarkPostingsForMatchers/Block/n="1",i=~".*",j="foo"-4 1600557 1600621 +0.00%
BenchmarkPostingsForMatchers/Block/n="1",i=~".*",i!="2",j="foo"-4 1600717 1600813 +0.01%
BenchmarkPostingsForMatchers/Block/n="1",i!=""-4 24900856 17259176 -30.69%
BenchmarkPostingsForMatchers/Block/n="1",i!="",j="foo"-4 24900952 17259304 -30.69%
BenchmarkPostingsForMatchers/Block/n="1",i=~".+",j="foo"-4 24900993 17259333 -30.69%
BenchmarkPostingsForMatchers/Block/n="1",i=~"1.+",j="foo"-4 3788311 3142630 -17.04%
BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!="2",j="foo"-4 24901137 17259509 -30.69%
BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!~"2.*",j="foo"-4 28693086 20405680 -28.88%
Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
5 years ago
|
|
|
k, v := index.AllPostingsKey()
|
|
|
|
all, err := indexr.Postings(ctx, k, v)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
all = indexr.SortedPostings(all)
|
|
|
|
// Block meta is half-open: [min, max), so subtract 1 to ensure we don't hold samples with the exact meta.MaxTime timestamp.
|
|
|
|
sets = append(sets, NewBlockChunkSeriesSet(b.Meta().ULID, indexr, chunkr, tombsr, all, meta.MinTime, meta.MaxTime-1, false))
|
Stream symbols during compaction. (#6468)
Rather than buffer up symbols in RAM, do it one by one
during compaction. Then use the reader's symbol handling
for symbol lookups during the rest of the index write.
There is some slowdown in compaction, due to having to look through a file
rather than doing a hash lookup. This is noise relative to the overall cost of
compacting series with thousands of samples, though.
benchmark old ns/op new ns/op delta
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 539917175 675341565 +25.08%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 2441815993 2477453524 +1.46%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 3978543559 3922909687 -1.40%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 8430219716 8586610007 +1.86%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 1786424591 1909552782 +6.89%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 5328998202 6020839950 +12.98%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 10085059958 11085278690 +9.92%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 25497010155 27018079806 +5.97%
BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4 2427391406 2817217987 +16.06%
BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4 2592965497 2538805050 -2.09%
BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4 2437388343 2668012858 +9.46%
BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4 2317095324 2787423966 +20.30%
BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4 2600239857 2096973860 -19.35%
benchmark old allocs new allocs delta
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 500851 470794 -6.00%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 821527 791451 -3.66%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 1141562 1111508 -2.63%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 2141576 2111504 -1.40%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 871466 841424 -3.45%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 1941428 1911415 -1.55%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 3071573 3041510 -0.98%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 6771648 6741509 -0.45%
BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4 731493 824888 +12.77%
BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4 793918 887311 +11.76%
BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4 811842 905204 +11.50%
BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4 832244 925081 +11.16%
BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4 921553 1019162 +10.59%
benchmark old bytes new bytes delta
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 40532648 35698276 -11.93%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 60340216 53409568 -11.49%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 81087336 72065552 -11.13%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 142485576 120878544 -15.16%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 208661368 203831136 -2.31%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 347345904 340484696 -1.98%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 585185856 576244648 -1.53%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 1357641792 1358966528 +0.10%
BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4 126486664 119666744 -5.39%
BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4 122323192 115117224 -5.89%
BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4 126404504 119469864 -5.49%
BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4 119047832 112230408 -5.73%
BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4 136576016 116634800 -14.60%
Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
5 years ago
|
|
|
syms := indexr.Symbols()
|
|
|
|
if i == 0 {
|
Stream symbols during compaction. (#6468)
Rather than buffer up symbols in RAM, do it one by one
during compaction. Then use the reader's symbol handling
for symbol lookups during the rest of the index write.
There is some slowdown in compaction, due to having to look through a file
rather than doing a hash lookup. This is noise relative to the overall cost of
compacting series with thousands of samples, though.
benchmark old ns/op new ns/op delta
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 539917175 675341565 +25.08%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 2441815993 2477453524 +1.46%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 3978543559 3922909687 -1.40%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 8430219716 8586610007 +1.86%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 1786424591 1909552782 +6.89%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 5328998202 6020839950 +12.98%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 10085059958 11085278690 +9.92%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 25497010155 27018079806 +5.97%
BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4 2427391406 2817217987 +16.06%
BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4 2592965497 2538805050 -2.09%
BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4 2437388343 2668012858 +9.46%
BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4 2317095324 2787423966 +20.30%
BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4 2600239857 2096973860 -19.35%
benchmark old allocs new allocs delta
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 500851 470794 -6.00%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 821527 791451 -3.66%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 1141562 1111508 -2.63%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 2141576 2111504 -1.40%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 871466 841424 -3.45%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 1941428 1911415 -1.55%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 3071573 3041510 -0.98%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 6771648 6741509 -0.45%
BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4 731493 824888 +12.77%
BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4 793918 887311 +11.76%
BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4 811842 905204 +11.50%
BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4 832244 925081 +11.16%
BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4 921553 1019162 +10.59%
benchmark old bytes new bytes delta
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 40532648 35698276 -11.93%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 60340216 53409568 -11.49%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 81087336 72065552 -11.13%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 142485576 120878544 -15.16%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 208661368 203831136 -2.31%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 347345904 340484696 -1.98%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 585185856 576244648 -1.53%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 1357641792 1358966528 +0.10%
BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4 126486664 119666744 -5.39%
BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4 122323192 115117224 -5.89%
BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4 126404504 119469864 -5.49%
BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4 119047832 112230408 -5.73%
BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4 136576016 116634800 -14.60%
Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
5 years ago
|
|
|
symbols = syms
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
symbols = NewMergedStringIter(symbols, syms)
|
|
|
|
}
|
|
|
|
|
Stream symbols during compaction. (#6468)
Rather than buffer up symbols in RAM, do it one by one
during compaction. Then use the reader's symbol handling
for symbol lookups during the rest of the index write.
There is some slowdown in compaction, due to having to look through a file
rather than doing a hash lookup. This is noise relative to the overall cost of
compacting series with thousands of samples, though.
benchmark old ns/op new ns/op delta
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 539917175 675341565 +25.08%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 2441815993 2477453524 +1.46%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 3978543559 3922909687 -1.40%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 8430219716 8586610007 +1.86%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 1786424591 1909552782 +6.89%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 5328998202 6020839950 +12.98%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 10085059958 11085278690 +9.92%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 25497010155 27018079806 +5.97%
BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4 2427391406 2817217987 +16.06%
BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4 2592965497 2538805050 -2.09%
BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4 2437388343 2668012858 +9.46%
BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4 2317095324 2787423966 +20.30%
BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4 2600239857 2096973860 -19.35%
benchmark old allocs new allocs delta
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 500851 470794 -6.00%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 821527 791451 -3.66%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 1141562 1111508 -2.63%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 2141576 2111504 -1.40%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 871466 841424 -3.45%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 1941428 1911415 -1.55%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 3071573 3041510 -0.98%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 6771648 6741509 -0.45%
BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4 731493 824888 +12.77%
BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4 793918 887311 +11.76%
BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4 811842 905204 +11.50%
BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4 832244 925081 +11.16%
BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4 921553 1019162 +10.59%
benchmark old bytes new bytes delta
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 40532648 35698276 -11.93%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 60340216 53409568 -11.49%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 81087336 72065552 -11.13%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 142485576 120878544 -15.16%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 208661368 203831136 -2.31%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 347345904 340484696 -1.98%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 585185856 576244648 -1.53%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 1357641792 1358966528 +0.10%
BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4 126486664 119666744 -5.39%
BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4 122323192 115117224 -5.89%
BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4 126404504 119469864 -5.49%
BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4 119047832 112230408 -5.73%
BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4 136576016 116634800 -14.60%
Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
5 years ago
|
|
|
for symbols.Next() {
|
|
|
|
if err := indexw.AddSymbol(symbols.At()); err != nil {
|
|
|
|
return errors.Wrap(err, "add symbol")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if symbols.Err() != nil {
|
|
|
|
return errors.Wrap(symbols.Err(), "next symbol")
|
|
|
|
}
|
|
|
|
|
|
|
|
var (
|
|
|
|
ref = storage.SeriesRef(0)
|
|
|
|
chks []chunks.Meta
|
|
|
|
chksIter chunks.Iterator
|
|
|
|
)
|
|
|
|
|
|
|
|
set := sets[0]
|
|
|
|
if len(sets) > 1 {
|
|
|
|
// Merge series using specified chunk series merger.
|
|
|
|
// The default one is the compacting series merger.
|
|
|
|
set = storage.NewMergeChunkSeriesSet(sets, mergeFunc)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Iterate over all sorted chunk series.
|
|
|
|
for set.Next() {
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return ctx.Err()
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
s := set.At()
|
|
|
|
chksIter = s.Iterator(chksIter)
|
|
|
|
chks = chks[:0]
|
|
|
|
for chksIter.Next() {
|
|
|
|
// We are not iterating over chunks in a streaming way, as
|
|
|
|
// it's more efficient to do bulk write for index and
|
|
|
|
// chunk file purposes.
|
|
|
|
chks = append(chks, chksIter.At())
|
|
|
|
}
|
|
|
|
if chksIter.Err() != nil {
|
|
|
|
return errors.Wrap(chksIter.Err(), "chunk iter")
|
|
|
|
}
|
|
|
|
|
|
|
|
// Skip the series with all deleted chunks.
|
|
|
|
if len(chks) == 0 {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := chunkw.WriteChunks(chks...); err != nil {
|
|
|
|
return errors.Wrap(err, "write chunks")
|
|
|
|
}
|
|
|
|
if err := indexw.AddSeries(ref, s.Labels(), chks...); err != nil {
|
|
|
|
return errors.Wrap(err, "add series")
|
|
|
|
}
|
|
|
|
|
|
|
|
meta.Stats.NumChunks += uint64(len(chks))
|
|
|
|
meta.Stats.NumSeries++
|
|
|
|
for _, chk := range chks {
|
|
|
|
meta.Stats.NumSamples += uint64(chk.Chunk.NumSamples())
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, chk := range chks {
|
|
|
|
if err := chunkPool.Put(chk.Chunk); err != nil {
|
|
|
|
return errors.Wrap(err, "put chunk")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
ref++
|
|
|
|
}
|
|
|
|
if set.Err() != nil {
|
|
|
|
return errors.Wrap(set.Err(), "iterate compaction set")
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|