diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go
index 97280ffe4..dbe92f3df 100644
--- a/cmd/tsdb/main.go
+++ b/cmd/tsdb/main.go
@@ -113,8 +113,7 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) {
 	st, err := tsdb.Open(dir, nil, nil, &tsdb.Options{
 		WALFlushInterval:  200 * time.Millisecond,
 		RetentionDuration: 2 * 24 * 60 * 60 * 1000, // 1 days in milliseconds
-		MinBlockDuration:  3 * 60 * 60 * 1000,      // 3 hours in milliseconds
-		MaxBlockDuration:  27 * 60 * 60 * 1000,     // 1 days in milliseconds
+		BlockRanges:       tsdb.ExponentialBlockRanges(int64(2*time.Hour/time.Millisecond), 3, 5),
 	})
 	if err != nil {
 		exitWithError(err)
diff --git a/compact.go b/compact.go
index 7799b640f..6158f2d67 100644
--- a/compact.go
+++ b/compact.go
@@ -30,6 +30,18 @@ import (
 	"github.com/prometheus/tsdb/labels"
 )
 
+// ExponentialBlockRanges returns steps time ranges: the first one is minSize and
+// every following one is stepSize times larger than its predecessor.
+func ExponentialBlockRanges(minSize int64, steps, stepSize int) []int64 {
+	ranges := make([]int64, 0, steps)
+	curRange := minSize
+	for i := 0; i < steps; i++ {
+		ranges = append(ranges, curRange)
+		curRange *= int64(stepSize)
+	}
+	return ranges
+}
+
 // Compactor provides compaction against an underlying storage
 // of time series data.
type Compactor interface { @@ -87,7 +99,7 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { } type compactorOptions struct { - maxBlockRange uint64 + blockRanges []int64 } func newCompactor(dir string, r prometheus.Registerer, l log.Logger, opts *compactorOptions) *compactor { @@ -133,37 +145,90 @@ func (c *compactor) Plan() ([][]string, error) { return dms[i].meta.MinTime < dms[j].meta.MinTime }) - if len(dms) == 0 { + if len(dms) <= 1 { return nil, nil } - sliceDirs := func(i, j int) [][]string { + sliceDirs := func(dms []dirMeta) [][]string { + if len(dms) == 0 { + return nil + } var res []string - for k := i; k < j; k++ { - res = append(res, dms[k].dir) + for _, dm := range dms { + res = append(res, dm.dir) } return [][]string{res} } - // Then we care about compacting multiple blocks, starting with the oldest. - for i := 0; i < len(dms)-compactionBlocksLen+1; i++ { - if c.match(dms[i : i+3]) { - return sliceDirs(i, i+compactionBlocksLen), nil + return sliceDirs(c.selectDirs(dms)), nil +} + +func (c *compactor) selectDirs(ds []dirMeta) []dirMeta { + // The way to skip compaction is to not have blockRanges. + if len(c.opts.blockRanges) == 1 { + return nil + } + + return selectRecurse(ds, c.opts.blockRanges) +} + +func selectRecurse(dms []dirMeta, intervals []int64) []dirMeta { + if len(intervals) == 0 { + return dms + } + + // Get the blocks by the max interval + blocks := splitByRange(dms, intervals[len(intervals)-1]) + dirs := []dirMeta{} + for i := len(blocks) - 1; i >= 0; i-- { + // We need to choose the oldest blocks to compact. If there are a couple of blocks in + // the largest interval, we should compact those first. + if len(blocks[i]) > 1 { + dirs = blocks[i] + break + } + } + + // If there are too many blocks, see if a smaller interval will catch them. + // i.e, if we have 0-20, 60-80, 80-100; all fall under 0-240, but we'd rather compact 60-100 + // than all at once. 
+ // Again if have 0-1d, 1d-2d, 3-6d we compact 0-1d, 1d-2d to compact it into the 0-3d block instead of compacting all three + // This is to honor the boundaries as much as possible. + if len(dirs) > 2 { + smallerDirs := selectRecurse(dirs, intervals[:len(intervals)-1]) + if len(smallerDirs) > 1 { + return smallerDirs } } - return nil, nil + return dirs } -func (c *compactor) match(dirs []dirMeta) bool { - g := dirs[0].meta.Compaction.Generation +// splitByRange splits the directories by the time range. +// for example if we have blocks 0-10, 10-20, 50-60, 90-100 and want to split them into 30 interval ranges +// splitByRange returns [0-10, 10-20], [50-60], [90-100]. +func splitByRange(ds []dirMeta, tr int64) [][]dirMeta { + var splitDirs [][]dirMeta + + for i := 0; i < len(ds); { + var group []dirMeta + // Compute start of aligned time range of size tr closest to the current block's start. + t0 := ds[i].meta.MinTime - (ds[i].meta.MinTime % tr) + + // Add all dirs to the current group that are within [t0, t0+tr]. + for ; i < len(ds); i++ { + if ds[i].meta.MinTime < t0 || ds[i].meta.MaxTime > t0+tr { + break + } + group = append(group, ds[i]) + } - for _, d := range dirs { - if d.meta.Compaction.Generation != g { - return false + if len(group) > 0 { + splitDirs = append(splitDirs, group) } } - return uint64(dirs[len(dirs)-1].meta.MaxTime-dirs[0].meta.MinTime) <= c.opts.maxBlockRange + + return splitDirs } func compactBlockMetas(blocks ...BlockMeta) (res BlockMeta) { diff --git a/compact_test.go b/compact_test.go new file mode 100644 index 000000000..07d3f6d63 --- /dev/null +++ b/compact_test.go @@ -0,0 +1,245 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tsdb
+
+import (
+	"sort"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestCompactionSelect(t *testing.T) {
+	opts := &compactorOptions{
+		blockRanges: []int64{
+			20,
+			60,
+			240,
+			720,
+			2160,
+		},
+	}
+
+	type dirMetaSimple struct {
+		dir string
+		tr  []int64
+	}
+
+	cases := []struct {
+		blocks  []dirMetaSimple
+		planned [][]string
+	}{
+		{
+			blocks: []dirMetaSimple{
+				{
+					dir: "1",
+					tr:  []int64{0, 20},
+				},
+			},
+			planned: nil,
+		},
+		{
+			blocks: []dirMetaSimple{
+				{
+					dir: "1",
+					tr:  []int64{0, 20},
+				},
+				{
+					dir: "2",
+					tr:  []int64{20, 40},
+				},
+				{
+					dir: "3",
+					tr:  []int64{40, 60},
+				},
+			},
+			planned: [][]string{{"1", "2", "3"}},
+		},
+		{
+			blocks: []dirMetaSimple{
+				{
+					dir: "1",
+					tr:  []int64{0, 20},
+				},
+				{
+					dir: "2",
+					tr:  []int64{20, 40},
+				},
+				{
+					dir: "3",
+					tr:  []int64{40, 60},
+				},
+				{
+					dir: "4",
+					tr:  []int64{60, 120},
+				},
+				{
+					dir: "5",
+					tr:  []int64{120, 180},
+				},
+			},
+			planned: [][]string{{"1", "2", "3"}}, // We still need 0-60 to compact 0-240
+		},
+		{
+			blocks: []dirMetaSimple{
+				{
+					dir: "1",
+					tr:  []int64{0, 20},
+				},
+				{
+					dir: "2",
+					tr:  []int64{20, 40},
+				},
+				{
+					dir: "3",
+					tr:  []int64{40, 60},
+				},
+				{
+					dir: "4",
+					tr:  []int64{60, 120},
+				},
+				{
+					dir: "5",
+					tr:  []int64{120, 180},
+				},
+				{
+					dir: "6",
+					tr:  []int64{720, 960},
+				},
+				{
+					dir: "7",
+					tr:  []int64{1200, 1440},
+				},
+			},
+			planned: [][]string{{"6", "7"}},
+		},
+		{
+			blocks: []dirMetaSimple{
+				{
+					dir: "1",
+					tr:  []int64{0, 20},
+				},
+				{
+					dir: "2",
+					tr:  []int64{60, 80},
+				},
+				{
+					dir: "3",
+					tr:  []int64{80, 100},
+				},
+			},
+			planned: [][]string{{"2", "3"}},
+		},
+	}
+
+	c := &compactor{
+		opts: opts,
+	}
+	sliceDirs := func(dms []dirMeta) [][]string {
+		if len(dms) == 0 {
+			return nil
+		}
+		var res []string
+		for _, dm := range dms {
+			res = append(res, dm.dir)
+		}
+		return [][]string{res}
+	}
+
+	dmFromSimple := func(dms []dirMetaSimple) []dirMeta {
+		dirs := make([]dirMeta, 0, len(dms))
+		for _, dir := range dms {
+			dirs = append(dirs, dirMeta{
+				dir: dir.dir,
+				meta: &BlockMeta{
+					MinTime: dir.tr[0],
+					MaxTime: dir.tr[1],
+				},
+			})
+		}
+
+		return dirs
+	}
+
+	for _, tc := range cases {
+		require.Equal(t, tc.planned, sliceDirs(c.selectDirs(dmFromSimple(tc.blocks))))
+	}
+}
+
+func TestSplitByRange(t *testing.T) {
+	splitterFunc := func(ds []dirMeta, tr int64) [][]dirMeta {
+		rMap := make(map[int64][]dirMeta)
+		for _, dir := range ds {
+			t0 := dir.meta.MinTime - dir.meta.MinTime%tr
+			if intervalContains(t0, t0+tr, dir.meta.MinTime) && intervalContains(t0, t0+tr, dir.meta.MaxTime) {
+				rMap[t0] = append(rMap[t0], dir)
+			}
+		}
+		res := make([][]dirMeta, 0, len(rMap))
+		for _, v := range rMap {
+			res = append(res, v)
+		}
+
+		sort.Slice(res, func(i, j int) bool {
+			return res[i][0].meta.MinTime < res[j][0].meta.MinTime
+		})
+
+		return res
+	}
+
+	cases := []struct {
+		trange int64
+		ranges [][]int64
+		output [][][]int64
+	}{
+		{
+			trange: 60,
+			ranges: [][]int64{{0, 10}},
+		},
+		{
+			trange: 60,
+			ranges: [][]int64{{0, 60}},
+		},
+		{
+			trange: 60,
+			ranges: [][]int64{{0, 10}, {30, 60}},
+		},
+		{
+			trange: 60,
+			ranges: [][]int64{{0, 10}, {60, 90}},
+		},
+		{
+			trange: 60,
+			ranges: [][]int64{{0, 10}, {20, 30}, {90, 120}},
+		},
+		{
+			trange: 60,
+			ranges: [][]int64{{0, 10}, {59, 60}, {60, 120}, {120, 180}, {190, 200}, {200, 210}, {220, 239}},
+		},
+	}
+
+	for _, c := range cases {
+		blocks := make([]dirMeta, 0, len(c.ranges))
+		for _, r := range c.ranges {
+			blocks = append(blocks, dirMeta{
+				meta: &BlockMeta{
+					MinTime: r[0],
+					MaxTime: r[1],
+				},
+			})
+		}
+
+		require.Equal(t, splitterFunc(blocks, c.trange), splitByRange(blocks, c.trange))
+	}
+}
diff --git a/db.go b/db.go
index a4801bbf0..2204de6c9 100644
--- a/db.go
+++ b/db.go
@@ -45,8 +45,7 @@ import (
 var DefaultOptions = &Options{
 	WALFlushInterval:  5 * time.Second,
 	RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds
-	MinBlockDuration:  3 * 60 * 60 * 1000,       // 2 hours in milliseconds
-	MaxBlockDuration:  24 * 60 * 60 * 1000,      // 1 days in milliseconds
+	BlockRanges:       ExponentialBlockRanges(int64(2*time.Hour/time.Millisecond), 3, 5),
 	NoLockfile:        false,
 }
 
@@ -58,12 +57,9 @@ type Options struct {
 	// Duration of persisted data to keep.
 	RetentionDuration uint64
 
-	// The timestamp range of head blocks after which they get persisted.
-	// It's the minimum duration of any persisted block.
-	MinBlockDuration uint64
-
-	// The maximum timestamp range of compacted blocks.
-	MaxBlockDuration uint64
+	// The sizes of the blocks, expected in milliseconds and in ascending order. The first entry is
+	// the range of head blocks; larger entries are the ranges blocks are compacted into.
+	BlockRanges []int64
 
 	// NoLockfile disables creation and consideration of a lock file.
 	NoLockfile bool
@@ -227,9 +223,24 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
 		db.lockf = &lockf
 	}
 
-	db.compactor = newCompactor(dir, r, l, &compactorOptions{
-		maxBlockRange: opts.MaxBlockDuration,
-	})
+	copts := &compactorOptions{
+		blockRanges: opts.BlockRanges,
+	}
+
+	if len(copts.blockRanges) == 0 {
+		return nil, errors.New("at least one block-range must exist")
+	}
+
+	for float64(copts.blockRanges[len(copts.blockRanges)-1])/float64(opts.RetentionDuration) > 0.2 {
+		if len(copts.blockRanges) == 1 {
+			break
+		}
+
+		// Max overflow is restricted to 20%.
+		copts.blockRanges = copts.blockRanges[:len(copts.blockRanges)-1]
+	}
+
+	db.compactor = newCompactor(dir, r, l, copts)
 
 	if err := db.reloadBlocks(); err != nil {
 		return nil, err
@@ -699,20 +710,20 @@ func rangeForTimestamp(t int64, width int64) (mint, maxt int64) {
 // it is within or after the currently appendable window.
 func (db *DB) ensureHead(t int64) error {
 	var (
-		mint, maxt = rangeForTimestamp(t, int64(db.opts.MinBlockDuration))
+		mint, maxt = rangeForTimestamp(t, db.opts.BlockRanges[0])
 		addBuffer  = len(db.blocks) == 0
 		last       BlockMeta
 	)
 
 	if !addBuffer {
 		last = db.blocks[len(db.blocks)-1].Meta()
-		addBuffer = last.MaxTime <= mint-int64(db.opts.MinBlockDuration)
+		addBuffer = last.MaxTime <= mint-db.opts.BlockRanges[0]
 	}
 	// Create another block of buffer in front if the DB is initialized or retrieving
 	// new data after a long gap.
 	// This ensures we always have a full block width of append window.
 	if addBuffer {
-		if _, err := db.createHeadBlock(mint-int64(db.opts.MinBlockDuration), mint); err != nil {
+		if _, err := db.createHeadBlock(mint-db.opts.BlockRanges[0], mint); err != nil {
 			return err
 		}
 		// If the previous block reaches into our new window, make it smaller.