diff --git a/.travis.yml b/.travis.yml
index 78fe6858f..c5012a34a 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -2,6 +2,10 @@ sudo: required
 dist: trusty
 language: go
+os:
+  - windows
+  - linux
+  - osx
 go:
 - 1.10.x
@@ -9,9 +13,12 @@ go:
 go_import_path: github.com/prometheus/tsdb
+before_install:
+  - if [[ "$TRAVIS_OS_NAME" == "windows" ]]; then choco install make; fi
+
 install:
-  - go get -v -t ./...
+  - make deps
 script:
   # `staticcheck` target is omitted due to linting errors
-  - make check_license style unused test
+  - if [[ "$TRAVIS_OS_NAME" == "windows" ]]; then make test; else make; fi
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7ec9f9eba..6a0560697 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,11 +2,25 @@
 - [CHANGE] `NewLeveledCompactor` takes a context so that a compaction is canceled when closing the db.
 - [ENHANCEMENT] When closing the db any running compaction will be cancelled so it doesn't block.
-## 0.3.0
+## 0.4.0
+ - [CHANGE] New `WALSegmentSize` option to override the `DefaultOptions.WALSegmentSize`. Added to allow using smaller WAL files. For example, using tmpfs on an RPi to minimise SD card wear from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed.
+ - [CHANGE] Empty blocks are not written during compaction [#374](https://github.com/prometheus/tsdb/pull/374)
+ - [FEATURE] Size based retention through `Options.MaxBytes`. As part of this change:
+   - added new metrics - `prometheus_tsdb_storage_blocks_bytes_total`, `prometheus_tsdb_size_retentions_total`, `prometheus_tsdb_time_retentions_total`
+   - new public interface `SizeReader: Size() int64`
+   - `OpenBlock` signature changed to take a logger.
+ - [REMOVED] `PrefixMatcher` is considered unused so it was removed.
+ - [CLEANUP] `Options.WALFlushInterval` is removed as it wasn't used anywhere.
+ - [FEATURE] Add new `LiveReader` to the WAL package. Added to allow live tailing of a WAL segment, used by Prometheus Remote Write after refactor. The main difference between the new reader and the existing `Reader` is that for `LiveReader` a call to `Next()` that returns false does not mean that there will never be more data to read.
+## 0.3.1
+ - [BUGFIX] Fixed most Windows tests and some actual bugs around unclosed file readers.
+
+## 0.3.0
 - [CHANGE] `LastCheckpoint()` used to return just the segment name and now it returns the full relative path.
 - [CHANGE] `NewSegmentsRangeReader()` can now read over multiple WAL ranges by using the new `SegmentRange{}` struct.
 - [CHANGE] `CorruptionErr{}` now also exposes the Segment `Dir` which is added when displaying any errors.
 - [CHANGE] `Head.Init()` is changed to `Head.Init(minValidTime int64)`
 - [CHANGE] `SymbolTable()` renamed to `SymbolTableSize()` to make the name consistent with the `Block{ symbolTableSize uint64 }` field.
 - [CHANGE] `wal.Reader{}` now exposes `Segment()` for the current segment being read and `Offset()` for the current offset.
+ - [FEATURE] `tsdbutil analyze` subcommand to find churn, high cardinality, etc.
diff --git a/Makefile b/Makefile
index eff204cd2..3458e7fa4 100644
--- a/Makefile
+++ b/Makefile
@@ -18,11 +18,15 @@ TSDB_BENCHMARK_NUM_METRICS ?= 1000
 TSDB_BENCHMARK_DATASET ?= "$(TSDB_PROJECT_DIR)/testdata/20kseries.json"
 TSDB_BENCHMARK_OUTPUT_DIR ?= "$(TSDB_CLI_DIR)/benchout"
 
-STATICCHECK_IGNORE =
 include Makefile.common
 
+.PHONY: deps
+deps:
+	@echo ">> getting dependencies"
+	GO111MODULE=$(GO111MODULE) $(GO) get $(GOOPTS) -t ./...
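The new `WALSegmentSize` and `MaxBytes` options described in the 0.4.0 entries above can be exercised as follows (a minimal sketch, not part of the patch; the "data" path and both limits are arbitrary placeholders):

	opts := *tsdb.DefaultOptions
	opts.WALSegmentSize = 1 * 1024 * 1024 // Smaller WAL segments, e.g. for a tmpfs-backed WAL on small devices.
	opts.MaxBytes = 512 * 1024 * 1024     // Size based retention: oldest blocks are deleted once this budget is exceeded.
	db, err := tsdb.Open("data", nil, nil, &opts)
	if err != nil {
		// Handle the error.
	}
	defer db.Close()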
+ build: - @$(GO) build -o $(TSDB_BIN) $(TSDB_CLI_DIR) + GO111MODULE=$(GO111MODULE) $(GO) build -o $(TSDB_BIN) $(TSDB_CLI_DIR) bench: build @echo ">> running benchmark, writing result to $(TSDB_BENCHMARK_OUTPUT_DIR)" diff --git a/block.go b/block.go index e5a66bd9e..42e11d951 100644 --- a/block.go +++ b/block.go @@ -21,6 +21,8 @@ import ( "path/filepath" "sync" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/tsdb/chunkenc" @@ -140,6 +142,12 @@ type Appendable interface { Appender() Appender } +// SizeReader returns the size of the object in bytes. +type SizeReader interface { + // Size returns the size in bytes. + Size() int64 +} + // BlockMeta provides meta information about a block. type BlockMeta struct { // Unique identifier for the block and its contents. Changes on compaction. @@ -166,6 +174,7 @@ type BlockStats struct { NumSeries uint64 `json:"numSeries,omitempty"` NumChunks uint64 `json:"numChunks,omitempty"` NumTombstones uint64 `json:"numTombstones,omitempty"` + NumBytes int64 `json:"numBytes,omitempty"` } // BlockDesc describes a block by ULID and time range. @@ -182,6 +191,9 @@ type BlockMetaCompaction struct { Level int `json:"level"` // ULIDs of all source head blocks that went into the block. Sources []ulid.ULID `json:"sources,omitempty"` + // Indicates that during compaction it resulted in a block without any samples + // so it should be deleted on the next reload. + Deletable bool `json:"deletable,omitempty"` // Short descriptions of the direct blocks that were used to create // this block. Parents []BlockDesc `json:"parents,omitempty"` @@ -257,7 +269,10 @@ type Block struct { // OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used // to instantiate chunk structs. -func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) { +func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (*Block, error) { + if logger == nil { + logger = log.NewNopLogger() + } meta, err := readMetaFile(dir) if err != nil { return nil, err @@ -272,11 +287,20 @@ func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) { return nil, err } - tr, err := readTombstones(dir) + tr, tsr, err := readTombstones(dir) if err != nil { return nil, err } + // TODO refactor to set this at block creation time as + // that would be the logical place for a block size to be calculated. + bs := blockSize(cr, ir, tsr) + meta.Stats.NumBytes = bs + err = writeMetaFile(dir, meta) + if err != nil { + level.Warn(logger).Log("msg", "couldn't write the meta file for the block size", "block", dir, "err", err) + } + pb := &Block{ dir: dir, meta: *meta, @@ -288,6 +312,16 @@ func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) { return pb, nil } +func blockSize(rr ...SizeReader) int64 { + var total int64 + for _, r := range rr { + if r != nil { + total += r.Size() + } + } + return total +} + // Close closes the on-disk block. It blocks as long as there are readers reading from the block. func (pb *Block) Close() error { pb.mtx.Lock() @@ -315,6 +349,9 @@ func (pb *Block) Dir() string { return pb.dir } // Meta returns meta information about the block. func (pb *Block) Meta() BlockMeta { return pb.meta } +// Size returns the number of bytes that the block takes up. +func (pb *Block) Size() int64 { return pb.meta.Stats.NumBytes } + // ErrClosing is returned when a block is in the process of being closed. 
var ErrClosing = errors.New("block is closing") diff --git a/block_test.go b/block_test.go index 661898b7f..dec044918 100644 --- a/block_test.go +++ b/block_test.go @@ -22,7 +22,6 @@ import ( "testing" "github.com/go-kit/kit/log" - "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/testutil" ) @@ -47,60 +46,40 @@ func TestSetCompactionFailed(t *testing.T) { testutil.Ok(t, err) defer os.RemoveAll(tmpdir) - b := createEmptyBlock(t, tmpdir, &BlockMeta{Version: 2}) - + blockDir := createBlock(t, tmpdir, 1, 0, 0) + b, err := OpenBlock(nil, blockDir, nil) + testutil.Ok(t, err) testutil.Equals(t, false, b.meta.Compaction.Failed) testutil.Ok(t, b.setCompactionFailed()) testutil.Equals(t, true, b.meta.Compaction.Failed) testutil.Ok(t, b.Close()) - b, err = OpenBlock(tmpdir, nil) + b, err = OpenBlock(nil, blockDir, nil) testutil.Ok(t, err) testutil.Equals(t, true, b.meta.Compaction.Failed) + testutil.Ok(t, b.Close()) } -// createEmpty block creates a block with the given meta but without any data. -func createEmptyBlock(t *testing.T, dir string, meta *BlockMeta) *Block { - testutil.Ok(t, os.MkdirAll(dir, 0777)) - - testutil.Ok(t, writeMetaFile(dir, meta)) - - ir, err := index.NewWriter(filepath.Join(dir, indexFilename)) - testutil.Ok(t, err) - testutil.Ok(t, ir.Close()) - - testutil.Ok(t, os.MkdirAll(chunkDir(dir), 0777)) - - testutil.Ok(t, writeTombstoneFile(dir, newMemTombstones())) - - b, err := OpenBlock(dir, nil) - testutil.Ok(t, err) - return b -} - -// createPopulatedBlock creates a block with nSeries series, filled with -// samples of the given mint,maxt time range. -func createPopulatedBlock(tb testing.TB, dir string, nSeries int, mint, maxt int64) *Block { +// createBlock creates a block with nSeries series, filled with +// samples of the given mint,maxt time range and returns its dir. 
+func createBlock(tb testing.TB, dir string, nSeries int, mint, maxt int64) string {
 	head, err := NewHead(nil, nil, nil, 2*60*60*1000)
 	testutil.Ok(tb, err)
 	defer head.Close()
 
 	lbls, err := labels.ReadLabels(filepath.Join("testdata", "20kseries.json"), nSeries)
 	testutil.Ok(tb, err)
-	refs := make([]uint64, nSeries)
+	var ref uint64
 
 	for ts := mint; ts <= maxt; ts++ {
 		app := head.Appender()
-		for i, lbl := range lbls {
-			if refs[i] != 0 {
-				err := app.AddFast(refs[i], ts, rand.Float64())
-				if err == nil {
-					continue
-				}
+		for _, lbl := range lbls {
+			err := app.AddFast(ref, ts, rand.Float64())
+			if err == nil {
+				continue
 			}
-			ref, err := app.Add(lbl, int64(ts), rand.Float64())
+			ref, err = app.Add(lbl, int64(ts), rand.Float64())
 			testutil.Ok(tb, err)
-			refs[i] = ref
 		}
 		err := app.Commit()
 		testutil.Ok(tb, err)
@@ -113,8 +92,5 @@ func createPopulatedBlock(tb testing.TB, dir string, nSeries int, mint, maxt int
 	ulid, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime(), nil)
 	testutil.Ok(tb, err)
-
-	blk, err := OpenBlock(filepath.Join(dir, ulid.String()), nil)
-	testutil.Ok(tb, err)
-	return blk
+	return filepath.Join(dir, ulid.String())
 }
diff --git a/checkpoint.go b/checkpoint.go
index d1cad4c10..1c6239232 100644
--- a/checkpoint.go
+++ b/checkpoint.go
@@ -128,7 +128,7 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64)
 		defer sgmReader.Close()
 	}
 
-	cpdir := filepath.Join(w.Dir(), fmt.Sprintf("checkpoint.%06d", to))
+	cpdir := filepath.Join(w.Dir(), fmt.Sprintf(checkpointPrefix+"%06d", to))
 	cpdirtmp := cpdir + ".tmp"
 
 	if err := os.MkdirAll(cpdirtmp, 0777); err != nil {
@@ -139,6 +139,12 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64)
 		return nil, errors.Wrap(err, "open checkpoint")
 	}
 
+	// Ensure that an early return caused by an error doesn't leave any tmp files.
+	defer func() {
+		cp.Close()
+		os.RemoveAll(cpdirtmp)
+	}()
+
 	r := wal.NewReader(sgmReader)
 
 	var (
diff --git a/checkpoint_test.go b/checkpoint_test.go
index 76c486a7d..8b13c152a 100644
--- a/checkpoint_test.go
+++ b/checkpoint_test.go
@@ -15,11 +15,14 @@ package tsdb
 import (
+	"fmt"
 	"io/ioutil"
 	"os"
 	"path/filepath"
+	"strings"
 	"testing"
 
+	"github.com/pkg/errors"
 	"github.com/prometheus/tsdb/fileutil"
 	"github.com/prometheus/tsdb/labels"
 	"github.com/prometheus/tsdb/testutil"
@@ -31,11 +34,11 @@ func TestLastCheckpoint(t *testing.T) {
 	testutil.Ok(t, err)
 	defer os.RemoveAll(dir)
 
-	s, k, err := LastCheckpoint(dir)
+	_, _, err = LastCheckpoint(dir)
 	testutil.Equals(t, ErrNotFound, err)
 
 	testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.0000"), 0777))
-	s, k, err = LastCheckpoint(dir)
+	s, k, err := LastCheckpoint(dir)
 	testutil.Ok(t, err)
 	testutil.Equals(t, filepath.Join(dir, "checkpoint.0000"), s)
 	testutil.Equals(t, 0, k)
@@ -180,3 +183,30 @@ func TestCheckpoint(t *testing.T) {
 		{Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")},
 	}, series)
 }
+
+func TestCheckpointNoTmpFolderAfterError(t *testing.T) {
+	// Create a new wal with an invalid record.
+	dir, err := ioutil.TempDir("", "test_checkpoint")
+	testutil.Ok(t, err)
+	defer os.RemoveAll(dir)
+	w, err := wal.NewSize(nil, nil, dir, 64*1024)
+	testutil.Ok(t, err)
+	testutil.Ok(t, w.Log([]byte{99}))
+	w.Close()
+
+	// Run the checkpoint; since the wal contains an invalid record this should return an error.
+	_, err = Checkpoint(w, 0, 1, nil, 0)
+	testutil.NotOk(t, err)
+
+	// Walk the wal dir to make sure there are no tmp folders left behind after the error.
+ err = filepath.Walk(w.Dir(), func(path string, info os.FileInfo, err error) error { + if err != nil { + return errors.Wrapf(err, "access err %q: %v\n", path, err) + } + if info.IsDir() && strings.HasSuffix(info.Name(), ".tmp") { + return fmt.Errorf("wal dir contains temporary folder:%s", info.Name()) + } + return nil + }) + testutil.Ok(t, err) +} diff --git a/chunks/chunks.go b/chunks/chunks.go index 5eab23982..8fb288384 100644 --- a/chunks/chunks.go +++ b/chunks/chunks.go @@ -205,6 +205,7 @@ func (w *Writer) WriteChunks(chks ...Meta) error { for _, c := range chks { maxLen += binary.MaxVarintLen32 + 1 // The number of bytes in the chunk and its encoding. maxLen += int64(len(c.Chunk.Bytes())) + maxLen += 4 // The 4 bytes of crc32 } newsz := w.n + maxLen @@ -284,17 +285,15 @@ func (b realByteSlice) Sub(start, end int) ByteSlice { // Reader implements a SeriesReader for a serialized byte stream // of series data. type Reader struct { - // The underlying bytes holding the encoded series data. - bs []ByteSlice - - // Closers for resources behind the byte slices. - cs []io.Closer - + bs []ByteSlice // The underlying bytes holding the encoded series data. + cs []io.Closer // Closers for resources behind the byte slices. + size int64 // The total size of bytes in the reader. pool chunkenc.Pool } func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, error) { cr := Reader{pool: pool, bs: bs, cs: cs} + var totalSize int64 for i, b := range cr.bs { if b.Len() < 4 { @@ -304,7 +303,9 @@ func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, err if m := binary.BigEndian.Uint32(b.Range(0, 4)); m != MagicChunks { return nil, errors.Errorf("invalid magic number %x", m) } + totalSize += int64(b.Len()) } + cr.size = totalSize return &cr, nil } @@ -327,9 +328,10 @@ func NewDirReader(dir string, pool chunkenc.Pool) (*Reader, error) { pool = chunkenc.NewPool() } - var bs []ByteSlice - var cs []io.Closer - + var ( + bs []ByteSlice + cs []io.Closer + ) for _, fn := range files { f, err := fileutil.OpenMmapFile(fn) if err != nil { @@ -345,6 +347,11 @@ func (s *Reader) Close() error { return closeAll(s.cs...) } +// Size returns the size of the chunks. 
+func (s *Reader) Size() int64 { + return s.size +} + func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) { var ( seq = int(ref >> 32) diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index e51a49952..e319b490d 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -33,6 +33,7 @@ import ( "github.com/go-kit/kit/log" "github.com/pkg/errors" "github.com/prometheus/tsdb" + "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/labels" "gopkg.in/alecthomas/kingpin.v2" ) @@ -48,6 +49,10 @@ func main() { listCmd = cli.Command("ls", "list db blocks") listCmdHumanReadable = listCmd.Flag("human-readable", "print human readable values").Short('h').Bool() listPath = listCmd.Arg("db path", "database path (default is "+filepath.Join("benchout", "storage")+")").Default(filepath.Join("benchout", "storage")).String() + analyzeCmd = cli.Command("analyze", "analyze churn, label pair cardinality.") + analyzePath = analyzeCmd.Arg("db path", "database path (default is "+filepath.Join("benchout", "storage")+")").Default(filepath.Join("benchout", "storage")).String() + analyzeBlockID = analyzeCmd.Arg("block id", "block to analyze (default is the last block)").String() + analyzeLimit = analyzeCmd.Flag("limit", "how many items to show in each list").Default("20").Int() ) switch kingpin.MustParse(cli.Parse(os.Args[1:])) { @@ -64,6 +69,27 @@ func main() { exitWithError(err) } printBlocks(db.Blocks(), listCmdHumanReadable) + case analyzeCmd.FullCommand(): + db, err := tsdb.Open(*analyzePath, nil, nil, nil) + if err != nil { + exitWithError(err) + } + blocks := db.Blocks() + var block *tsdb.Block + if *analyzeBlockID != "" { + for _, b := range blocks { + if b.Meta().ULID.String() == *analyzeBlockID { + block = b + break + } + } + } else if len(blocks) > 0 { + block = blocks[len(blocks)-1] + } + if block == nil { + exitWithError(fmt.Errorf("Block not found")) + } + analyzeBlock(block, *analyzeLimit) } flag.CommandLine.Set("log.level", "debug") } @@ -104,7 +130,6 @@ func (b *writeBenchmark) run() { l = log.With(l, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller) st, err := tsdb.Open(dir, l, nil, &tsdb.Options{ - WALFlushInterval: 200 * time.Millisecond, RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds BlockRanges: tsdb.ExponentialBlockRanges(2*60*60*1000, 5, 3), }) @@ -113,7 +138,7 @@ func (b *writeBenchmark) run() { } b.storage = st - var metrics []labels.Labels + var labels []labels.Labels measureTime("readData", func() { f, err := os.Open(b.samplesFile) @@ -122,7 +147,7 @@ func (b *writeBenchmark) run() { } defer f.Close() - metrics, err = readPrometheusLabels(f, b.numMetrics) + labels, err = readPrometheusLabels(f, b.numMetrics) if err != nil { exitWithError(err) } @@ -132,7 +157,7 @@ func (b *writeBenchmark) run() { dur := measureTime("ingestScrapes", func() { b.startProfiling() - total, err = b.ingestScrapes(metrics, 3000) + total, err = b.ingestScrapes(labels, 3000) if err != nil { exitWithError(err) } @@ -188,7 +213,7 @@ func (b *writeBenchmark) ingestScrapes(lbls []labels.Labels, scrapeCount int) (u return total, nil } -func (b *writeBenchmark) ingestScrapesShard(metrics []labels.Labels, scrapeCount int, baset int64) (uint64, error) { +func (b *writeBenchmark) ingestScrapesShard(lbls []labels.Labels, scrapeCount int, baset int64) (uint64, error) { ts := baset type sample struct { @@ -197,9 +222,9 @@ func (b *writeBenchmark) ingestScrapesShard(metrics []labels.Labels, scrapeCount ref *uint64 } - scrape := make([]*sample, 0, len(metrics)) + scrape := 
make([]*sample, 0, len(lbls)) - for _, m := range metrics { + for _, m := range lbls { scrape = append(scrape, &sample{ labels: m, value: 123456789, @@ -383,3 +408,132 @@ func getFormatedTime(timestamp int64, humanReadable *bool) string { } return strconv.FormatInt(timestamp, 10) } + +func analyzeBlock(b *tsdb.Block, limit int) { + fmt.Printf("Block path: %s\n", b.Dir()) + meta := b.Meta() + // Presume 1ms resolution that Prometheus uses. + fmt.Printf("Duration: %s\n", (time.Duration(meta.MaxTime-meta.MinTime) * 1e6).String()) + fmt.Printf("Series: %d\n", meta.Stats.NumSeries) + ir, err := b.Index() + if err != nil { + exitWithError(err) + } + defer ir.Close() + + allLabelNames, err := ir.LabelNames() + if err != nil { + exitWithError(err) + } + fmt.Printf("Label names: %d\n", len(allLabelNames)) + + type postingInfo struct { + key string + metric uint64 + } + postingInfos := []postingInfo{} + + printInfo := func(postingInfos []postingInfo) { + sort.Slice(postingInfos, func(i, j int) bool { return postingInfos[i].metric > postingInfos[j].metric }) + + for i, pc := range postingInfos { + fmt.Printf("%d %s\n", pc.metric, pc.key) + if i >= limit { + break + } + } + } + + labelsUncovered := map[string]uint64{} + labelpairsUncovered := map[string]uint64{} + labelpairsCount := map[string]uint64{} + entries := 0 + p, err := ir.Postings("", "") // The special all key. + if err != nil { + exitWithError(err) + } + lbls := labels.Labels{} + chks := []chunks.Meta{} + for p.Next() { + if err = ir.Series(p.At(), &lbls, &chks); err != nil { + exitWithError(err) + } + // Amount of the block time range not covered by this series. + uncovered := uint64(meta.MaxTime-meta.MinTime) - uint64(chks[len(chks)-1].MaxTime-chks[0].MinTime) + for _, lbl := range lbls { + key := lbl.Name + "=" + lbl.Value + labelsUncovered[lbl.Name] += uncovered + labelpairsUncovered[key] += uncovered + labelpairsCount[key]++ + entries++ + } + } + if p.Err() != nil { + exitWithError(p.Err()) + } + fmt.Printf("Postings (unique label pairs): %d\n", len(labelpairsUncovered)) + fmt.Printf("Postings entries (total label pairs): %d\n", entries) + + postingInfos = postingInfos[:0] + for k, m := range labelpairsUncovered { + postingInfos = append(postingInfos, postingInfo{k, uint64(float64(m) / float64(meta.MaxTime-meta.MinTime))}) + } + + fmt.Printf("\nLabel pairs most involved in churning:\n") + printInfo(postingInfos) + + postingInfos = postingInfos[:0] + for k, m := range labelsUncovered { + postingInfos = append(postingInfos, postingInfo{k, uint64(float64(m) / float64(meta.MaxTime-meta.MinTime))}) + } + + fmt.Printf("\nLabel names most involved in churning:\n") + printInfo(postingInfos) + + postingInfos = postingInfos[:0] + for k, m := range labelpairsCount { + postingInfos = append(postingInfos, postingInfo{k, m}) + } + + fmt.Printf("\nMost common label pairs:\n") + printInfo(postingInfos) + + postingInfos = postingInfos[:0] + for _, n := range allLabelNames { + lv, err := ir.LabelValues(n) + if err != nil { + exitWithError(err) + } + postingInfos = append(postingInfos, postingInfo{n, uint64(lv.Len())}) + } + fmt.Printf("\nHighest cardinality labels:\n") + printInfo(postingInfos) + + postingInfos = postingInfos[:0] + lv, err := ir.LabelValues("__name__") + if err != nil { + exitWithError(err) + } + for i := 0; i < lv.Len(); i++ { + names, err := lv.At(i) + if err != nil { + exitWithError(err) + } + for _, n := range names { + postings, err := ir.Postings("__name__", n) + if err != nil { + exitWithError(err) + } + count := 0 + for 
postings.Next() { + count++ + } + if postings.Err() != nil { + exitWithError(postings.Err()) + } + postingInfos = append(postingInfos, postingInfo{n, uint64(count)}) + } + } + fmt.Printf("\nHighest cardinality metric names:\n") + printInfo(postingInfos) +} diff --git a/compact.go b/compact.go index 556856107..dbc3c2606 100644 --- a/compact.go +++ b/compact.go @@ -56,12 +56,17 @@ type Compactor interface { Plan(dir string) ([]string, error) // Write persists a Block into a directory. + // No Block is written when resulting Block has 0 samples, and returns empty ulid.ULID{}. Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) // Compact runs compaction against the provided directories. Must // only be called concurrently with results of Plan(). // Can optionally pass a list of already open blocks, // to avoid having to reopen them. + // When resulting Block has 0 samples + // * No block is written. + // * The source dirs are marked Deletable. + // * Returns empty ulid.ULID{}. Compact(dest string, dirs []string, open []*Block) (ulid.ULID, error) } @@ -195,13 +200,12 @@ func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) { return res, nil } - // Compact any blocks that have >5% tombstones. + // Compact any blocks with big enough time range that have >5% tombstones. for i := len(dms) - 1; i >= 0; i-- { meta := dms[i].meta if meta.MaxTime-meta.MinTime < c.ranges[len(c.ranges)/2] { break } - if float64(meta.Stats.NumTombstones)/float64(meta.Stats.NumSeries+1) > 0.05 { return []string{dms[i].dir}, nil } @@ -356,7 +360,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u if b == nil { var err error - b, err = OpenBlock(d, c.chunkPool) + b, err = OpenBlock(c.logger, d, c.chunkPool) if err != nil { return uid, err } @@ -375,15 +379,34 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u meta := compactBlockMetas(uid, metas...) err = c.write(dest, meta, blocks...) 
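	// Callers can detect the empty-block case by comparing the returned ULID
	// against its zero value, as db.compact does later in this patch. A minimal sketch:
	//
	//	uid, err := db.compactor.Write(db.dir, head, mint, maxt, nil)
	//	if err == nil && (uid == ulid.ULID{}) {
	//		// No block was written; the head must be truncated manually.
	//	}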
if err == nil { - level.Info(c.logger).Log( - "msg", "compact blocks", - "count", len(blocks), - "mint", meta.MinTime, - "maxt", meta.MaxTime, - "ulid", meta.ULID, - "sources", fmt.Sprintf("%v", uids), - "duration", time.Since(start), - ) + if meta.Stats.NumSamples == 0 { + for _, b := range bs { + b.meta.Compaction.Deletable = true + if err = writeMetaFile(b.dir, &b.meta); err != nil { + level.Error(c.logger).Log( + "msg", "Failed to write 'Deletable' to meta file after compaction", + "ulid", b.meta.ULID, + ) + } + } + uid = ulid.ULID{} + level.Info(c.logger).Log( + "msg", "compact blocks resulted in empty block", + "count", len(blocks), + "sources", fmt.Sprintf("%v", uids), + "duration", time.Since(start), + ) + } else { + level.Info(c.logger).Log( + "msg", "compact blocks", + "count", len(blocks), + "mint", meta.MinTime, + "maxt", meta.MaxTime, + "ulid", meta.ULID, + "sources", fmt.Sprintf("%v", uids), + "duration", time.Since(start), + ) + } return uid, nil } @@ -422,6 +445,10 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, p return uid, err } + if meta.Stats.NumSamples == 0 { + return ulid.ULID{}, nil + } + level.Info(c.logger).Log("msg", "write block", "mint", meta.MinTime, "maxt", meta.MaxTime, "ulid", meta.ULID) return uid, nil } @@ -516,10 +543,6 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe default: } - if err = writeMetaFile(tmp, meta); err != nil { - return errors.Wrap(err, "write merged meta") - } - // We are explicitly closing them here to check for error even // though these are covered under defer. This is because in Windows, // you cannot delete these unless they are closed and the defer is to @@ -530,6 +553,18 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe } } + // Populated block is empty, so cleanup and exit. + if meta.Stats.NumSamples == 0 { + if err := os.RemoveAll(tmp); err != nil { + return errors.Wrap(err, "remove tmp folder after empty block failed") + } + return nil + } + + if err = writeMetaFile(tmp, meta); err != nil { + return errors.Wrap(err, "write merged meta") + } + // Create an empty tombstones file. if err := writeTombstoneFile(tmp, newMemTombstones()); err != nil { return errors.Wrap(err, "write new tombstones file") diff --git a/compact_test.go b/compact_test.go index 3b63db313..2080fb291 100644 --- a/compact_test.go +++ b/compact_test.go @@ -735,7 +735,7 @@ func TestDisableAutoCompactions(t *testing.T) { case db.compactc <- struct{}{}: default: } - for x := 0; x < 20; x++ { + for x := 0; x < 100; x++ { if len(db.Blocks()) > 0 { break } diff --git a/db.go b/db.go index ac9c832b9..c9715483d 100644 --- a/db.go +++ b/db.go @@ -45,7 +45,7 @@ import ( // DefaultOptions used for the DB. They are sane for setups using // millisecond precision timestamps. var DefaultOptions = &Options{ - WALFlushInterval: 5 * time.Second, + WALSegmentSize: wal.DefaultSegmentSize, RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds BlockRanges: ExponentialBlockRanges(int64(2*time.Hour)/1e6, 3, 5), NoLockfile: false, @@ -53,12 +53,19 @@ var DefaultOptions = &Options{ // Options of the DB storage. type Options struct { - // The interval at which the write ahead log is flushed to disk. - WALFlushInterval time.Duration + // Segments (wal files) max size + WALSegmentSize int // Duration of persisted data to keep. RetentionDuration uint64 + // Maximum number of bytes in blocks to be retained. + // 0 or less means disabled. 
+	// NOTE: For proper storage calculations you need to consider
+	// the size of the WAL folder, which is not added when calculating
+	// the current size of the database.
+	MaxBytes int64
+
 	// The sizes of the Blocks.
 	BlockRanges []int64
@@ -131,11 +138,12 @@ type dbMetrics struct {
 	reloads              prometheus.Counter
 	reloadsFailed        prometheus.Counter
 	compactionsTriggered prometheus.Counter
+	timeRetentionCount   prometheus.Counter
 	compactionsSkipped   prometheus.Counter
-	cutoffs              prometheus.Counter
-	cutoffsFailed        prometheus.Counter
 	startTime            prometheus.GaugeFunc
 	tombCleanTimer       prometheus.Histogram
+	blocksBytes          prometheus.Gauge
+	sizeRetentionCount   prometheus.Counter
 }
 
 func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
@@ -174,18 +182,14 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
 		Name: "prometheus_tsdb_compactions_triggered_total",
 		Help: "Total number of triggered compactions for the partition.",
 	})
+	m.timeRetentionCount = prometheus.NewCounter(prometheus.CounterOpts{
+		Name: "prometheus_tsdb_time_retentions_total",
+		Help: "The number of times that blocks were deleted because the maximum time limit was exceeded.",
+	})
 	m.compactionsSkipped = prometheus.NewCounter(prometheus.CounterOpts{
 		Name: "prometheus_tsdb_compactions_skipped_total",
 		Help: "Total number of skipped compactions due to disabled auto compaction.",
 	})
-	m.cutoffs = prometheus.NewCounter(prometheus.CounterOpts{
-		Name: "prometheus_tsdb_retention_cutoffs_total",
-		Help: "Number of times the database cut off block data from disk.",
-	})
-	m.cutoffsFailed = prometheus.NewCounter(prometheus.CounterOpts{
-		Name: "prometheus_tsdb_retention_cutoffs_failures_total",
-		Help: "Number of times the database failed to cut off block data from disk.",
-	})
 	m.startTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
 		Name: "prometheus_tsdb_lowest_timestamp",
 		Help: "Lowest timestamp value stored in the database.
The unit is decided by the library consumer.", @@ -201,6 +205,14 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { Name: "prometheus_tsdb_tombstone_cleanup_seconds", Help: "The time taken to recompact blocks to remove tombstones.", }) + m.blocksBytes = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "prometheus_tsdb_storage_blocks_bytes_total", + Help: "The number of bytes that are currently used for local storage by all blocks.", + }) + m.sizeRetentionCount = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_size_retentions_total", + Help: "The number of times that blocks were deleted because the maximum number of bytes was exceeded.", + }) if r != nil { r.MustRegister( @@ -208,11 +220,12 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { m.symbolTableSize, m.reloads, m.reloadsFailed, - m.cutoffs, - m.cutoffsFailed, + m.timeRetentionCount, m.compactionsTriggered, m.startTime, m.tombCleanTimer, + m.blocksBytes, + m.sizeRetentionCount, ) } return m @@ -269,7 +282,11 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db } db.compactCnl = cnl - wlog, err := wal.New(l, r, filepath.Join(dir, "wal")) + segmentSize := wal.DefaultSegmentSize + if opts.WALSegmentSize > 0 { + segmentSize = opts.WALSegmentSize + } + wlog, err := wal.NewSize(l, r, filepath.Join(dir, "wal"), segmentSize) if err != nil { return nil, err } @@ -342,25 +359,6 @@ func (db *DB) run() { } } -func (db *DB) beyondRetention(meta *BlockMeta) bool { - if db.opts.RetentionDuration == 0 { - return false - } - - db.mtx.RLock() - blocks := db.blocks[:] - db.mtx.RUnlock() - - if len(blocks) == 0 { - return false - } - - last := blocks[len(db.blocks)-1] - mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration) - - return meta.MaxTime < mint -} - // Appender opens a new appender against the database. func (db *DB) Appender() Appender { return dbAppender{db: db, Appender: db.head.Appender()} @@ -425,7 +423,8 @@ func (db *DB) compact() (err error) { // from the block interval here. maxt: maxt - 1, } - if _, err = db.compactor.Write(db.dir, head, mint, maxt, nil); err != nil { + uid, err := db.compactor.Write(db.dir, head, mint, maxt, nil) + if err != nil { return errors.Wrap(err, "persist head block") } @@ -434,6 +433,14 @@ func (db *DB) compact() (err error) { if err := db.reload(); err != nil { return errors.Wrap(err, "reload blocks") } + if (uid == ulid.ULID{}) { + // Compaction resulted in an empty block. + // Head truncating during db.reload() depends on the persisted blocks and + // in this case no new block will be persisted so manually truncate the head. + if err = db.head.Truncate(maxt); err != nil { + return errors.Wrap(err, "head truncate failed (in compact)") + } + } runtime.GC() } @@ -476,8 +483,7 @@ func (db *DB) getBlock(id ulid.ULID) (*Block, bool) { return nil, false } -// reload on-disk blocks and trigger head truncation if new blocks appeared. It takes -// a list of block directories which should be deleted during reload. +// reload blocks and trigger head truncation if new blocks appeared. // Blocks that are obsolete due to replacement or retention will be deleted. 
 func (db *DB) reload() (err error) {
 	defer func() {
 		if err != nil {
 			db.metrics.reloadsFailed.Inc()
 		}
 		db.metrics.reloads.Inc()
 	}()
 
-	dirs, err := blockDirs(db.dir)
+	loadable, corrupted, err := db.openBlocks()
 	if err != nil {
-		return errors.Wrap(err, "find blocks")
+		return err
 	}
-	// We delete old blocks that have been superseded by new ones by gathering all parents
-	// from existing blocks. Those parents all have newer replacements and can be safely deleted
-	// after we loaded the other blocks.
-	// This makes us resilient against the process crashing towards the end of a compaction.
-	// Creation of a new block and deletion of its parents cannot happen atomically. By creating
-	// blocks with their parents, we can pick up the deletion where it left off during a crash.
-	var (
-		blocks     []*Block
-		corrupted  = map[ulid.ULID]error{}
-		opened     = map[ulid.ULID]struct{}{}
-		deleteable = map[ulid.ULID]struct{}{}
-	)
-	for _, dir := range dirs {
-		meta, err := readMetaFile(dir)
-		if err != nil {
-			// The block was potentially in the middle of being deleted during a crash.
-			// Skip it since we may delete it properly further down again.
-			level.Warn(db.logger).Log("msg", "read meta information", "err", err, "dir", dir)
-			ulid, err2 := ulid.Parse(filepath.Base(dir))
-			if err2 != nil {
-				level.Error(db.logger).Log("msg", "not a block dir", "dir", dir)
-				continue
-			}
-			corrupted[ulid] = err
-			continue
-		}
-		if db.beyondRetention(meta) {
-			deleteable[meta.ULID] = struct{}{}
-			continue
-		}
-		for _, b := range meta.Compaction.Parents {
-			deleteable[b.ULID] = struct{}{}
+	deletable := db.deletableBlocks(loadable)
+
+	// Corrupted blocks that have been replaced by parents can be safely ignored and deleted.
+	// This makes it resilient against the process crashing towards the end of a compaction.
+	// Creation of a new block and deletion of its parents cannot happen atomically.
+	// By creating blocks with their parents, we can pick up the deletion where it left off during a crash.
+	for _, block := range loadable {
+		for _, b := range block.Meta().Compaction.Parents {
+			delete(corrupted, b.ULID)
+			deletable[b.ULID] = nil
 		}
 	}
-	// Blocks we failed to open should all be those we are want to delete anyway.
-	for c, err := range corrupted {
-		if _, ok := deleteable[c]; !ok {
-			return errors.Wrapf(err, "unexpected corrupted block %s", c)
-		}
+	if len(corrupted) > 0 {
+		return errors.Errorf("unexpected corrupted blocks %v", corrupted)
 	}
-	// Load new blocks into memory.
-	for _, dir := range dirs {
-		meta, err := readMetaFile(dir)
-		if err != nil {
-			return errors.Wrapf(err, "read meta information %s", dir)
-		}
-		// Don't load blocks that are scheduled for deletion.
-		if _, ok := deleteable[meta.ULID]; ok {
+
+	// Deletable blocks should not be loaded.
+	var (
+		bb         []*Block
+		blocksSize int64
+	)
+	for _, block := range loadable {
+		if _, ok := deletable[block.Meta().ULID]; ok {
+			deletable[block.Meta().ULID] = block
 			continue
 		}
-		// See if we already have the block in memory or open it otherwise.
- b, ok := db.getBlock(meta.ULID) - if !ok { - b, err = OpenBlock(dir, db.chunkPool) - if err != nil { - return errors.Wrapf(err, "open block %s", dir) - } - } - blocks = append(blocks, b) - opened[meta.ULID] = struct{}{} + bb = append(bb, block) + blocksSize += block.Size() + } - sort.Slice(blocks, func(i, j int) bool { - return blocks[i].Meta().MinTime < blocks[j].Meta().MinTime + loadable = bb + db.metrics.blocksBytes.Set(float64(blocksSize)) + + sort.Slice(loadable, func(i, j int) bool { + return loadable[i].Meta().MaxTime < loadable[j].Meta().MaxTime }) - if err := validateBlockSequence(blocks); err != nil { + if err := validateBlockSequence(loadable); err != nil { return errors.Wrap(err, "invalid block sequence") } - // Swap in new blocks first for subsequently created readers to be seen. - // Then close previous blocks, which may block for pending readers to complete. + // Swap new blocks first for subsequently created readers to be seen. db.mtx.Lock() oldBlocks := db.blocks - db.blocks = blocks + db.blocks = loadable db.mtx.Unlock() - // Drop old blocks from memory. for _, b := range oldBlocks { - if _, ok := opened[b.Meta().ULID]; ok { - continue - } - if err := b.Close(); err != nil { - level.Warn(db.logger).Log("msg", "closing block failed", "err", err) + if _, ok := deletable[b.Meta().ULID]; ok { + deletable[b.Meta().ULID] = b } } - // Delete all obsolete blocks. None of them are opened any longer. - for ulid := range deleteable { - if err := os.RemoveAll(filepath.Join(db.dir, ulid.String())); err != nil { - return errors.Wrapf(err, "delete obsolete block %s", ulid) - } + + if err := db.deleteBlocks(deletable); err != nil { + return err } // Garbage collect data in the head if the most recent persisted block // covers data of its current time range. - if len(blocks) == 0 { + if len(loadable) == 0 { return nil } - maxt := blocks[len(blocks)-1].Meta().MaxTime + + maxt := loadable[len(loadable)-1].Meta().MaxTime return errors.Wrap(db.head.Truncate(maxt), "head truncate failed") } +func (db *DB) openBlocks() (blocks []*Block, corrupted map[ulid.ULID]error, err error) { + dirs, err := blockDirs(db.dir) + if err != nil { + return nil, nil, errors.Wrap(err, "find blocks") + } + + corrupted = make(map[ulid.ULID]error) + for _, dir := range dirs { + meta, err := readMetaFile(dir) + if err != nil { + level.Error(db.logger).Log("msg", "not a block dir", "dir", dir) + continue + } + + // See if we already have the block in memory or open it otherwise. + block, ok := db.getBlock(meta.ULID) + if !ok { + block, err = OpenBlock(db.logger, dir, db.chunkPool) + if err != nil { + corrupted[meta.ULID] = err + continue + } + } + blocks = append(blocks, block) + } + return blocks, corrupted, nil +} + +// deletableBlocks returns all blocks past retention policy. +func (db *DB) deletableBlocks(blocks []*Block) map[ulid.ULID]*Block { + deletable := make(map[ulid.ULID]*Block) + + // Sort the blocks by time - newest to oldest (largest to smallest timestamp). + // This ensures that the retentions will remove the oldest blocks. 
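	// (For instance, blocks with MaxTime 600, 500 and 400 end up ordered
	// [600 500 400], so beyondTimeRetention and beyondSizeRetention below scan
	// from the newest block and mark everything from the first block that
	// crosses the limit onwards as deletable.)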
+ sort.Slice(blocks, func(i, j int) bool { + return blocks[i].Meta().MaxTime > blocks[j].Meta().MaxTime + }) + + for _, block := range blocks { + if block.Meta().Compaction.Deletable { + deletable[block.Meta().ULID] = block + } + } + + for ulid, block := range db.beyondTimeRetention(blocks) { + deletable[ulid] = block + } + + for ulid, block := range db.beyondSizeRetention(blocks) { + deletable[ulid] = block + } + + return deletable +} + +func (db *DB) beyondTimeRetention(blocks []*Block) (deleteable map[ulid.ULID]*Block) { + // Time retention is disabled or no blocks to work with. + if len(db.blocks) == 0 || db.opts.RetentionDuration == 0 { + return + } + + deleteable = make(map[ulid.ULID]*Block) + for i, block := range blocks { + // The difference between the first block and this block is larger than + // the retention period so any blocks after that are added as deleteable. + if i > 0 && blocks[0].Meta().MaxTime-block.Meta().MaxTime > int64(db.opts.RetentionDuration) { + for _, b := range blocks[i:] { + deleteable[b.meta.ULID] = b + } + db.metrics.timeRetentionCount.Inc() + break + } + } + return deleteable +} + +func (db *DB) beyondSizeRetention(blocks []*Block) (deleteable map[ulid.ULID]*Block) { + // Size retention is disabled or no blocks to work with. + if len(db.blocks) == 0 || db.opts.MaxBytes <= 0 { + return + } + + deleteable = make(map[ulid.ULID]*Block) + blocksSize := int64(0) + for i, block := range blocks { + blocksSize += block.Size() + if blocksSize > db.opts.MaxBytes { + // Add this and all following blocks for deletion. + for _, b := range blocks[i:] { + deleteable[b.meta.ULID] = b + } + db.metrics.sizeRetentionCount.Inc() + break + } + } + return deleteable +} + +// deleteBlocks closes and deletes blocks from the disk. +// When the map contains a non nil block object it means it is loaded in memory +// so needs to be closed first as it might need to wait for pending readers to complete. +func (db *DB) deleteBlocks(blocks map[ulid.ULID]*Block) error { + for ulid, block := range blocks { + if block != nil { + if err := block.Close(); err != nil { + level.Warn(db.logger).Log("msg", "closing block failed", "err", err) + } + } + if err := os.RemoveAll(filepath.Join(db.dir, ulid.String())); err != nil { + return errors.Wrapf(err, "delete obsolete block %s", ulid) + } + } + return nil +} + // validateBlockSequence returns error if given block meta files indicate that some blocks overlaps within sequence. 
func validateBlockSequence(bs []*Block) error { if len(bs) <= 1 { diff --git a/db_test.go b/db_test.go index 60b260765..21de405e2 100644 --- a/db_test.go +++ b/db_test.go @@ -81,22 +81,27 @@ func TestDB_reloadOrder(t *testing.T) { defer close() defer db.Close() - metas := []*BlockMeta{ - {ULID: ulid.MustNew(100, nil), MinTime: 90, MaxTime: 100}, - {ULID: ulid.MustNew(200, nil), MinTime: 70, MaxTime: 80}, - {ULID: ulid.MustNew(300, nil), MinTime: 100, MaxTime: 110}, + metas := []BlockMeta{ + {MinTime: 90, MaxTime: 100}, + {MinTime: 70, MaxTime: 80}, + {MinTime: 100, MaxTime: 110}, } for _, m := range metas { - bdir := filepath.Join(db.Dir(), m.ULID.String()) - createEmptyBlock(t, bdir, m) + createBlock(t, db.Dir(), 1, m.MinTime, m.MaxTime) } testutil.Ok(t, db.reload()) blocks := db.Blocks() + for _, b := range blocks { + b.meta.Stats.NumBytes = 0 + } testutil.Equals(t, 3, len(blocks)) - testutil.Equals(t, *metas[1], blocks[0].Meta()) - testutil.Equals(t, *metas[0], blocks[1].Meta()) - testutil.Equals(t, *metas[2], blocks[2].Meta()) + testutil.Equals(t, metas[1].MinTime, blocks[0].Meta().MinTime) + testutil.Equals(t, metas[1].MaxTime, blocks[0].Meta().MaxTime) + testutil.Equals(t, metas[0].MinTime, blocks[1].Meta().MinTime) + testutil.Equals(t, metas[0].MaxTime, blocks[1].Meta().MaxTime) + testutil.Equals(t, metas[2].MinTime, blocks[2].Meta().MinTime) + testutil.Equals(t, metas[2].MaxTime, blocks[2].Meta().MaxTime) } func TestDataAvailableOnlyAfterCommit(t *testing.T) { @@ -695,6 +700,32 @@ func TestWALFlushedOnDBClose(t *testing.T) { testutil.Equals(t, []string{"labelvalue"}, values) } +func TestWALSegmentSizeOption(t *testing.T) { + options := *DefaultOptions + options.WALSegmentSize = 2 * 32 * 1024 + db, close := openTestDB(t, &options) + defer close() + app := db.Appender() + for i := int64(0); i < 155; i++ { + _, err := app.Add(labels.Labels{labels.Label{Name: "wal", Value: "size"}}, i, rand.Float64()) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + } + + dbDir := db.Dir() + db.Close() + files, err := ioutil.ReadDir(filepath.Join(dbDir, "wal")) + testutil.Assert(t, len(files) > 1, "current WALSegmentSize should result in more than a single WAL file.") + testutil.Ok(t, err) + for i, f := range files { + if len(files)-1 != i { + testutil.Equals(t, int64(options.WALSegmentSize), f.Size(), "WAL file size doesn't match WALSegmentSize option, filename: %v", f.Name()) + continue + } + testutil.Assert(t, int64(options.WALSegmentSize) > f.Size(), "last WAL file size is not smaller than the WALSegmentSize option, filename: %v", f.Name()) + } +} + func TestTombstoneClean(t *testing.T) { numSamples := int64(10) @@ -796,6 +827,7 @@ func TestTombstoneClean(t *testing.T) { func TestTombstoneCleanFail(t *testing.T) { db, close := openTestDB(t, nil) + defer db.Close() defer close() var expectedBlockDirs []string @@ -804,15 +836,9 @@ func TestTombstoneCleanFail(t *testing.T) { // totalBlocks should be >=2 so we have enough blocks to trigger compaction failure. totalBlocks := 2 for i := 0; i < totalBlocks; i++ { - entropy := rand.New(rand.NewSource(time.Now().UnixNano())) - uid := ulid.MustNew(ulid.Now(), entropy) - meta := &BlockMeta{ - Version: 2, - ULID: uid, - } - blockDir := filepath.Join(db.Dir(), uid.String()) - block := createEmptyBlock(t, blockDir, meta) - + blockDir := createBlock(t, db.Dir(), 1, 0, 0) + block, err := OpenBlock(nil, blockDir, nil) + testutil.Ok(t, err) // Add some some fake tombstones to trigger the compaction. 
tomb := newMemTombstones() tomb.addInterval(0, Interval{0, 1}) @@ -854,14 +880,8 @@ func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int6 return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail") } - entropy := rand.New(rand.NewSource(time.Now().UnixNano())) - uid := ulid.MustNew(ulid.Now(), entropy) - meta := &BlockMeta{ - Version: 2, - ULID: uid, - } - - block := createEmptyBlock(c.t, filepath.Join(dest, meta.ULID.String()), meta) + block, err := OpenBlock(nil, createBlock(c.t, dest, 1, 0, 0), nil) + testutil.Ok(c.t, err) testutil.Ok(c.t, block.Close()) // Close block as we won't be using anywhere. c.blocks = append(c.blocks, block) @@ -884,59 +904,98 @@ func (*mockCompactorFailing) Compact(dest string, dirs []string, open []*Block) } -func TestDB_Retention(t *testing.T) { - db, close := openTestDB(t, nil) - defer close() - - lbls := labels.Labels{labels.Label{Name: "labelname", Value: "labelvalue"}} - - app := db.Appender() - _, err := app.Add(lbls, 0, 1) - testutil.Ok(t, err) - testutil.Ok(t, app.Commit()) - - // create snapshot to make it create a block. - // TODO(gouthamve): Add a method to compact headblock. - snap, err := ioutil.TempDir("", "snap") - testutil.Ok(t, err) - - defer os.RemoveAll(snap) - testutil.Ok(t, db.Snapshot(snap, true)) - testutil.Ok(t, db.Close()) - - // reopen DB from snapshot - db, err = Open(snap, nil, nil, nil) - testutil.Ok(t, err) - - testutil.Equals(t, 1, len(db.blocks)) - - app = db.Appender() - _, err = app.Add(lbls, 100, 1) - testutil.Ok(t, err) - testutil.Ok(t, app.Commit()) - - // Snapshot again to create another block. - snap, err = ioutil.TempDir("", "snap") - testutil.Ok(t, err) - defer os.RemoveAll(snap) - - testutil.Ok(t, db.Snapshot(snap, true)) - testutil.Ok(t, db.Close()) - - // reopen DB from snapshot - db, err = Open(snap, nil, nil, &Options{ - RetentionDuration: 10, - BlockRanges: []int64{50}, +func TestTimeRetention(t *testing.T) { + db, close := openTestDB(t, &Options{ + BlockRanges: []int64{1000}, }) - testutil.Ok(t, err) + defer close() defer db.Close() - testutil.Equals(t, 2, len(db.blocks)) + blocks := []*BlockMeta{ + {MinTime: 500, MaxTime: 900}, // Oldest block + {MinTime: 1000, MaxTime: 1500}, + {MinTime: 1500, MaxTime: 2000}, // Newest Block + } - // Reload blocks, which should drop blocks beyond the retention boundary. + for _, m := range blocks { + createBlock(t, db.Dir(), 10, m.MinTime, m.MaxTime) + } + + testutil.Ok(t, db.reload()) // Reload the db to register the new blocks. + testutil.Equals(t, len(blocks), len(db.Blocks())) // Ensure all blocks are registered. + + db.opts.RetentionDuration = uint64(blocks[2].MaxTime - blocks[1].MinTime) testutil.Ok(t, db.reload()) - testutil.Equals(t, 1, len(db.blocks)) - testutil.Equals(t, int64(100), db.blocks[0].meta.MaxTime) // To verify its the right block. 
+
+	expBlocks := blocks[1:]
+	actBlocks := db.Blocks()
+
+	testutil.Equals(t, 1, int(prom_testutil.ToFloat64(db.metrics.timeRetentionCount)), "metric retention count mismatch")
+	testutil.Equals(t, len(expBlocks), len(actBlocks))
+	testutil.Equals(t, expBlocks[0].MaxTime, actBlocks[0].meta.MaxTime)
+	testutil.Equals(t, expBlocks[len(expBlocks)-1].MaxTime, actBlocks[len(actBlocks)-1].meta.MaxTime)
+}
+
+func TestSizeRetention(t *testing.T) {
+	db, close := openTestDB(t, &Options{
+		BlockRanges: []int64{100},
+	})
+	defer close()
+	defer db.Close()
+
+	blocks := []*BlockMeta{
+		{MinTime: 100, MaxTime: 200}, // Oldest block
+		{MinTime: 200, MaxTime: 300},
+		{MinTime: 300, MaxTime: 400},
+		{MinTime: 400, MaxTime: 500},
+		{MinTime: 500, MaxTime: 600}, // Newest block
+	}
+
+	for _, m := range blocks {
+		createBlock(t, db.Dir(), 100, m.MinTime, m.MaxTime)
+	}
+
+	// Test that registered size matches the actual disk size.
+	testutil.Ok(t, db.reload())                       // Reload the db to register the new db size.
+	testutil.Equals(t, len(blocks), len(db.Blocks())) // Ensure all blocks are registered.
+	expSize := int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) // Use the actual internal metrics.
+	actSize := dbDiskSize(db.Dir())
+	testutil.Equals(t, expSize, actSize, "registered size doesn't match actual disk size")
+
+	// Decrease the max bytes limit so that a delete is triggered.
+	// Check total size, total count and check that the oldest block was deleted.
+	firstBlockSize := db.Blocks()[0].Size()
+	sizeLimit := actSize - firstBlockSize
+	db.opts.MaxBytes = sizeLimit // Set the new db size limit one block smaller than the actual size.
+	testutil.Ok(t, db.reload())  // Reload the db to register the new db size.
+
+	expBlocks := blocks[1:]
+	actBlocks := db.Blocks()
+	expSize = int64(prom_testutil.ToFloat64(db.metrics.blocksBytes))
+	actRetentCount := int(prom_testutil.ToFloat64(db.metrics.sizeRetentionCount))
+	actSize = dbDiskSize(db.Dir())
+
+	testutil.Equals(t, 1, actRetentCount, "metric retention count mismatch")
+	testutil.Equals(t, actSize, expSize, "metric db size doesn't match actual disk size")
+	testutil.Assert(t, expSize <= sizeLimit, "actual size (%v) is expected to be less than or equal to limit (%v)", expSize, sizeLimit)
+	testutil.Equals(t, len(blocks)-1, len(actBlocks), "new block count should be decreased from:%v to:%v", len(blocks), len(blocks)-1)
+	testutil.Equals(t, expBlocks[0].MaxTime, actBlocks[0].meta.MaxTime, "maxT mismatch of the first block")
+	testutil.Equals(t, expBlocks[len(expBlocks)-1].MaxTime, actBlocks[len(actBlocks)-1].meta.MaxTime, "maxT mismatch of the last block")
+}
+
+func dbDiskSize(dir string) int64 {
+	var statSize int64
+	filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
+		// Include only index, tombstones and chunks.
+ if filepath.Dir(path) == chunkDir(filepath.Dir(filepath.Dir(path))) || + info.Name() == indexFilename || + info.Name() == tombstoneFilename { + statSize += info.Size() + } + return nil + }) + return statSize } func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) { @@ -1260,12 +1319,7 @@ func TestInitializeHeadTimestamp(t *testing.T) { testutil.Ok(t, err) defer os.RemoveAll(dir) - id := ulid.MustNew(2000, nil) - createEmptyBlock(t, path.Join(dir, id.String()), &BlockMeta{ - ULID: id, - MinTime: 1000, - MaxTime: 2000, - }) + createBlock(t, dir, 1, 1000, 2000) db, err := Open(dir, nil, nil, nil) testutil.Ok(t, err) @@ -1278,12 +1332,7 @@ func TestInitializeHeadTimestamp(t *testing.T) { testutil.Ok(t, err) defer os.RemoveAll(dir) - id := ulid.MustNew(2000, nil) - createEmptyBlock(t, path.Join(dir, id.String()), &BlockMeta{ - ULID: id, - MinTime: 1000, - MaxTime: 6000, - }) + createBlock(t, dir, 1, 1000, 6000) testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777)) w, err := wal.New(nil, nil, path.Join(dir, "wal")) @@ -1315,6 +1364,109 @@ func TestInitializeHeadTimestamp(t *testing.T) { }) } +func TestNoEmptyBlocks(t *testing.T) { + db, close := openTestDB(t, &Options{ + BlockRanges: []int64{100}, + }) + defer close() + defer db.Close() + db.DisableCompactions() + + rangeToTriggercompaction := db.opts.BlockRanges[0]/2*3 - 1 + defaultLabel := labels.FromStrings("foo", "bar") + defaultMatcher := labels.NewMustRegexpMatcher("", ".*") + + t.Run("Test no blocks after compact with empty head.", func(t *testing.T) { + testutil.Ok(t, db.compact()) + actBlocks, err := blockDirs(db.Dir()) + testutil.Ok(t, err) + testutil.Equals(t, len(db.Blocks()), len(actBlocks)) + testutil.Equals(t, 0, len(actBlocks)) + testutil.Equals(t, 0, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "no compaction should be triggered here") + }) + + t.Run("Test no blocks after deleting all samples from head.", func(t *testing.T) { + app := db.Appender() + _, err := app.Add(defaultLabel, 1, 0) + testutil.Ok(t, err) + _, err = app.Add(defaultLabel, 2, 0) + testutil.Ok(t, err) + _, err = app.Add(defaultLabel, 3+rangeToTriggercompaction, 0) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher)) + testutil.Ok(t, db.compact()) + testutil.Equals(t, 1, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here") + + actBlocks, err := blockDirs(db.Dir()) + testutil.Ok(t, err) + testutil.Equals(t, len(db.Blocks()), len(actBlocks)) + testutil.Equals(t, 0, len(actBlocks)) + + app = db.Appender() + _, err = app.Add(defaultLabel, 1, 0) + testutil.Assert(t, err == ErrOutOfBounds, "the head should be truncated so no samples in the past should be allowed") + + // Adding new blocks. 
+ currentTime := db.Head().MaxTime() + _, err = app.Add(defaultLabel, currentTime, 0) + testutil.Ok(t, err) + _, err = app.Add(defaultLabel, currentTime+1, 0) + testutil.Ok(t, err) + _, err = app.Add(defaultLabel, currentTime+rangeToTriggercompaction, 0) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + + testutil.Ok(t, db.compact()) + testutil.Equals(t, 2, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here") + actBlocks, err = blockDirs(db.Dir()) + testutil.Ok(t, err) + testutil.Equals(t, len(db.Blocks()), len(actBlocks)) + testutil.Assert(t, len(actBlocks) == 1, "No blocks created when compacting with >0 samples") + }) + + t.Run(`When no new block is created from head, and there are some blocks on disk + compaction should not run into infinite loop (was seen during development).`, func(t *testing.T) { + oldBlocks := db.Blocks() + app := db.Appender() + currentTime := db.Head().MaxTime() + _, err := app.Add(defaultLabel, currentTime, 0) + testutil.Ok(t, err) + _, err = app.Add(defaultLabel, currentTime+1, 0) + testutil.Ok(t, err) + _, err = app.Add(defaultLabel, currentTime+rangeToTriggercompaction, 0) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + testutil.Ok(t, db.head.Delete(math.MinInt64, math.MaxInt64, defaultMatcher)) + testutil.Ok(t, db.compact()) + testutil.Equals(t, 3, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here") + testutil.Equals(t, oldBlocks, db.Blocks()) + }) + + t.Run("Test no blocks remaining after deleting all samples from disk.", func(t *testing.T) { + currentTime := db.Head().MaxTime() + blocks := []*BlockMeta{ + {MinTime: currentTime, MaxTime: currentTime + db.opts.BlockRanges[0]}, + {MinTime: currentTime + 100, MaxTime: currentTime + 100 + db.opts.BlockRanges[0]}, + } + for _, m := range blocks { + createBlock(t, db.Dir(), 2, m.MinTime, m.MaxTime) + } + + oldBlocks := db.Blocks() + testutil.Ok(t, db.reload()) // Reload the db to register the new blocks. + testutil.Equals(t, len(blocks)+len(oldBlocks), len(db.Blocks())) // Ensure all blocks are registered. + testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher)) + testutil.Ok(t, db.compact()) + testutil.Equals(t, 5, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here once for each block that have tombstones") + + actBlocks, err := blockDirs(db.Dir()) + testutil.Ok(t, err) + testutil.Equals(t, len(db.Blocks()), len(actBlocks)) + testutil.Equals(t, 1, len(actBlocks), "All samples are deleted. 
Only the most recent block should remain after compaction.") + }) +} + func TestDB_LabelNames(t *testing.T) { tests := []struct { // Add 'sampleLabels1' -> Test Head -> Compact -> Test Disk -> @@ -1419,12 +1571,13 @@ func TestCorrectNumTombstones(t *testing.T) { defer db.Close() blockRange := DefaultOptions.BlockRanges[0] - label := labels.FromStrings("foo", "bar") + defaultLabel := labels.FromStrings("foo", "bar") + defaultMatcher := labels.NewEqualMatcher(defaultLabel[0].Name, defaultLabel[0].Value) app := db.Appender() for i := int64(0); i < 3; i++ { for j := int64(0); j < 15; j++ { - _, err := app.Add(label, i*blockRange+j, 0) + _, err := app.Add(defaultLabel, i*blockRange+j, 0) testutil.Ok(t, err) } } @@ -1434,17 +1587,17 @@ func TestCorrectNumTombstones(t *testing.T) { testutil.Ok(t, err) testutil.Equals(t, 1, len(db.blocks)) - testutil.Ok(t, db.Delete(0, 1, labels.NewEqualMatcher("foo", "bar"))) + testutil.Ok(t, db.Delete(0, 1, defaultMatcher)) testutil.Equals(t, uint64(1), db.blocks[0].meta.Stats.NumTombstones) // {0, 1} and {2, 3} are merged to form 1 tombstone. - testutil.Ok(t, db.Delete(2, 3, labels.NewEqualMatcher("foo", "bar"))) + testutil.Ok(t, db.Delete(2, 3, defaultMatcher)) testutil.Equals(t, uint64(1), db.blocks[0].meta.Stats.NumTombstones) - testutil.Ok(t, db.Delete(5, 6, labels.NewEqualMatcher("foo", "bar"))) + testutil.Ok(t, db.Delete(5, 6, defaultMatcher)) testutil.Equals(t, uint64(2), db.blocks[0].meta.Stats.NumTombstones) - testutil.Ok(t, db.Delete(9, 11, labels.NewEqualMatcher("foo", "bar"))) + testutil.Ok(t, db.Delete(9, 11, defaultMatcher)) testutil.Equals(t, uint64(3), db.blocks[0].meta.Stats.NumTombstones) } @@ -1470,7 +1623,7 @@ func TestBlockRanges(t *testing.T) { // Test that the compactor doesn't create overlapping blocks // when a non standard block already exists. 
firstBlockMaxT := int64(3) - createPopulatedBlock(t, dir, 1, 0, firstBlockMaxT) + createBlock(t, dir, 1, 0, firstBlockMaxT) db, err := Open(dir, logger, nil, DefaultOptions) if err != nil { t.Fatalf("Opening test storage failed: %s", err) @@ -1493,7 +1646,7 @@ func TestBlockRanges(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, app.Commit()) - for x := 1; x < 10; x++ { + for x := 0; x < 100; x++ { if len(db.Blocks()) == 2 { break } @@ -1515,11 +1668,12 @@ func TestBlockRanges(t *testing.T) { _, err = app.Add(lbl, secondBlockMaxt+3, rand.Float64()) testutil.Ok(t, err) _, err = app.Add(lbl, secondBlockMaxt+4, rand.Float64()) + testutil.Ok(t, err) testutil.Ok(t, app.Commit()) testutil.Ok(t, db.Close()) thirdBlockMaxt := secondBlockMaxt + 2 - createPopulatedBlock(t, dir, 1, secondBlockMaxt+1, thirdBlockMaxt) + createBlock(t, dir, 1, secondBlockMaxt+1, thirdBlockMaxt) db, err = Open(dir, logger, nil, DefaultOptions) if err != nil { @@ -1533,7 +1687,7 @@ func TestBlockRanges(t *testing.T) { _, err = app.Add(lbl, thirdBlockMaxt+rangeToTriggercompaction, rand.Float64()) // Trigger a compaction testutil.Ok(t, err) testutil.Ok(t, app.Commit()) - for x := 1; x < 10; x++ { + for x := 0; x < 100; x++ { if len(db.Blocks()) == 4 { break } diff --git a/head.go b/head.go index 52a3dfe46..cbc8661f8 100644 --- a/head.go +++ b/head.go @@ -89,6 +89,7 @@ type headMetrics struct { maxTime prometheus.GaugeFunc samplesAppended prometheus.Counter walTruncateDuration prometheus.Summary + walCorruptionsTotal prometheus.Counter headTruncateFail prometheus.Counter headTruncateTotal prometheus.Counter checkpointDeleteFail prometheus.Counter @@ -152,6 +153,10 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { Name: "prometheus_tsdb_wal_truncate_duration_seconds", Help: "Duration of WAL truncation.", }) + m.walCorruptionsTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_wal_corruptions_total", + Help: "Total number of WAL corruptions.", + }) m.samplesAppended = prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_head_samples_appended_total", Help: "Total number of appended samples.", @@ -195,6 +200,7 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { m.maxTime, m.gcDuration, m.walTruncateDuration, + m.walCorruptionsTotal, m.samplesAppended, m.headTruncateFail, m.headTruncateTotal, @@ -473,18 +479,17 @@ func (h *Head) Init(minValidTime int64) error { if err != nil { return errors.Wrap(err, "open WAL segments") } - defer sr.Close() err = h.loadWAL(wal.NewReader(sr)) + sr.Close() // Close the reader so that if there was an error the repair can remove the corrupted file under Windows. if err == nil { return nil } level.Warn(h.logger).Log("msg", "encountered WAL error, attempting repair", "err", err) - + h.metrics.walCorruptionsTotal.Inc() if err := h.wal.Repair(err); err != nil { return errors.Wrap(err, "repair corrupted WAL") } - return nil } @@ -501,7 +506,7 @@ func (h *Head) Truncate(mint int64) (err error) { return nil } atomic.StoreInt64(&h.minTime, mint) - h.minValidTime = mint + atomic.StoreInt64(&h.minValidTime, mint) // Ensure that max time is at least as high as min time. for h.MaxTime() < mint { @@ -572,7 +577,7 @@ func (h *Head) Truncate(mint int64) (err error) { } // initTime initializes a head with the first timestamp. This only needs to be called -// for a compltely fresh head with an empty WAL. +// for a completely fresh head with an empty WAL. // Returns true if the initialization took an effect. 
func (h *Head) initTime(t int64) (initialized bool) {
 if !atomic.CompareAndSwapInt64(&h.minTime, math.MaxInt64, t) {
@@ -657,7 +662,7 @@ func (h *Head) appender() *headAppender {
 head: h,
 // Set the minimum valid time to whichever is greater: the head min valid time or the compaction window.
 // This ensures that no samples will be added within the compaction window to avoid races.
- minValidTime: max(h.minValidTime, h.MaxTime()-h.chunkRange/2),
+ minValidTime: max(atomic.LoadInt64(&h.minValidTime), h.MaxTime()-h.chunkRange/2),
 mint: math.MaxInt64,
 maxt: math.MinInt64,
 samples: h.getAppendBuffer(),
@@ -680,6 +685,7 @@ func (h *Head) getAppendBuffer() []RefSample {
 }
 
 func (h *Head) putAppendBuffer(b []RefSample) {
+ //lint:ignore SA6002 safe to ignore and actually fixing it has some performance penalty.
 h.appendPool.Put(b[:0])
 }
 
@@ -692,6 +698,7 @@ func (h *Head) getBytesBuffer() []byte {
 }
 
 func (h *Head) putBytesBuffer(b []byte) {
+ //lint:ignore SA6002 safe to ignore and actually fixing it has some performance penalty.
 h.bytesPool.Put(b[:0])
 }
 
@@ -1089,25 +1096,30 @@ func (h *headIndexReader) Postings(name, value string) (index.Postings, error) {
 }
 
 func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings {
- ep := make([]uint64, 0, 128)
+ series := make([]*memSeries, 0, 128)
+
+ // Fetch all the series only once.
 for p.Next() {
- ep = append(ep, p.At())
+ s := h.head.series.getByID(p.At())
+ if s == nil {
+ level.Debug(h.head.logger).Log("msg", "looked up series not found")
+ } else {
+ series = append(series, s)
+ }
 }
 if err := p.Err(); err != nil {
 return index.ErrPostings(errors.Wrap(err, "expand postings"))
 }
 
- sort.Slice(ep, func(i, j int) bool {
- a := h.head.series.getByID(ep[i])
- b := h.head.series.getByID(ep[j])
-
- if a == nil || b == nil {
- level.Debug(h.head.logger).Log("msg", "looked up series not found")
- return false
- }
- return labels.Compare(a.lset, b.lset) < 0
+ sort.Slice(series, func(i, j int) bool {
+ return labels.Compare(series[i].lset, series[j].lset) < 0
 })
+
+ // Convert back to list.
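+ // The series are now in label-set order, so their references are collected in that same order.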
+ ep := make([]uint64, 0, len(series)) + for _, p := range series { + ep = append(ep, p.ref) + } return index.NewListPostings(ep) } diff --git a/head_bench_test.go b/head_bench_test.go index 4b80193b2..b511667f5 100644 --- a/head_bench_test.go +++ b/head_bench_test.go @@ -57,7 +57,7 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) { defer h.Close() // TODO: vary number of series - for i := 0; i < 100; i++ { + for i := 0; i < 1000000; i++ { h.getOrCreate(uint64(i), labels.FromStrings("a", strconv.Itoa(i))) } diff --git a/head_test.go b/head_test.go index 70873c7a5..8781f677a 100644 --- a/head_test.go +++ b/head_test.go @@ -22,6 +22,7 @@ import ( "sort" "testing" + prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/tsdb/chunkenc" "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/index" @@ -118,11 +119,11 @@ func TestHead_ReadWAL(t *testing.T) { w, err := wal.New(nil, nil, dir) testutil.Ok(t, err) + defer w.Close() populateTestWAL(t, w, entries) head, err := NewHead(nil, nil, w, 1000) testutil.Ok(t, err) - defer head.Close() testutil.Ok(t, head.Init(math.MinInt64)) testutil.Equals(t, uint64(100), head.lastSeriesID) @@ -282,11 +283,11 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) { w, err := wal.New(nil, nil, dir) testutil.Ok(t, err) + defer w.Close() populateTestWAL(t, w, entries) head, err := NewHead(nil, nil, w, 1000) testutil.Ok(t, err) - defer head.Close() testutil.Ok(t, head.Init(math.MinInt64)) @@ -390,6 +391,7 @@ func TestDeleteUntilCurMax(t *testing.T) { numSamples := int64(10) hb, err := NewHead(nil, nil, nil, 1000000) testutil.Ok(t, err) + defer hb.Close() app := hb.Appender() smpls := make([]float64, numSamples) for i := int64(0); i < numSamples; i++ { @@ -478,6 +480,7 @@ func TestDelete_e2e(t *testing.T) { defer os.RemoveAll(dir) hb, err := NewHead(nil, nil, nil, 100000) testutil.Ok(t, err) + defer hb.Close() app := hb.Appender() for _, l := range lbls { ls := labels.New(l...) @@ -674,7 +677,7 @@ func TestMemSeries_append(t *testing.T) { ok, chunkCreated = s.append(1000, 3) testutil.Assert(t, ok, "append failed") - testutil.Assert(t, ok, "expected new chunk on boundary") + testutil.Assert(t, chunkCreated, "expected new chunk on boundary") ok, chunkCreated = s.append(1001, 4) testutil.Assert(t, ok, "append failed") @@ -845,6 +848,7 @@ func TestHead_LogRollback(t *testing.T) { w, err := wal.New(nil, nil, dir) testutil.Ok(t, err) + defer w.Close() h, err := NewHead(nil, nil, w, 1000) testutil.Ok(t, err) @@ -911,6 +915,7 @@ func TestWalRepair(t *testing.T) { w, err := wal.New(nil, nil, dir) testutil.Ok(t, err) + defer w.Close() for i := 1; i <= test.totalRecs; i++ { // At this point insert a corrupted record. 
@@ -923,7 +928,9 @@ func TestWalRepair(t *testing.T) {
 
 h, err := NewHead(nil, nil, w, 1)
 testutil.Ok(t, err)
+ testutil.Equals(t, 0.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal))
 testutil.Ok(t, h.Init(math.MinInt64))
+ testutil.Equals(t, 1.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal))
 
 sr, err := wal.NewSegmentsReader(dir)
 testutil.Ok(t, err)
@@ -936,7 +943,6 @@ func TestWalRepair(t *testing.T) {
 }
 testutil.Ok(t, r.Err())
 testutil.Equals(t, test.expRecs, actRec, "Wrong number of intact records")
-
 })
 }
 }
diff --git a/index/encoding_helpers.go b/index/encoding_helpers.go
index 602498f11..9104f1cb5 100644
--- a/index/encoding_helpers.go
+++ b/index/encoding_helpers.go
@@ -18,6 +18,8 @@ import (
 "hash"
 "hash/crc32"
 "unsafe"
+
+ "github.com/pkg/errors"
 )
 
 // enbuf is a helper type to populate a byte slice with various types.
@@ -86,6 +88,60 @@ type decbuf struct {
 e error
 }
 
+// newDecbufAt returns a new decoding buffer. It expects the first 4 bytes
+// after offset to hold the big endian encoded content length, followed by the contents and the expected
+// checksum.
+func newDecbufAt(bs ByteSlice, off int) decbuf {
+ if bs.Len() < off+4 {
+ return decbuf{e: errInvalidSize}
+ }
+ b := bs.Range(off, off+4)
+ l := int(binary.BigEndian.Uint32(b))
+
+ if bs.Len() < off+4+l+4 {
+ return decbuf{e: errInvalidSize}
+ }
+
+ // Load bytes holding the contents plus a CRC32 checksum.
+ b = bs.Range(off+4, off+4+l+4)
+ dec := decbuf{b: b[:len(b)-4]}
+
+ if exp := binary.BigEndian.Uint32(b[len(b)-4:]); dec.crc32() != exp {
+ return decbuf{e: errInvalidChecksum}
+ }
+ return dec
+}
+
+// newDecbufUvarintAt returns a new decoding buffer. It expects the first bytes
+// after offset to hold the uvarint-encoded buffer's length, followed by the contents and the expected
+// checksum.
+func newDecbufUvarintAt(bs ByteSlice, off int) decbuf {
+ // We never have to access this method at the far end of the byte slice. Thus just checking
+ // against the MaxVarintLen32 is sufficient.
+ if bs.Len() < off+binary.MaxVarintLen32 {
+ return decbuf{e: errInvalidSize}
+ }
+ b := bs.Range(off, off+binary.MaxVarintLen32)
+
+ l, n := binary.Uvarint(b)
+ if n <= 0 || n > binary.MaxVarintLen32 {
+ return decbuf{e: errors.Errorf("invalid uvarint %d", n)}
+ }
+
+ if bs.Len() < off+n+int(l)+4 {
+ return decbuf{e: errInvalidSize}
+ }
+
+ // Load bytes holding the contents plus a CRC32 checksum.
+ b = bs.Range(off+n, off+n+int(l)+4)
+ dec := decbuf{b: b[:len(b)-4]}
+
+ if dec.crc32() != binary.BigEndian.Uint32(b[len(b)-4:]) {
+ return decbuf{e: errInvalidChecksum}
+ }
+ return dec
+}
+
 func (d *decbuf) uvarint() int { return int(d.uvarint64()) }
 func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) }
 func (d *decbuf) be32int() int { return int(d.be32()) }
diff --git a/index/index.go b/index/index.go
index 6413a9fca..74e08d465 100644
--- a/index/index.go
+++ b/index/index.go
@@ -20,6 +20,7 @@ import (
 "hash"
 "hash/crc32"
 "io"
+ "io/ioutil"
 "math"
 "os"
 "path/filepath"
@@ -35,9 +36,13 @@ import (
 const (
 // MagicIndex 4 bytes at the head of an index file.
 MagicIndex = 0xBAAAD700
+ // HeaderLen is the number of bytes reserved for the index header.
+ HeaderLen = 5
 
- indexFormatV1 = 1
- indexFormatV2 = 2
+ // FormatV1 represents version 1 of the index file format.
+ FormatV1 = 1
+ // FormatV2 represents version 2 of the index file format.
+ FormatV2 = 2
 
 labelNameSeperator = "\xff"
 )
@@ -108,7 +113,7 @@ type Writer struct {
 fbuf *bufio.Writer
 pos uint64
 
- toc indexTOC
+ toc TOC
 stage indexWriterStage
 
 // Reusable memory.
@@ -129,13 +134,42 @@ type Writer struct {
 Version int
 }
 
-type indexTOC struct {
- symbols uint64
- series uint64
- labelIndices uint64
- labelIndicesTable uint64
- postings uint64
- postingsTable uint64
+// TOC represents the index Table Of Contents, which records where each section of the index starts.
+type TOC struct {
+ Symbols uint64
+ Series uint64
+ LabelIndices uint64
+ LabelIndicesTable uint64
+ Postings uint64
+ PostingsTable uint64
+}
+
+// NewTOCFromByteSlice returns the parsed TOC from the given index byte slice.
+func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) {
+ if bs.Len() < indexTOCLen {
+ return nil, errInvalidSize
+ }
+ b := bs.Range(bs.Len()-indexTOCLen, bs.Len())
+
+ expCRC := binary.BigEndian.Uint32(b[len(b)-4:])
+ d := decbuf{b: b[:len(b)-4]}
+
+ if d.crc32() != expCRC {
+ return nil, errors.Wrap(errInvalidChecksum, "read TOC")
+ }
+
+ if err := d.err(); err != nil {
+ return nil, err
+ }
+
+ return &TOC{
+ Symbols: d.be64(),
+ Series: d.be64(),
+ LabelIndices: d.be64(),
+ LabelIndicesTable: d.be64(),
+ Postings: d.be64(),
+ PostingsTable: d.be64(),
+ }, nil
 }
 
 // NewWriter returns a new Writer to the given filename. It serializes data in format version 2.
@@ -223,22 +257,22 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
 // Mark start of sections in table of contents.
 switch s {
 case idxStageSymbols:
- w.toc.symbols = w.pos
+ w.toc.Symbols = w.pos
 case idxStageSeries:
- w.toc.series = w.pos
+ w.toc.Series = w.pos
 case idxStageLabelIndex:
- w.toc.labelIndices = w.pos
+ w.toc.LabelIndices = w.pos
 case idxStagePostings:
- w.toc.postings = w.pos
+ w.toc.Postings = w.pos
 case idxStageDone:
- w.toc.labelIndicesTable = w.pos
+ w.toc.LabelIndicesTable = w.pos
 if err := w.writeOffsetTable(w.labelIndexes); err != nil {
 return err
 }
- w.toc.postingsTable = w.pos
+ w.toc.PostingsTable = w.pos
 if err := w.writeOffsetTable(w.postings); err != nil {
 return err
 }
@@ -254,7 +288,7 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
 func (w *Writer) writeMeta() error {
 w.buf1.reset()
 w.buf1.putBE32(MagicIndex)
- w.buf1.putByte(indexFormatV2)
+ w.buf1.putByte(FormatV2)
 
 return w.write(w.buf1.get())
 }
@@ -346,8 +380,6 @@ func (w *Writer) AddSymbols(sym map[string]struct{}) error {
 }
 sort.Strings(symbols)
 
- const headerSize = 4
-
 w.buf1.reset()
 w.buf2.reset()
 
@@ -438,12 +470,12 @@ const indexTOCLen = 6*8 + 4
 
 func (w *Writer) writeTOC() error {
 w.buf1.reset()
 
- w.buf1.putBE64(w.toc.symbols)
- w.buf1.putBE64(w.toc.series)
- w.buf1.putBE64(w.toc.labelIndices)
- w.buf1.putBE64(w.toc.labelIndicesTable)
- w.buf1.putBE64(w.toc.postings)
- w.buf1.putBE64(w.toc.postingsTable)
+ w.buf1.putBE64(w.toc.Symbols)
+ w.buf1.putBE64(w.toc.Series)
+ w.buf1.putBE64(w.toc.LabelIndices)
+ w.buf1.putBE64(w.toc.LabelIndicesTable)
+ w.buf1.putBE64(w.toc.Postings)
+ w.buf1.putBE64(w.toc.PostingsTable)
 
 w.buf1.putHash(w.crc32)
 
@@ -535,15 +567,14 @@ type StringTuples interface {
 }
 
 type Reader struct {
- // The underlying byte slice holding the encoded series data.
- b ByteSlice
- toc indexTOC
+ b ByteSlice
 
 // Close that releases the underlying resources of the byte slice.
 c io.Closer
 
 // Cached hashmaps of section offsets.
- labels map[string]uint64
+ labels map[string]uint64
+ // LabelName to LabelValue to offset map.
 postings map[string]map[string]uint64
 // Cache of read symbols.
Strings that are returned when reading from the // block are always backed by true strings held in here rather than @@ -551,19 +582,17 @@ type Reader struct { // prevents memory faults when applications work with read symbols after // the block has been unmapped. The older format has sparse indexes so a map // must be used, but the new format is not so we can use a slice. - symbols map[uint32]string - symbolSlice []string + symbolsV1 map[uint32]string + symbolsV2 []string + symbolsTableSize uint64 dec *Decoder - crc32 hash.Hash32 - version int } var ( errInvalidSize = fmt.Errorf("invalid size") - errInvalidFlag = fmt.Errorf("invalid flag") errInvalidChecksum = fmt.Errorf("invalid checksum") ) @@ -587,10 +616,10 @@ func (b realByteSlice) Sub(start, end int) ByteSlice { return b[start:end] } -// NewReader returns a new IndexReader on the given byte slice. It automatically +// NewReader returns a new index reader on the given byte slice. It automatically // handles different format versions. func NewReader(b ByteSlice) (*Reader, error) { - return newReader(b, nil) + return newReader(b, ioutil.NopCloser(nil)) } // NewFileReader returns a new index reader against the given index file. @@ -606,14 +635,12 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) { r := &Reader{ b: b, c: c, - symbols: map[uint32]string{}, labels: map[string]uint64{}, postings: map[string]map[string]uint64{}, - crc32: newCRC32(), } // Verify header. - if b.Len() < 5 { + if r.b.Len() < HeaderLen { return nil, errors.Wrap(errInvalidSize, "index header") } if m := binary.BigEndian.Uint32(r.b.Range(0, 4)); m != MagicIndex { @@ -621,54 +648,59 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) { } r.version = int(r.b.Range(4, 5)[0]) - if r.version != 1 && r.version != 2 { + if r.version != FormatV1 && r.version != FormatV2 { return nil, errors.Errorf("unknown index file version %d", r.version) } - if err := r.readTOC(); err != nil { + toc, err := NewTOCFromByteSlice(b) + if err != nil { return nil, errors.Wrap(err, "read TOC") } - if err := r.readSymbols(int(r.toc.symbols)); err != nil { + + r.symbolsV2, r.symbolsV1, err = ReadSymbols(r.b, r.version, int(toc.Symbols)) + if err != nil { return nil, errors.Wrap(err, "read symbols") } - var err error // Use the strings already allocated by symbols, rather than // re-allocating them again below. - symbols := make(map[string]string, len(r.symbols)+len(r.symbolSlice)) - for _, s := range r.symbols { - symbols[s] = s + // Additionally, calculate symbolsTableSize. 
+ allocatedSymbols := make(map[string]string, len(r.symbolsV1)+len(r.symbolsV2)) + for _, s := range r.symbolsV1 { + r.symbolsTableSize += uint64(len(s) + 8) + allocatedSymbols[s] = s } - for _, s := range r.symbolSlice { - symbols[s] = s + for _, s := range r.symbolsV2 { + r.symbolsTableSize += uint64(len(s) + 8) + allocatedSymbols[s] = s } - err = r.readOffsetTable(r.toc.labelIndicesTable, func(key []string, off uint64) error { + if err := ReadOffsetTable(r.b, toc.LabelIndicesTable, func(key []string, off uint64) error { if len(key) != 1 { - return errors.Errorf("unexpected key length %d", len(key)) + return errors.Errorf("unexpected key length for label indices table %d", len(key)) } - r.labels[symbols[key[0]]] = off + + r.labels[allocatedSymbols[key[0]]] = off return nil - }) - if err != nil { + }); err != nil { return nil, errors.Wrap(err, "read label index table") } + r.postings[""] = map[string]uint64{} - err = r.readOffsetTable(r.toc.postingsTable, func(key []string, off uint64) error { + if err := ReadOffsetTable(r.b, toc.PostingsTable, func(key []string, off uint64) error { if len(key) != 2 { - return errors.Errorf("unexpected key length %d", len(key)) + return errors.Errorf("unexpected key length for posting table %d", len(key)) } if _, ok := r.postings[key[0]]; !ok { - r.postings[symbols[key[0]]] = map[string]uint64{} + r.postings[allocatedSymbols[key[0]]] = map[string]uint64{} } - r.postings[key[0]][symbols[key[1]]] = off + r.postings[key[0]][allocatedSymbols[key[1]]] = off return nil - }) - if err != nil { + }); err != nil { return nil, errors.Wrap(err, "read postings table") } - r.dec = &Decoder{lookupSymbol: r.lookupSymbol} + r.dec = &Decoder{LookupSymbol: r.lookupSymbol} return r, nil } @@ -690,7 +722,7 @@ func (r *Reader) PostingsRanges() (map[labels.Label]Range, error) { for k, e := range r.postings { for v, start := range e { - d := r.decbufAt(int(start)) + d := newDecbufAt(r.b, int(start)) if d.err() != nil { return nil, d.err() } @@ -703,121 +735,45 @@ func (r *Reader) PostingsRanges() (map[labels.Label]Range, error) { return m, nil } -func (r *Reader) readTOC() error { - if r.b.Len() < indexTOCLen { - return errInvalidSize - } - b := r.b.Range(r.b.Len()-indexTOCLen, r.b.Len()) - - expCRC := binary.BigEndian.Uint32(b[len(b)-4:]) - d := decbuf{b: b[:len(b)-4]} - - if d.crc32() != expCRC { - return errors.Wrap(errInvalidChecksum, "read TOC") - } - - r.toc.symbols = d.be64() - r.toc.series = d.be64() - r.toc.labelIndices = d.be64() - r.toc.labelIndicesTable = d.be64() - r.toc.postings = d.be64() - r.toc.postingsTable = d.be64() - - return d.err() -} - -// decbufAt returns a new decoding buffer. It expects the first 4 bytes -// after offset to hold the big endian encoded content length, followed by the contents and the expected -// checksum. -func (r *Reader) decbufAt(off int) decbuf { - if r.b.Len() < off+4 { - return decbuf{e: errInvalidSize} - } - b := r.b.Range(off, off+4) - l := int(binary.BigEndian.Uint32(b)) - - if r.b.Len() < off+4+l+4 { - return decbuf{e: errInvalidSize} - } - - // Load bytes holding the contents plus a CRC32 checksum. - b = r.b.Range(off+4, off+4+l+4) - dec := decbuf{b: b[:len(b)-4]} - - if exp := binary.BigEndian.Uint32(b[len(b)-4:]); dec.crc32() != exp { - return decbuf{e: errInvalidChecksum} - } - return dec -} - -// decbufUvarintAt returns a new decoding buffer. It expects the first bytes -// after offset to hold the uvarint-encoded buffers length, followed by the contents and the expected -// checksum. 
-func (r *Reader) decbufUvarintAt(off int) decbuf {
- // We never have to access this method at the far end of the byte slice. Thus just checking
- // against the MaxVarintLen32 is sufficient.
- if r.b.Len() < off+binary.MaxVarintLen32 {
- return decbuf{e: errInvalidSize}
- }
- b := r.b.Range(off, off+binary.MaxVarintLen32)
-
- l, n := binary.Uvarint(b)
- if n <= 0 || n > binary.MaxVarintLen32 {
- return decbuf{e: errors.Errorf("invalid uvarint %d", n)}
- }
-
- if r.b.Len() < off+n+int(l)+4 {
- return decbuf{e: errInvalidSize}
- }
-
- // Load bytes holding the contents plus a CRC32 checksum.
- b = r.b.Range(off+n, off+n+int(l)+4)
- dec := decbuf{b: b[:len(b)-4]}
-
- if dec.crc32() != binary.BigEndian.Uint32(b[len(b)-4:]) {
- return decbuf{e: errInvalidChecksum}
- }
- return dec
-}
-
-// readSymbols reads the symbol table fully into memory and allocates proper strings for them.
+// ReadSymbols reads the symbol table fully into memory and allocates proper strings for them.
 // Strings backed by the mmap'd memory would cause memory faults if applications keep using them
 // after the reader is closed.
-func (r *Reader) readSymbols(off int) error {
+func ReadSymbols(bs ByteSlice, version int, off int) ([]string, map[uint32]string, error) {
 if off == 0 {
- return nil
+ return nil, nil, nil
 }
- d := r.decbufAt(off)
+ d := newDecbufAt(bs, off)
 
 var (
- origLen = d.len()
- cnt = d.be32int()
- basePos = uint32(off) + 4
- nextPos = basePos + uint32(origLen-d.len())
+ origLen = d.len()
+ cnt = d.be32int()
+ basePos = uint32(off) + 4
+ nextPos = basePos + uint32(origLen-d.len())
+ symbolSlice []string
+ symbols = map[uint32]string{}
 )
- if r.version == 2 {
- r.symbolSlice = make([]string, 0, cnt)
+ if version == 2 {
+ symbolSlice = make([]string, 0, cnt)
 }
 
 for d.err() == nil && d.len() > 0 && cnt > 0 {
 s := d.uvarintStr()
 
- if r.version == 2 {
- r.symbolSlice = append(r.symbolSlice, s)
+ if version == FormatV2 {
+ symbolSlice = append(symbolSlice, s)
 } else {
- r.symbols[nextPos] = s
+ symbols[nextPos] = s
 nextPos = basePos + uint32(origLen-d.len())
 }
 cnt--
 }
- return errors.Wrap(d.err(), "read symbols")
+ return symbolSlice, symbols, errors.Wrap(d.err(), "read symbols")
 }
 
-// readOffsetTable reads an offset table at the given position calls f for each
-// found entry.f
-// If f returns an error it stops decoding and returns the received error,
-func (r *Reader) readOffsetTable(off uint64, f func([]string, uint64) error) error {
- d := r.decbufAt(int(off))
+// ReadOffsetTable reads an offset table at the given position and calls f for each
+// found entry. If f returns an error it stops decoding and returns the received error.
+func ReadOffsetTable(bs ByteSlice, off uint64, f func([]string, uint64) error) error {
+ d := newDecbufAt(bs, int(off))
 
 cnt := d.be32()
 
 for d.err() == nil && d.len() > 0 && cnt > 0 {
@@ -845,10 +801,10 @@ func (r *Reader) Close() error {
 }
 
 func (r *Reader) lookupSymbol(o uint32) (string, error) {
- if int(o) < len(r.symbolSlice) {
- return r.symbolSlice[o], nil
+ if int(o) < len(r.symbolsV2) {
+ return r.symbolsV2[o], nil
 }
- s, ok := r.symbols[o]
+ s, ok := r.symbolsV1[o]
 if !ok {
 return "", errors.Errorf("unknown symbol offset %d", o)
 }
@@ -857,27 +813,20 @@ func (r *Reader) lookupSymbol(o uint32) (string, error) {
 
 // Symbols returns a set of symbols that exist within the index.
func (r *Reader) Symbols() (map[string]struct{}, error) { - res := make(map[string]struct{}, len(r.symbols)) + res := make(map[string]struct{}, len(r.symbolsV1)+len(r.symbolsV2)) - for _, s := range r.symbols { + for _, s := range r.symbolsV1 { res[s] = struct{}{} } - for _, s := range r.symbolSlice { + for _, s := range r.symbolsV2 { res[s] = struct{}{} } return res, nil } -// SymbolTableSize returns the symbol table that is used to resolve symbol references. +// SymbolTableSize returns the symbol table size in bytes. func (r *Reader) SymbolTableSize() uint64 { - var size int - for _, s := range r.symbols { - size += len(s) + 8 - } - for _, s := range r.symbolSlice { - size += len(s) + 8 - } - return uint64(size) + return r.symbolsTableSize } // LabelValues returns value tuples that exist for the given label name tuples. @@ -892,7 +841,7 @@ func (r *Reader) LabelValues(names ...string) (StringTuples, error) { //return nil, fmt.Errorf("label index doesn't exist") } - d := r.decbufAt(int(off)) + d := newDecbufAt(r.b, int(off)) nc := d.be32int() d.be32() // consume unused value entry count. @@ -916,7 +865,7 @@ func (emptyStringTuples) Len() int { return 0 } // LabelIndices returns a slice of label names for which labels or label tuples value indices exist. // NOTE: This is deprecated. Use `LabelNames()` instead. func (r *Reader) LabelIndices() ([][]string, error) { - res := [][]string{} + var res [][]string for s := range r.labels { res = append(res, strings.Split(s, labelNameSeperator)) } @@ -928,10 +877,10 @@ func (r *Reader) Series(id uint64, lbls *labels.Labels, chks *[]chunks.Meta) err offset := id // In version 2 series IDs are no longer exact references but series are 16-byte padded // and the ID is the multiple of 16 of the actual position. - if r.version == 2 { + if r.version == FormatV2 { offset = id * 16 } - d := r.decbufUvarintAt(int(offset)) + d := newDecbufUvarintAt(r.b, int(offset)) if d.err() != nil { return d.err() } @@ -948,7 +897,7 @@ func (r *Reader) Postings(name, value string) (Postings, error) { if !ok { return EmptyPostings(), nil } - d := r.decbufAt(int(off)) + d := newDecbufAt(r.b, int(off)) if d.err() != nil { return nil, errors.Wrap(d.err(), "get postings entry") } @@ -965,6 +914,11 @@ func (r *Reader) SortedPostings(p Postings) Postings { return p } +// Size returns the size of an index file. +func (r *Reader) Size() int64 { + return int64(r.b.Len()) +} + // LabelNames returns all the unique label names present in the index. func (r *Reader) LabelNames() ([]string, error) { labelNamesMap := make(map[string]struct{}, len(r.labels)) @@ -1062,7 +1016,7 @@ func (t *serializedStringTuples) At(i int) ([]string, error) { // It currently does not contain decoding methods for all entry types but can be extended // by them if there's demand. type Decoder struct { - lookupSymbol func(uint32) (string, error) + LookupSymbol func(uint32) (string, error) } // Postings returns a postings list for b and its number of elements. 
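Taken together, the newly exported `NewTOCFromByteSlice`, `ReadSymbols`, and `ReadOffsetTable` let external tooling walk an index file without constructing a full `Reader`. A minimal sketch of how they compose; the `byteSlice` adapter (mirroring the package's unexported `realByteSlice`) and the index file path are assumptions for illustration, not part of this change:

```go
package main

import (
	"fmt"
	"io/ioutil"
	"log"

	"github.com/prometheus/tsdb/index"
)

// byteSlice adapts a plain []byte to index.ByteSlice,
// mirroring the package's unexported realByteSlice.
type byteSlice []byte

func (b byteSlice) Len() int                    { return len(b) }
func (b byteSlice) Range(start, end int) []byte { return b[start:end] }

func main() {
	raw, err := ioutil.ReadFile("testdata/block/index") // hypothetical index file path
	if err != nil {
		log.Fatal(err)
	}
	bs := byteSlice(raw)

	toc, err := index.NewTOCFromByteSlice(bs)
	if err != nil {
		log.Fatal(err)
	}

	// The format version byte sits between the 4-byte magic number and the sections.
	version := int(bs.Range(4, index.HeaderLen)[0])

	symbolsV2, symbolsV1, err := index.ReadSymbols(bs, version, int(toc.Symbols))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("index v%d, %d symbols\n", version, len(symbolsV2)+len(symbolsV1))

	// Each postings table entry maps a (name, value) key to the offset of its postings list.
	err = index.ReadOffsetTable(bs, toc.PostingsTable, func(key []string, off uint64) error {
		fmt.Printf("postings %v at offset %d\n", key, off)
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
}
```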
@@ -1090,11 +1044,11 @@ func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) e
 return errors.Wrap(d.err(), "read series label offsets")
 }
 
- ln, err := dec.lookupSymbol(lno)
+ ln, err := dec.LookupSymbol(lno)
 if err != nil {
 return errors.Wrap(err, "lookup label name")
 }
- lv, err := dec.lookupSymbol(lvo)
+ lv, err := dec.LookupSymbol(lvo)
 if err != nil {
 return errors.Wrap(err, "lookup label value")
 }
diff --git a/index/index_test.go b/index/index_test.go
index f7a815622..2edd3956a 100644
--- a/index/index_test.go
+++ b/index/index_test.go
@@ -339,6 +339,7 @@ func TestPersistence_index_e2e(t *testing.T) {
 testutil.Ok(t, err)
 
 expp, err := mi.Postings(p.Name, p.Value)
+ testutil.Ok(t, err)
 
 var lset, explset labels.Labels
 var chks, expchks []chunks.Meta
@@ -352,6 +353,7 @@ func TestPersistence_index_e2e(t *testing.T) {
 testutil.Ok(t, err)
 
 err = mi.Series(expp.At(), &explset, &expchks)
+ testutil.Ok(t, err)
 testutil.Equals(t, explset, lset)
 testutil.Equals(t, expchks, chks)
 }
@@ -378,13 +380,28 @@ func TestPersistence_index_e2e(t *testing.T) {
 }
 }
 
+ gotSymbols, err := ir.Symbols()
+ testutil.Ok(t, err)
+
+ testutil.Equals(t, len(mi.symbols), len(gotSymbols))
+ for s := range mi.symbols {
+ _, ok := gotSymbols[s]
+ testutil.Assert(t, ok, "")
+ }
+
 testutil.Ok(t, ir.Close())
 }
 
+func TestDecbufUvarintWithInvalidBuffer(t *testing.T) {
+ b := realByteSlice([]byte{0x81, 0x81, 0x81, 0x81, 0x81, 0x81})
+
+ db := newDecbufUvarintAt(b, 0)
+ testutil.NotOk(t, db.err())
+}
+
 func TestReaderWithInvalidBuffer(t *testing.T) {
 b := realByteSlice([]byte{0x81, 0x81, 0x81, 0x81, 0x81, 0x81})
 
- r := &Reader{b: b}
- db := r.decbufUvarintAt(0)
- testutil.NotOk(t, db.err())
+ _, err := NewReader(b)
+ testutil.NotOk(t, err)
 }
diff --git a/index/postings.go b/index/postings.go
index 1f2ee0d6f..13df1c69a 100644
--- a/index/postings.go
+++ b/index/postings.go
@@ -366,80 +366,25 @@ func Merge(its ...Postings) Postings {
 if len(its) == 1 {
 return its[0]
 }
- l := len(its) / 2
- return newMergedPostings(Merge(its[:l]...), Merge(its[l:]...))
-}
-
-type mergedPostings struct {
- a, b Postings
- initialized bool
- aok, bok bool
- cur uint64
-}
-
-func newMergedPostings(a, b Postings) *mergedPostings {
- return &mergedPostings{a: a, b: b}
-}
-
-func (it *mergedPostings) At() uint64 {
- return it.cur
-}
-
-func (it *mergedPostings) Next() bool {
- if !it.initialized {
- it.aok = it.a.Next()
- it.bok = it.b.Next()
- it.initialized = true
+ // All the uses of this function immediately expand it, so
+ // collect everything in a map. This is more efficient
+ // when there are hundreds of thousands of postings, compared to
+ // having a tree of merge objects.
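+ // As a side effect, the map deduplicates IDs that appear in more than one postings list.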
+ pm := make(map[uint64]struct{}, len(its)) + for _, it := range its { + for it.Next() { + pm[it.At()] = struct{}{} + } + if it.Err() != nil { + return ErrPostings(it.Err()) + } } - - if !it.aok && !it.bok { - return false + pl := make([]uint64, 0, len(pm)) + for p := range pm { + pl = append(pl, p) } - - if !it.aok { - it.cur = it.b.At() - it.bok = it.b.Next() - return true - } - if !it.bok { - it.cur = it.a.At() - it.aok = it.a.Next() - return true - } - - acur, bcur := it.a.At(), it.b.At() - - if acur < bcur { - it.cur = acur - it.aok = it.a.Next() - } else if acur > bcur { - it.cur = bcur - it.bok = it.b.Next() - } else { - it.cur = acur - it.aok = it.a.Next() - it.bok = it.b.Next() - } - return true -} - -func (it *mergedPostings) Seek(id uint64) bool { - if it.cur >= id { - return true - } - - it.aok = it.a.Seek(id) - it.bok = it.b.Seek(id) - it.initialized = true - - return it.Next() -} - -func (it *mergedPostings) Err() error { - if it.a.Err() != nil { - return it.a.Err() - } - return it.b.Err() + sort.Slice(pl, func(i, j int) bool { return pl[i] < pl[j] }) + return newListPostings(pl) } // Without returns a new postings list that contains all elements from the full list that diff --git a/index/postings_test.go b/index/postings_test.go index 53a9d95fc..54c37f480 100644 --- a/index/postings_test.go +++ b/index/postings_test.go @@ -233,7 +233,7 @@ func TestMergedPostings(t *testing.T) { a := newListPostings(c.a) b := newListPostings(c.b) - res, err := ExpandPostings(newMergedPostings(a, b)) + res, err := ExpandPostings(Merge(a, b)) testutil.Ok(t, err) testutil.Equals(t, c.res, res) } @@ -286,7 +286,7 @@ func TestMergedPostingsSeek(t *testing.T) { a := newListPostings(c.a) b := newListPostings(c.b) - p := newMergedPostings(a, b) + p := Merge(a, b) testutil.Equals(t, c.success, p.Seek(c.seek)) @@ -546,7 +546,7 @@ func TestIntersectWithMerge(t *testing.T) { // https://github.com/prometheus/prometheus/issues/2616 a := newListPostings([]uint64{21, 22, 23, 24, 25, 30}) - b := newMergedPostings( + b := Merge( newListPostings([]uint64{10, 20, 30}), newListPostings([]uint64{15, 26, 30}), ) diff --git a/labels/labels_test.go b/labels/labels_test.go index 5ad573ce9..c49f66edf 100644 --- a/labels/labels_test.go +++ b/labels/labels_test.go @@ -150,7 +150,7 @@ func BenchmarkMapFromLabels(b *testing.B) { b.ReportAllocs() for i := 0; i < b.N; i++ { - m = ls.Map() + _ = ls.Map() } } diff --git a/labels/selector.go b/labels/selector.go index 7bc452faa..c0c74ed52 100644 --- a/labels/selector.go +++ b/labels/selector.go @@ -15,7 +15,6 @@ package labels import ( "regexp" - "strings" ) // Selector holds constraints for matching against a label set. @@ -99,22 +98,3 @@ func (m *notMatcher) Matches(v string) bool { return !m.Matcher.Matches(v) } func Not(m Matcher) Matcher { return ¬Matcher{m} } - -// PrefixMatcher implements Matcher for labels which values matches prefix. -type PrefixMatcher struct { - name, prefix string -} - -// NewPrefixMatcher returns new Matcher for label name matching prefix. -func NewPrefixMatcher(name, prefix string) Matcher { - return &PrefixMatcher{name: name, prefix: prefix} -} - -// Name implements Matcher interface. -func (m *PrefixMatcher) Name() string { return m.name } - -// Prefix returns matching prefix. -func (m *PrefixMatcher) Prefix() string { return m.prefix } - -// Matches implements Matcher interface. 
-func (m *PrefixMatcher) Matches(v string) bool { return strings.HasPrefix(v, m.prefix) } diff --git a/querier.go b/querier.go index 7459c6bee..4a5a40636 100644 --- a/querier.go +++ b/querier.go @@ -247,37 +247,6 @@ func PostingsForMatchers(ix IndexReader, ms ...labels.Matcher) (index.Postings, return ix.SortedPostings(index.Intersect(its...)), nil } -// tuplesByPrefix uses binary search to find prefix matches within ts. -func tuplesByPrefix(m *labels.PrefixMatcher, ts StringTuples) ([]string, error) { - var outErr error - tslen := ts.Len() - i := sort.Search(tslen, func(i int) bool { - vs, err := ts.At(i) - if err != nil { - outErr = fmt.Errorf("Failed to read tuple %d/%d: %v", i, tslen, err) - return true - } - val := vs[0] - l := len(m.Prefix()) - if l > len(vs) { - l = len(val) - } - return val[:l] >= m.Prefix() - }) - if outErr != nil { - return nil, outErr - } - var matches []string - for ; i < tslen; i++ { - vs, err := ts.At(i) - if err != nil || !m.Matches(vs[0]) { - return matches, err - } - matches = append(matches, vs[0]) - } - return matches, nil -} - func postingsForMatcher(ix IndexReader, m labels.Matcher) (index.Postings, error) { // If the matcher selects an empty value, it selects all the series which don't // have the label name set too. See: https://github.com/prometheus/prometheus/issues/3575 @@ -301,21 +270,13 @@ func postingsForMatcher(ix IndexReader, m labels.Matcher) (index.Postings, error } var res []string - if pm, ok := m.(*labels.PrefixMatcher); ok { - res, err = tuplesByPrefix(pm, tpls) + for i := 0; i < tpls.Len(); i++ { + vals, err := tpls.At(i) if err != nil { return nil, err } - - } else { - for i := 0; i < tpls.Len(); i++ { - vals, err := tpls.At(i) - if err != nil { - return nil, err - } - if m.Matches(vals[0]) { - res = append(res, vals[0]) - } + if m.Matches(vals[0]) { + res = append(res, vals[0]) } } @@ -620,11 +581,9 @@ func (s *populatedChunkSeries) Next() bool { // This means that the chunk has be garbage collected. Remove it from the list. if s.err == ErrNotFound { s.err = nil - // Delete in-place. - chks = append(chks[:j], chks[j+1:]...) + s.chks = append(chks[:j], chks[j+1:]...) 
} - return false } } diff --git a/querier_test.go b/querier_test.go index e31072ce5..69ca84602 100644 --- a/querier_test.go +++ b/querier_test.go @@ -375,47 +375,6 @@ func TestBlockQuerier(t *testing.T) { }, }, }, - { - lset: map[string]string{ - "p": "abcd", - "x": "xyz", - }, - chunks: [][]sample{ - { - {1, 2}, {2, 3}, {3, 4}, - }, - { - {5, 2}, {6, 3}, {7, 4}, - }, - }, - }, - { - lset: map[string]string{ - "a": "ab", - "p": "abce", - }, - chunks: [][]sample{ - { - {1, 1}, {2, 2}, {3, 3}, - }, - { - {5, 3}, {6, 6}, - }, - }, - }, - { - lset: map[string]string{ - "p": "xyz", - }, - chunks: [][]sample{ - { - {1, 1}, {2, 2}, {3, 3}, - }, - { - {4, 4}, {5, 5}, {6, 6}, - }, - }, - }, }, queries: []query{ @@ -455,25 +414,6 @@ func TestBlockQuerier(t *testing.T) { ), }), }, - { - mint: 2, - maxt: 6, - ms: []labels.Matcher{labels.NewPrefixMatcher("p", "abc")}, - exp: newMockSeriesSet([]Series{ - newSeries(map[string]string{ - "a": "ab", - "p": "abce", - }, - []Sample{sample{2, 2}, sample{3, 3}, sample{5, 3}, sample{6, 6}}, - ), - newSeries(map[string]string{ - "p": "abcd", - "x": "xyz", - }, - []Sample{sample{2, 3}, sample{3, 4}, sample{5, 2}, sample{6, 3}}, - ), - }), - }, }, } @@ -1287,12 +1227,13 @@ func BenchmarkMergedSeriesSet(b *testing.B) { func BenchmarkPersistedQueries(b *testing.B) { for _, nSeries := range []int{10, 100} { - for _, nSamples := range []int{1000, 10000, 100000} { + for _, nSamples := range []int64{1000, 10000, 100000} { b.Run(fmt.Sprintf("series=%d,samplesPerSeries=%d", nSeries, nSamples), func(b *testing.B) { dir, err := ioutil.TempDir("", "bench_persisted") testutil.Ok(b, err) defer os.RemoveAll(dir) - block := createPopulatedBlock(b, dir, nSeries, 1, int64(nSamples)) + block, err := OpenBlock(nil, createBlock(b, dir, nSeries, 1, int64(nSamples)), nil) + testutil.Ok(b, err) defer block.Close() q, err := NewBlockQuerier(block, block.Meta().MinTime, block.Meta().MaxTime) diff --git a/repair_test.go b/repair_test.go index b3bf0acba..5fb780a5b 100644 --- a/repair_test.go +++ b/repair_test.go @@ -65,7 +65,7 @@ func TestRepairBadIndexVersion(t *testing.T) { // Check the current db. // In its current state, lookups should fail with the fixed code. - meta, err := readMetaFile(dbDir) + _, err := readMetaFile(dbDir) testutil.NotOk(t, err) // Touch chunks dir in block. @@ -116,7 +116,7 @@ func TestRepairBadIndexVersion(t *testing.T) { {{"a", "2"}, {"b", "1"}}, }, res) - meta, err = readMetaFile(tmpDbDir) + meta, err := readMetaFile(tmpDbDir) testutil.Ok(t, err) testutil.Assert(t, meta.Version == 1, "unexpected meta version %d", meta.Version) } diff --git a/staticcheck.conf b/staticcheck.conf new file mode 100644 index 000000000..3266a2e29 --- /dev/null +++ b/staticcheck.conf @@ -0,0 +1,2 @@ +# Enable only "legacy" staticcheck verifications. 
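+# "SA*" matches only the correctness analyses; style (ST*) and simplification (S1*) checks stay disabled.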
+checks = [ "SA*" ] diff --git a/tombstones.go b/tombstones.go index a1f30b59c..078140406 100644 --- a/tombstones.go +++ b/tombstones.go @@ -113,37 +113,41 @@ type Stone struct { intervals Intervals } -func readTombstones(dir string) (TombstoneReader, error) { +func readTombstones(dir string) (TombstoneReader, SizeReader, error) { b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename)) if os.IsNotExist(err) { - return newMemTombstones(), nil + return newMemTombstones(), nil, nil } else if err != nil { - return nil, err + return nil, nil, err + } + + sr := &TombstoneFile{ + size: int64(len(b)), } if len(b) < 5 { - return nil, errors.Wrap(errInvalidSize, "tombstones header") + return nil, sr, errors.Wrap(errInvalidSize, "tombstones header") } d := &decbuf{b: b[:len(b)-4]} // 4 for the checksum. if mg := d.be32(); mg != MagicTombstone { - return nil, fmt.Errorf("invalid magic number %x", mg) + return nil, sr, fmt.Errorf("invalid magic number %x", mg) } if flag := d.byte(); flag != tombstoneFormatV1 { - return nil, fmt.Errorf("invalid tombstone format %x", flag) + return nil, sr, fmt.Errorf("invalid tombstone format %x", flag) } if d.err() != nil { - return nil, d.err() + return nil, sr, d.err() } // Verify checksum. hash := newCRC32() if _, err := hash.Write(d.get()); err != nil { - return nil, errors.Wrap(err, "write to hash") + return nil, sr, errors.Wrap(err, "write to hash") } if binary.BigEndian.Uint32(b[len(b)-4:]) != hash.Sum32() { - return nil, errors.New("checksum did not match") + return nil, sr, errors.New("checksum did not match") } stonesMap := newMemTombstones() @@ -153,13 +157,13 @@ func readTombstones(dir string) (TombstoneReader, error) { mint := d.varint64() maxt := d.varint64() if d.err() != nil { - return nil, d.err() + return nil, sr, d.err() } stonesMap.addInterval(k, Interval{mint, maxt}) } - return stonesMap, nil + return stonesMap, sr, nil } type memTombstones struct { @@ -210,6 +214,16 @@ func (t *memTombstones) addInterval(ref uint64, itvs ...Interval) { } } +// TombstoneFile holds information about the tombstone file. +type TombstoneFile struct { + size int64 +} + +// Size returns the tombstone file size. +func (t *TombstoneFile) Size() int64 { + return t.size +} + func (*memTombstones) Close() error { return nil } diff --git a/tombstones_test.go b/tombstones_test.go index e12574f11..2a106d705 100644 --- a/tombstones_test.go +++ b/tombstones_test.go @@ -46,7 +46,7 @@ func TestWriteAndReadbackTombStones(t *testing.T) { testutil.Ok(t, writeTombstoneFile(tmpdir, stones)) - restr, err := readTombstones(tmpdir) + restr, _, err := readTombstones(tmpdir) testutil.Ok(t, err) // Compare the two readers. diff --git a/wal.go b/wal.go index 684a2fa6a..60e1c5807 100644 --- a/wal.go +++ b/wal.go @@ -94,27 +94,6 @@ type WAL interface { Close() error } -// NopWAL is a WAL that does nothing. -func NopWAL() WAL { - return nopWAL{} -} - -type nopWAL struct{} - -func (nopWAL) Read( - seriesf func([]RefSeries), - samplesf func([]RefSample), - deletesf func([]Stone), -) error { - return nil -} -func (w nopWAL) Reader() WALReader { return w } -func (nopWAL) LogSeries([]RefSeries) error { return nil } -func (nopWAL) LogSamples([]RefSample) error { return nil } -func (nopWAL) LogDeletes([]Stone) error { return nil } -func (nopWAL) Truncate(int64, func(uint64) bool) error { return nil } -func (nopWAL) Close() error { return nil } - // WALReader reads entries from a WAL. 
type WALReader interface { Read( @@ -909,16 +888,19 @@ func (r *walReader) Read( if seriesf != nil { seriesf(v) } + //lint:ignore SA6002 safe to ignore and actually fixing it has some performance penalty. seriesPool.Put(v[:0]) case []RefSample: if samplesf != nil { samplesf(v) } + //lint:ignore SA6002 safe to ignore and actually fixing it has some performance penalty. samplePool.Put(v[:0]) case []Stone: if deletesf != nil { deletesf(v) } + //lint:ignore SA6002 safe to ignore and actually fixing it has some performance penalty. deletePool.Put(v[:0]) default: level.Error(r.logger).Log("msg", "unexpected data type") diff --git a/wal/wal.go b/wal/wal.go index 2ed2018c7..fd90eb90e 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -35,7 +35,7 @@ import ( ) const ( - defaultSegmentSize = 128 * 1024 * 1024 // 128 MB + DefaultSegmentSize = 128 * 1024 * 1024 // 128 MB pageSize = 32 * 1024 // 32KB recordHeaderSize = 7 ) @@ -164,6 +164,7 @@ type WAL struct { page *page // active page stopc chan chan struct{} actorc chan func() + closed bool // To allow calling Close() more than once without blocking. fsyncDuration prometheus.Summary pageFlushes prometheus.Counter @@ -174,7 +175,7 @@ type WAL struct { // New returns a new WAL over the given directory. func New(logger log.Logger, reg prometheus.Registerer, dir string) (*WAL, error) { - return NewSize(logger, reg, dir, defaultSegmentSize) + return NewSize(logger, reg, dir, DefaultSegmentSize) } // NewSize returns a new WAL over the given directory. @@ -298,9 +299,6 @@ func (w *WAL) Repair(origErr error) error { level.Warn(w.logger).Log("msg", "deleting all segments behind corruption", "segment", cerr.Segment) for _, s := range segs { - if s.index <= cerr.Segment { - continue - } if w.segment.i == s.index { // The active segment needs to be removed, // close it first (Windows!). Can be closed safely @@ -310,6 +308,9 @@ func (w *WAL) Repair(origErr error) error { return errors.Wrap(err, "close active segment") } } + if s.index <= cerr.Segment { + continue + } if err := os.Remove(filepath.Join(w.dir, s.name)); err != nil { return errors.Wrapf(err, "delete segment:%v", s.index) } @@ -584,6 +585,10 @@ func (w *WAL) Close() (err error) { w.mtx.Lock() defer w.mtx.Unlock() + if w.closed { + return nil + } + // Flush the last page and zero out all its remaining size. // We must not flush an empty page as it would falsely signal // the segment is done if we start writing to it again after opening. @@ -603,7 +608,7 @@ func (w *WAL) Close() (err error) { if err := w.segment.Close(); err != nil { level.Error(w.logger).Log("msg", "close previous segment", "err", err) } - + w.closed = true return nil } @@ -827,28 +832,13 @@ func (r *Reader) next() (err error) { } r.rec = append(r.rec, buf[:length]...) - switch r.curRecTyp { - case recFull: - if i != 0 { - return errors.New("unexpected full record") - } - return nil - case recFirst: - if i != 0 { - return errors.New("unexpected first record") - } - case recMiddle: - if i == 0 { - return errors.New("unexpected middle record") - } - case recLast: - if i == 0 { - return errors.New("unexpected last record") - } - return nil - default: - return errors.Errorf("unexpected record type %d", r.curRecTyp) + if err := validateRecord(r.curRecTyp, i); err != nil { + return err } + if r.curRecTyp == recLast || r.curRecTyp == recFull { + return nil + } + // Only increment i for non-zero records since we use it // to determine valid content record sequences. 
 i++
@@ -899,6 +889,226 @@ func (r *Reader) Offset() int64 {
 return r.total
 }
 
+// NewLiveReader returns a new live reader.
+func NewLiveReader(r io.Reader) *LiveReader {
+ return &LiveReader{rdr: r}
+}
+
+// LiveReader reads WAL records from an io.Reader. It buffers partial record data for
+// the next read.
+type LiveReader struct {
+ rdr io.Reader
+ err error
+ rec []byte
+ hdr [recordHeaderSize]byte
+ buf [pageSize]byte
+ readIndex int // Index in buf to start at for next read.
+ writeIndex int // Index in buf to start at for next write.
+ total int64 // Total bytes processed during reading in calls to Next().
+ index int // Used to track partial records, should be 0 at the start of every new record.
+}
+
+func (r *LiveReader) Err() error {
+ return r.err
+}
+
+func (r *LiveReader) TotalRead() int64 {
+ return r.total
+}
+
+func (r *LiveReader) fillBuffer() error {
+ n, err := r.rdr.Read(r.buf[r.writeIndex:len(r.buf)])
+ r.writeIndex += n
+ return err
+}
+
+// Shift the buffer up to the read index.
+func (r *LiveReader) shiftBuffer() {
+ copied := copy(r.buf[0:], r.buf[r.readIndex:r.writeIndex])
+ r.readIndex = 0
+ r.writeIndex = copied
+}
+
+// Next returns true if r.rec will contain a full record.
+// A false return does not indicate that there will never be more data to
+// read for the current io.Reader.
+func (r *LiveReader) Next() bool {
+ for {
+ if r.buildRecord() {
+ return true
+ }
+ if r.err != nil && r.err != io.EOF {
+ return false
+ }
+ if r.readIndex == pageSize {
+ r.shiftBuffer()
+ }
+ if r.writeIndex != pageSize {
+ if err := r.fillBuffer(); err != nil {
+ // We expect to get EOF, since we're reading the segment file as it's being written.
+ if err != io.EOF {
+ r.err = err
+ }
+ return false
+ }
+ }
+ }
+}
+
+// Record returns the current record.
+// The returned byte slice is only valid until the next call to Next.
+func (r *LiveReader) Record() []byte {
+ return r.rec
+}
+
+// Rebuild a full record from potentially partial records. Returns false
+// if there was an error or if we weren't able to read a record for any reason.
+// Returns true if we read a full record. Any record data is appended to
+// LiveReader.rec.
+func (r *LiveReader) buildRecord() bool {
+ for {
+ // Check that we have data in the internal buffer to read.
+ if r.writeIndex <= r.readIndex {
+ return false
+ }
+
+ // Attempt to read a record, partial or otherwise.
+ temp, n, err := readRecord(r.buf[r.readIndex:r.writeIndex], r.hdr[:], r.total)
+ r.readIndex += n
+ r.total += int64(n)
+ if err != nil {
+ r.err = err
+ return false
+ }
+
+ if temp == nil {
+ return false
+ }
+
+ rt := recType(r.hdr[0])
+
+ if rt == recFirst || rt == recFull {
+ r.rec = r.rec[:0]
+ }
+ r.rec = append(r.rec, temp...)
+
+ if err := validateRecord(rt, r.index); err != nil {
+ r.err = err
+ r.index = 0
+ return false
+ }
+ if rt == recLast || rt == recFull {
+ r.index = 0
+ return true
+ }
+ // Only increment i for non-zero records since we use it
+ // to determine valid content record sequences.
+ r.index++
+ }
+}
+
+// Returns an error if the recType and i indicate an invalid record sequence.
+// As an example, if i is > 0 because we've read some amount of a partial record
+// (recFirst, recMiddle, etc. but not recLast) and we then get another recFirst or recFull
+// instead of a recLast or recMiddle, we would have an invalid record.
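+// The only valid sequences are therefore a lone recFull, or recFirst, zero or more recMiddle, then recLast.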
+func validateRecord(typ recType, i int) error { + switch typ { + case recFull: + if i != 0 { + return errors.New("unexpected full record") + } + return nil + case recFirst: + if i != 0 { + return errors.New("unexpected first record, dropping buffer") + } + return nil + case recMiddle: + if i == 0 { + return errors.New("unexpected middle record, dropping buffer") + } + return nil + case recLast: + if i == 0 { + return errors.New("unexpected last record, dropping buffer") + } + return nil + default: + return errors.Errorf("unexpected record type %d", typ) + } +} + +// Read a sub-record (see recType) from the buffer. It could potentially +// be a full record (recFull) if the record fits within the bounds of a single page. +// Returns a byte slice of the record data read, the number of bytes read, and an error +// if there's a non-zero byte in a page term record or the record checksum fails. +// TODO(callum) the EOF errors we're returning from this function should theoretically +// never happen, add a metric for them. +func readRecord(buf []byte, header []byte, total int64) ([]byte, int, error) { + readIndex := 0 + header[0] = buf[0] + readIndex++ + total++ + + // The rest of this function is mostly from Reader.Next(). + typ := recType(header[0]) + // Gobble up zero bytes. + if typ == recPageTerm { + // We are pedantic and check whether the zeros are actually up to a page boundary. + // It's not strictly necessary but may catch sketchy state early. + k := pageSize - (total % pageSize) + if k == pageSize { + return nil, 1, nil // Initial 0 byte was last page byte. + } + + if k <= int64(len(buf)-readIndex) { + for _, v := range buf[readIndex : int64(readIndex)+k] { + readIndex++ + if v != 0 { + return nil, readIndex, errors.New("unexpected non-zero byte in page term bytes") + } + } + return nil, readIndex, nil + } + // Not enough bytes to read the rest of the page term rec. + // This theoretically should never happen, since we're only shifting the + // internal buffer of the live reader when we read to the end of page. + // Treat this the same as an EOF, it's an error we would expect to see. + return nil, 0, io.EOF + } + + if readIndex+recordHeaderSize-1 > len(buf) { + // Treat this the same as an EOF, it's an error we would expect to see. + return nil, 0, io.EOF + } + + copy(header[1:], buf[readIndex:readIndex+len(header[1:])]) + readIndex += recordHeaderSize - 1 + total += int64(recordHeaderSize - 1) + var ( + length = binary.BigEndian.Uint16(header[1:]) + crc = binary.BigEndian.Uint32(header[3:]) + ) + readTo := int(length) + readIndex + if readTo > len(buf) { + if (readTo - readIndex) > pageSize { + return nil, 0, errors.Errorf("invalid record, record size would be larger than max page size: %d", int(length)) + } + // Not enough data to read all of the record data. + // Treat this the same as an EOF, it's an error we would expect to see. + return nil, 0, io.EOF + } + recData := buf[readIndex:readTo] + readIndex += int(length) + total += int64(length) + + // TODO(callum) what should we do here, throw out the record? We should add a metric at least. 
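+	// For now, return the data read alongside the error so the caller can decide how to handle it.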
+ if c := crc32.Checksum(recData, castagnoliTable); c != crc { + return recData, readIndex, errors.Errorf("unexpected checksum %x, expected %x", c, crc) + } + return recData, readIndex, nil +} + func min(i, j int) int { if i < j { return i diff --git a/wal/wal_test.go b/wal/wal_test.go index 10600352d..f95b21239 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -17,16 +17,107 @@ package wal import ( "bytes" "encoding/binary" + "fmt" "hash/crc32" + "io" "io/ioutil" "math/rand" "os" + "path" + "sync" "testing" + "time" - "github.com/pkg/errors" "github.com/prometheus/tsdb/testutil" ) +type record struct { + t recType + b []byte +} + +var data = make([]byte, 100000) +var testReaderCases = []struct { + t []record + exp [][]byte + fail bool +}{ + // Sequence of valid records. + { + t: []record{ + {recFull, data[0:200]}, + {recFirst, data[200:300]}, + {recLast, data[300:400]}, + {recFirst, data[400:800]}, + {recMiddle, data[800:900]}, + {recPageTerm, make([]byte, pageSize-900-recordHeaderSize*5-1)}, // exactly lines up with page boundary. + {recLast, data[900:900]}, + {recFirst, data[900:1000]}, + {recMiddle, data[1000:1200]}, + {recMiddle, data[1200:30000]}, + {recMiddle, data[30000:30001]}, + {recMiddle, data[30001:30001]}, + {recLast, data[30001:32000]}, + }, + exp: [][]byte{ + data[0:200], + data[200:400], + data[400:900], + data[900:32000], + }, + }, + // Exactly at the limit of one page minus the header size + { + t: []record{ + {recFull, data[0 : pageSize-recordHeaderSize]}, + }, + exp: [][]byte{ + data[:pageSize-recordHeaderSize], + }, + }, + // More than a full page, this exceeds our buffer and can never happen + // when written by the WAL. + { + t: []record{ + {recFull, data[0 : pageSize+1]}, + }, + fail: true, + }, + // Invalid orders of record types. + { + t: []record{{recMiddle, data[:200]}}, + fail: true, + }, + { + t: []record{{recLast, data[:200]}}, + fail: true, + }, + { + t: []record{ + {recFirst, data[:200]}, + {recFull, data[200:400]}, + }, + fail: true, + }, + { + t: []record{ + {recFirst, data[:100]}, + {recMiddle, data[100:200]}, + {recFull, data[200:400]}, + }, + fail: true, + }, + // Non-zero data after page termination. + { + t: []record{ + {recFull, data[:100]}, + {recPageTerm, append(make([]byte, 1000), 1)}, + }, + exp: [][]byte{data[:100]}, + fail: true, + }, +} + func encodedRecord(t recType, b []byte) []byte { if t == recPageTerm { return append([]byte{0}, b...) @@ -40,95 +131,7 @@ func encodedRecord(t recType, b []byte) []byte { // TestReader feeds the reader a stream of encoded records with different types. func TestReader(t *testing.T) { - data := make([]byte, 100000) - _, err := rand.Read(data) - testutil.Ok(t, err) - - type record struct { - t recType - b []byte - } - cases := []struct { - t []record - exp [][]byte - fail bool - }{ - // Sequence of valid records. - { - t: []record{ - {recFull, data[0:200]}, - {recFirst, data[200:300]}, - {recLast, data[300:400]}, - {recFirst, data[400:800]}, - {recMiddle, data[800:900]}, - {recPageTerm, make([]byte, pageSize-900-recordHeaderSize*5-1)}, // exactly lines up with page boundary. 
- {recLast, data[900:900]},
- {recFirst, data[900:1000]},
- {recMiddle, data[1000:1200]},
- {recMiddle, data[1200:30000]},
- {recMiddle, data[30000:30001]},
- {recMiddle, data[30001:30001]},
- {recLast, data[30001:32000]},
- },
- exp: [][]byte{
- data[0:200],
- data[200:400],
- data[400:900],
- data[900:32000],
- },
- },
- // Exactly at the limit of one page minus the header size
- {
- t: []record{
- {recFull, data[0 : pageSize-recordHeaderSize]},
- },
- exp: [][]byte{
- data[:pageSize-recordHeaderSize],
- },
- },
- // More than a full page, this exceeds our buffer and can never happen
- // when written by the WAL.
- {
- t: []record{
- {recFull, data[0 : pageSize+1]},
- },
- fail: true,
- },
- // Invalid orders of record types.
- {
- t: []record{{recMiddle, data[:200]}},
- fail: true,
- },
- {
- t: []record{{recLast, data[:200]}},
- fail: true,
- },
- {
- t: []record{
- {recFirst, data[:200]},
- {recFull, data[200:400]},
- },
- fail: true,
- },
- {
- t: []record{
- {recFirst, data[:100]},
- {recMiddle, data[100:200]},
- {recFull, data[200:400]},
- },
- fail: true,
- },
- // Non-zero data after page termination.
- {
- t: []record{
- {recFull, data[:100]},
- {recPageTerm, append(make([]byte, 1000), 1)},
- },
- exp: [][]byte{data[:100]},
- fail: true,
- },
- }
- for i, c := range cases {
+ for i, c := range testReaderCases {
 t.Logf("test %d", i)
 
 var buf []byte
@@ -155,6 +158,192 @@ func TestReader(t *testing.T) {
 }
 }
 
+func TestReader_Live(t *testing.T) {
+ for i, c := range testReaderCases {
+ t.Logf("test %d", i)
+ dir, err := ioutil.TempDir("", fmt.Sprintf("live_reader_%d", i))
+ t.Logf("created dir %s", dir)
+ testutil.Ok(t, err)
+ defer os.RemoveAll(dir)
+
+ // We're never going to have more than a single segment file per test case right now.
+ f, err := os.Create(path.Join(dir, "00000000"))
+ testutil.Ok(t, err)
+
+ // The live reader doesn't work on readers created from byte buffers,
+ // since we need to be able to write more data to the underlying file
+ // after the reader has been created.
+ wg := sync.WaitGroup{}
+ // Make sure the reader doesn't start until at least one record is written.
+ wg.Add(1)
+ go func() {
+ for i, rec := range c.t {
+ rec := encodedRecord(rec.t, rec.b)
+ n, err := f.Write(rec)
+ testutil.Ok(t, err)
+ testutil.Assert(t, n > 0, "no bytes were written to wal")
+ if i == 0 {
+ wg.Done()
+ }
+ }
+ }()
+ sr, err := OpenReadSegment(SegmentName(dir, 0))
+ testutil.Ok(t, err)
+ lr := NewLiveReader(sr)
+ j := 0
+ wg.Wait()
+ caseLoop:
+ for {
+ for ; lr.Next(); j++ {
+ rec := lr.Record()
+ t.Log("j: ", j)
+ testutil.Equals(t, c.exp[j], rec, "Bytes within record did not match expected Bytes")
+ if j == len(c.exp)-1 {
+ break caseLoop
+ }
+
+ }
+
+ // Because reads and writes are happening concurrently, unless we get an error we should
+ // attempt to read records again.
+ if j == 0 && lr.Err() == nil {
+ continue
+ }
+
+ if !c.fail && lr.Err() != nil {
+ t.Fatalf("unexpected error: %s", lr.Err())
+ }
+ if c.fail && lr.Err() == nil {
+ t.Fatalf("expected error but got none:\n\tinput: %+v", c.t)
+ }
+ if lr.Err() != nil {
+ t.Log("err: ", lr.Err())
+ break
+ }
+ }
+ }
+}
+
+func TestWAL_FuzzWriteRead_Live(t *testing.T) {
+ const count = 5000
+ const segmentSize = int64(128 * 1024 * 1024)
+ var input [][]byte
+ lock := sync.RWMutex{}
+ var recs [][]byte
+ var index int
+
+ // Get size of segment.
+ getSegmentSize := func(dir string, index int) (int64, error) { + i := int64(-1) + fi, err := os.Stat(SegmentName(dir, index)) + if err == nil { + i = fi.Size() + } + return i, err + } + + readSegment := func(r *LiveReader) { + for r.Next() { + rec := r.Record() + lock.RLock() + l := len(input) + lock.RUnlock() + if index >= l { + t.Fatalf("read too many records") + } + lock.RLock() + if !bytes.Equal(input[index], rec) { + t.Fatalf("record %d (len %d) does not match (expected len %d)", + index, len(rec), len(input[index])) + } + lock.RUnlock() + index++ + } + if r.Err() != io.EOF { + testutil.Ok(t, r.Err()) + } + } + + dir, err := ioutil.TempDir("", "wal_fuzz_live") + t.Log("created dir: ", dir) + testutil.Ok(t, err) + defer func() { + os.RemoveAll(dir) + }() + + w, err := NewSize(nil, nil, dir, 128*pageSize) + testutil.Ok(t, err) + + go func() { + for i := 0; i < count; i++ { + var sz int64 + switch i % 5 { + case 0, 1: + sz = 50 + case 2, 3: + sz = pageSize + default: + sz = pageSize * 8 + } + + rec := make([]byte, rand.Int63n(sz)) + _, err := rand.Read(rec) + testutil.Ok(t, err) + lock.Lock() + input = append(input, rec) + lock.Unlock() + recs = append(recs, rec) + + // Randomly batch up records. + if rand.Intn(4) < 3 { + testutil.Ok(t, w.Log(recs...)) + recs = recs[:0] + } + } + testutil.Ok(t, w.Log(recs...)) + }() + + m, _, err := w.Segments() + testutil.Ok(t, err) + + seg, err := OpenReadSegment(SegmentName(dir, m)) + testutil.Ok(t, err) + + r := NewLiveReader(seg) + segmentTicker := time.NewTicker(100 * time.Millisecond) + readTicker := time.NewTicker(10 * time.Millisecond) + for { + select { + case <-segmentTicker.C: + // check if new segments exist + _, last, err := w.Segments() + testutil.Ok(t, err) + if last > seg.i { + for { + readSegment(r) + if r.Err() != io.EOF { + testutil.Ok(t, r.Err()) + } + size, err := getSegmentSize(dir, seg.i) + testutil.Ok(t, err) + // make sure we've read all of the current segment before rotating + if r.TotalRead() == size { + break + } + } + seg, err = OpenReadSegment(SegmentName(dir, seg.i+1)) + testutil.Ok(t, err) + r = NewLiveReader(seg) + } + case <-readTicker.C: + readSegment(r) + } + if index == count { + break + } + } + testutil.Ok(t, r.Err()) +} func TestWAL_FuzzWriteRead(t *testing.T) { const count = 25000 @@ -336,14 +525,8 @@ func TestWAL_Repair(t *testing.T) { } testutil.NotOk(t, r.Err()) testutil.Ok(t, sr.Close()) + testutil.Ok(t, w.Repair(r.Err())) - - // See https://github.com/prometheus/prometheus/issues/4603 - // We need to close w.segment because it needs to be deleted. - // But this is to mainly artificially test Repair() again. - testutil.Ok(t, w.segment.Close()) - testutil.Ok(t, w.Repair(errors.Wrap(r.Err(), "err"))) - sr, err = NewSegmentsReader(dir) testutil.Ok(t, err) r = NewReader(sr) diff --git a/wal_test.go b/wal_test.go index 37f51ba61..fcda65b41 100644 --- a/wal_test.go +++ b/wal_test.go @@ -11,6 +11,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +// +build !windows + package tsdb import ( @@ -288,7 +290,7 @@ func TestWALRestoreCorrupted_invalidSegment(t *testing.T) { testutil.Ok(t, wal.Close()) - wal, err = OpenSegmentWAL(dir, log.NewLogfmtLogger(os.Stderr), 0, nil) + _, err = OpenSegmentWAL(dir, log.NewLogfmtLogger(os.Stderr), 0, nil) testutil.Ok(t, err) fns, err := fileutil.ReadDir(dir)
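The fuzz test above doubles as documentation for how a tailer is expected to drive the new `LiveReader`. Boiled down to a standalone sketch; the WAL directory, the fixed segment index, the polling interval, and the `process` callback are illustrative assumptions, not part of this change:

```go
package main

import (
	"io"
	"log"
	"time"

	"github.com/prometheus/tsdb/wal"
)

// process stands in for whatever consumes the records, e.g. a remote-write queue.
func process(rec []byte) {
	log.Printf("read record of %d bytes", len(rec))
}

func main() {
	dir := "data/wal" // hypothetical WAL directory

	seg, err := wal.OpenReadSegment(wal.SegmentName(dir, 0))
	if err != nil {
		log.Fatal(err)
	}
	defer seg.Close()

	lr := wal.NewLiveReader(seg)
	for {
		for lr.Next() {
			// The record is only valid until the next call to Next.
			process(lr.Record())
		}
		// Unlike Reader, a false Next is not final: the segment may still be
		// appended to. Only a non-EOF error is terminal.
		if err := lr.Err(); err != nil && err != io.EOF {
			log.Fatal(err)
		}
		time.Sleep(10 * time.Millisecond) // poll for newly written data
	}
}
```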