From c782d293109b228bb27fa89c214003a91aa06cef Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Tue, 23 Jul 2019 20:41:23 +0530 Subject: [PATCH] Vendor tsdb 0.10.0 Signed-off-by: Ganesh Vernekar --- go.mod | 2 +- go.sum | 4 +- .../github.com/prometheus/tsdb/CHANGELOG.md | 10 +- .../prometheus/tsdb/Makefile.common | 2 +- vendor/github.com/prometheus/tsdb/block.go | 7 +- .../prometheus/tsdb/chunkenc/chunk.go | 5 +- .../prometheus/tsdb/chunkenc/xor.go | 29 ++- .../prometheus/tsdb/chunks/chunks.go | 39 ++-- vendor/github.com/prometheus/tsdb/compact.go | 18 +- vendor/github.com/prometheus/tsdb/db.go | 200 ++++++++++++++++-- vendor/github.com/prometheus/tsdb/head.go | 72 ++++++- .../github.com/prometheus/tsdb/index/index.go | 74 +++++-- vendor/github.com/prometheus/tsdb/querier.go | 40 ++-- vendor/github.com/prometheus/tsdb/wal/wal.go | 68 +++--- vendor/modules.txt | 2 +- 15 files changed, 441 insertions(+), 131 deletions(-) diff --git a/go.mod b/go.mod index 48fe4d906..9de8162d7 100644 --- a/go.mod +++ b/go.mod @@ -38,7 +38,7 @@ require ( github.com/prometheus/client_golang v1.0.0 github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 github.com/prometheus/common v0.4.1 - github.com/prometheus/tsdb v0.9.1 + github.com/prometheus/tsdb v0.10.0 github.com/samuel/go-zookeeper v0.0.0-20161028232340-1d7be4effb13 github.com/shurcooL/httpfs v0.0.0-20171119174359-809beceb2371 github.com/shurcooL/vfsgen v0.0.0-20180825020608-02ddb050ef6b diff --git a/go.sum b/go.sum index 20bf362cf..8f61eb6c9 100644 --- a/go.sum +++ b/go.sum @@ -286,8 +286,8 @@ github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R github.com/prometheus/procfs v0.0.2 h1:6LJUbpNm42llc4HRCuvApCSWB/WfhuNo9K98Q9sNGfs= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/prometheus v0.0.0-20180315085919-58e2a31db8de/go.mod h1:oAIUtOny2rjMX0OWN5vPR5/q/twIROJvdqnQKDdil/s= -github.com/prometheus/tsdb v0.9.1 h1:IWaAmWkYlgG7/S4iw4IpAQt5Y35QaZM6/GsZ7GsjAuk= -github.com/prometheus/tsdb v0.9.1/go.mod h1:oi49uRhEe9dPUTlS3JRZOwJuVi6tmh10QSgwXEyGCt4= +github.com/prometheus/tsdb v0.10.0 h1:If5rVCMTp6W2SiRAQFlbpJNgVlgMEd+U2GZckwK38ic= +github.com/prometheus/tsdb v0.10.0/go.mod h1:oi49uRhEe9dPUTlS3JRZOwJuVi6tmh10QSgwXEyGCt4= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rs/cors v1.6.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= diff --git a/vendor/github.com/prometheus/tsdb/CHANGELOG.md b/vendor/github.com/prometheus/tsdb/CHANGELOG.md index 9d057a3bc..12364b09f 100644 --- a/vendor/github.com/prometheus/tsdb/CHANGELOG.md +++ b/vendor/github.com/prometheus/tsdb/CHANGELOG.md @@ -1,4 +1,11 @@ -## Master / unreleased +## master / unreleased + +## 0.10.0 + + - [FEATURE] Added `DBReadOnly` to allow opening a database in read only mode. + - `DBReadOnly.Blocks()` exposes a slice of `BlockReader`s. + - `BlockReader` interface - removed MinTime/MaxTime methods and now exposes the full block meta via `Meta()`. + - [FEATURE] `chunckenc.Chunk.Iterator` method now takes a `chunckenc.Iterator` interface as an argument for reuse. ## 0.9.1 @@ -19,6 +26,7 @@ - [ENHANCEMENT] Reduced disk usage for WAL for small setups. - [ENHANCEMENT] Optimize queries using regexp for set lookups. 
+ ## 0.8.0 - [BUGFIX] Calling `Close` more than once on a querier returns an error instead of a panic. diff --git a/vendor/github.com/prometheus/tsdb/Makefile.common b/vendor/github.com/prometheus/tsdb/Makefile.common index 48d2ff84e..db98993d6 100644 --- a/vendor/github.com/prometheus/tsdb/Makefile.common +++ b/vendor/github.com/prometheus/tsdb/Makefile.common @@ -74,7 +74,7 @@ PROMU_URL := https://github.com/prometheus/promu/releases/download/v$(PROMU_ GOLANGCI_LINT := GOLANGCI_LINT_OPTS ?= -GOLANGCI_LINT_VERSION ?= v1.16.0 +GOLANGCI_LINT_VERSION ?= v1.17.1 # golangci-lint only supports linux, darwin and windows platforms on i386/amd64. # windows isn't included here because of the path separator being different. ifeq ($(GOHOSTOS),$(filter $(GOHOSTOS),linux darwin)) diff --git a/vendor/github.com/prometheus/tsdb/block.go b/vendor/github.com/prometheus/tsdb/block.go index 6a8237f1f..d0fe2b2f7 100644 --- a/vendor/github.com/prometheus/tsdb/block.go +++ b/vendor/github.com/prometheus/tsdb/block.go @@ -138,11 +138,8 @@ type BlockReader interface { // Tombstones returns a TombstoneReader over the block's deleted data. Tombstones() (TombstoneReader, error) - // MinTime returns the min time of the block. - MinTime() int64 - - // MaxTime returns the max time of the block. - MaxTime() int64 + // Meta provides meta information about the block reader. + Meta() BlockMeta } // Appendable defines an entity to which data can be appended. diff --git a/vendor/github.com/prometheus/tsdb/chunkenc/chunk.go b/vendor/github.com/prometheus/tsdb/chunkenc/chunk.go index dc566606d..5f9349f05 100644 --- a/vendor/github.com/prometheus/tsdb/chunkenc/chunk.go +++ b/vendor/github.com/prometheus/tsdb/chunkenc/chunk.go @@ -44,7 +44,10 @@ type Chunk interface { Bytes() []byte Encoding() Encoding Appender() (Appender, error) - Iterator() Iterator + // The iterator passed as argument is for re-use. + // Depending on implementation, the iterator can + // be re-used or a new iterator can be allocated. + Iterator(Iterator) Iterator NumSamples() int } diff --git a/vendor/github.com/prometheus/tsdb/chunkenc/xor.go b/vendor/github.com/prometheus/tsdb/chunkenc/xor.go index 1518772b3..ca20309f6 100644 --- a/vendor/github.com/prometheus/tsdb/chunkenc/xor.go +++ b/vendor/github.com/prometheus/tsdb/chunkenc/xor.go @@ -77,7 +77,7 @@ func (c *XORChunk) NumSamples() int { // Appender implements the Chunk interface. func (c *XORChunk) Appender() (Appender, error) { - it := c.iterator() + it := c.iterator(nil) // To get an appender we must know the state it would have if we had // appended all existing data from scratch. @@ -102,19 +102,25 @@ func (c *XORChunk) Appender() (Appender, error) { return a, nil } -func (c *XORChunk) iterator() *xorIterator { +func (c *XORChunk) iterator(it Iterator) *xorIterator { // Should iterators guarantee to act on a copy of the data so it doesn't lock append? // When using striped locks to guard access to chunks, probably yes. // Could only copy data if the chunk is not completed yet. + if xorIter, ok := it.(*xorIterator); ok { + xorIter.Reset(c.b.bytes()) + return xorIter + } return &xorIterator{ + // The first 2 bytes contain chunk headers. + // We skip that for actual samples. br: newBReader(c.b.bytes()[2:]), numTotal: binary.BigEndian.Uint16(c.b.bytes()), } } // Iterator implements the Chunk interface. 
-func (c *XORChunk) Iterator() Iterator { - return c.iterator() +func (c *XORChunk) Iterator(it Iterator) Iterator { + return c.iterator(it) } type xorAppender struct { @@ -243,6 +249,21 @@ func (it *xorIterator) Err() error { return it.err } +func (it *xorIterator) Reset(b []byte) { + // The first 2 bytes contain chunk headers. + // We skip that for actual samples. + it.br = newBReader(b[2:]) + it.numTotal = binary.BigEndian.Uint16(b) + + it.numRead = 0 + it.t = 0 + it.val = 0 + it.leading = 0 + it.trailing = 0 + it.tDelta = 0 + it.err = nil +} + func (it *xorIterator) Next() bool { if it.err != nil || it.numRead == it.numTotal { return false diff --git a/vendor/github.com/prometheus/tsdb/chunks/chunks.go b/vendor/github.com/prometheus/tsdb/chunks/chunks.go index 9ce8c57da..bd7b9e765 100644 --- a/vendor/github.com/prometheus/tsdb/chunks/chunks.go +++ b/vendor/github.com/prometheus/tsdb/chunks/chunks.go @@ -57,8 +57,9 @@ type Meta struct { } // writeHash writes the chunk encoding and raw data into the provided hash. -func (cm *Meta) writeHash(h hash.Hash) error { - if _, err := h.Write([]byte{byte(cm.Chunk.Encoding())}); err != nil { +func (cm *Meta) writeHash(h hash.Hash, buf []byte) error { + buf = append(buf[:0], byte(cm.Chunk.Encoding())) + if _, err := h.Write(buf[:1]); err != nil { return err } if _, err := h.Write(cm.Chunk.Bytes()); err != nil { @@ -97,6 +98,7 @@ type Writer struct { wbuf *bufio.Writer n int64 crc32 hash.Hash + buf [binary.MaxVarintLen32]byte segmentSize int64 } @@ -245,8 +247,8 @@ func MergeChunks(a, b chunkenc.Chunk) (*chunkenc.XORChunk, error) { if err != nil { return nil, err } - ait := a.Iterator() - bit := b.Iterator() + ait := a.Iterator(nil) + bit := b.Iterator(nil) aok, bok := ait.Next(), bit.Next() for aok && bok { at, av := ait.At() @@ -299,22 +301,19 @@ func (w *Writer) WriteChunks(chks ...Meta) error { } } - var ( - b = [binary.MaxVarintLen32]byte{} - seq = uint64(w.seq()) << 32 - ) + var seq = uint64(w.seq()) << 32 for i := range chks { chk := &chks[i] chk.Ref = seq | uint64(w.n) - n := binary.PutUvarint(b[:], uint64(len(chk.Chunk.Bytes()))) + n := binary.PutUvarint(w.buf[:], uint64(len(chk.Chunk.Bytes()))) - if err := w.write(b[:n]); err != nil { + if err := w.write(w.buf[:n]); err != nil { return err } - b[0] = byte(chk.Chunk.Encoding()) - if err := w.write(b[:1]); err != nil { + w.buf[0] = byte(chk.Chunk.Encoding()) + if err := w.write(w.buf[:1]); err != nil { return err } if err := w.write(chk.Chunk.Bytes()); err != nil { @@ -322,10 +321,10 @@ func (w *Writer) WriteChunks(chks ...Meta) error { } w.crc32.Reset() - if err := chk.writeHash(w.crc32); err != nil { + if err := chk.writeHash(w.crc32, w.buf[:]); err != nil { return err } - if err := w.write(w.crc32.Sum(b[:0])); err != nil { + if err := w.write(w.crc32.Sum(w.buf[:0])); err != nil { return err } } @@ -366,7 +365,7 @@ func (b realByteSlice) Sub(start, end int) ByteSlice { return b[start:end] } -// Reader implements a SeriesReader for a serialized byte stream +// Reader implements a ChunkReader for a serialized byte stream // of series data. type Reader struct { bs []ByteSlice // The underlying bytes holding the encoded series data. 
@@ -503,11 +502,11 @@ func sequenceFiles(dir string) ([]string, error) { return res, nil } -func closeAll(cs []io.Closer) (err error) { +func closeAll(cs []io.Closer) error { + var merr tsdb_errors.MultiError + for _, c := range cs { - if e := c.Close(); e != nil { - err = e - } + merr.Add(c.Close()) } - return err + return merr.Err() } diff --git a/vendor/github.com/prometheus/tsdb/compact.go b/vendor/github.com/prometheus/tsdb/compact.go index e19b7ed76..9443c99e1 100644 --- a/vendor/github.com/prometheus/tsdb/compact.go +++ b/vendor/github.com/prometheus/tsdb/compact.go @@ -662,7 +662,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, }() c.metrics.populatingBlocks.Set(1) - globalMaxt := blocks[0].MaxTime() + globalMaxt := blocks[0].Meta().MaxTime for i, b := range blocks { select { case <-c.ctx.Done(): @@ -671,13 +671,13 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } if !overlapping { - if i > 0 && b.MinTime() < globalMaxt { + if i > 0 && b.Meta().MinTime < globalMaxt { c.metrics.overlappingBlocks.Inc() overlapping = true level.Warn(c.logger).Log("msg", "found overlapping blocks during compaction", "ulid", meta.ULID) } - if b.MaxTime() > globalMaxt { - globalMaxt = b.MaxTime() + if b.Meta().MaxTime > globalMaxt { + globalMaxt = b.Meta().MaxTime } } @@ -736,6 +736,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, return errors.Wrap(err, "add symbols") } + delIter := &deletedIterator{} for set.Next() { select { case <-c.ctx.Done(): @@ -788,17 +789,18 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, return err } - it := &deletedIterator{it: chk.Chunk.Iterator(), intervals: dranges} + delIter.it = chk.Chunk.Iterator(delIter.it) + delIter.intervals = dranges var ( t int64 v float64 ) - for it.Next() { - t, v = it.At() + for delIter.Next() { + t, v = delIter.At() app.Append(t, v) } - if err := it.Err(); err != nil { + if err := delIter.Err(); err != nil { return errors.Wrap(err, "iterate chunk while re-encoding") } diff --git a/vendor/github.com/prometheus/tsdb/db.go b/vendor/github.com/prometheus/tsdb/db.go index e07f7d3e7..aa9ec1785 100644 --- a/vendor/github.com/prometheus/tsdb/db.go +++ b/vendor/github.com/prometheus/tsdb/db.go @@ -250,6 +250,178 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { return m } +// ErrClosed is returned when the db is closed. +var ErrClosed = errors.New("db already closed") + +// DBReadOnly provides APIs for read only operations on a database. +// Current implementation doesn't support concurency so +// all API calls should happen in the same go routine. +type DBReadOnly struct { + logger log.Logger + dir string + closers []io.Closer + closed chan struct{} +} + +// OpenDBReadOnly opens DB in the given directory for read only operations. +func OpenDBReadOnly(dir string, l log.Logger) (*DBReadOnly, error) { + if _, err := os.Stat(dir); err != nil { + return nil, errors.Wrap(err, "openning the db dir") + } + + if l == nil { + l = log.NewNopLogger() + } + + return &DBReadOnly{ + logger: l, + dir: dir, + closed: make(chan struct{}), + }, nil +} + +// Querier loads the wal and returns a new querier over the data partition for the given time range. +// Current implementation doesn't support multiple Queriers. 
+func (db *DBReadOnly) Querier(mint, maxt int64) (Querier, error) { + select { + case <-db.closed: + return nil, ErrClosed + default: + } + blocksReaders, err := db.Blocks() + if err != nil { + return nil, err + } + blocks := make([]*Block, len(blocksReaders)) + for i, b := range blocksReaders { + b, ok := b.(*Block) + if !ok { + return nil, errors.New("unable to convert a read only block to a normal block") + } + blocks[i] = b + } + + head, err := NewHead(nil, db.logger, nil, 1) + if err != nil { + return nil, err + } + maxBlockTime := int64(math.MinInt64) + if len(blocks) > 0 { + maxBlockTime = blocks[len(blocks)-1].Meta().MaxTime + } + + // Also add the WAL if the current blocks don't cover the requestes time range. + if maxBlockTime <= maxt { + w, err := wal.Open(db.logger, nil, filepath.Join(db.dir, "wal")) + if err != nil { + return nil, err + } + head, err = NewHead(nil, db.logger, w, 1) + if err != nil { + return nil, err + } + // Set the min valid time for the ingested wal samples + // to be no lower than the maxt of the last block. + if err := head.Init(maxBlockTime); err != nil { + return nil, errors.Wrap(err, "read WAL") + } + // Set the wal to nil to disable all wal operations. + // This is mainly to avoid blocking when closing the head. + head.wal = nil + + db.closers = append(db.closers, head) + } + + // TODO: Refactor so that it is possible to obtain a Querier without initializing a writable DB instance. + // Option 1: refactor DB to have the Querier implementation using the DBReadOnly.Querier implementation not the opposite. + // Option 2: refactor Querier to use another independent func which + // can than be used by a read only and writable db instances without any code duplication. + dbWritable := &DB{ + dir: db.dir, + logger: db.logger, + blocks: blocks, + head: head, + } + + return dbWritable.Querier(mint, maxt) +} + +// Blocks returns a slice of block readers for persisted blocks. +func (db *DBReadOnly) Blocks() ([]BlockReader, error) { + select { + case <-db.closed: + return nil, ErrClosed + default: + } + loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil) + if err != nil { + return nil, err + } + + // Corrupted blocks that have been superseded by a loadable block can be safely ignored. + for _, block := range loadable { + for _, b := range block.Meta().Compaction.Parents { + delete(corrupted, b.ULID) + } + } + if len(corrupted) > 0 { + for _, b := range loadable { + if err := b.Close(); err != nil { + level.Warn(db.logger).Log("msg", "closing a block", err) + } + } + return nil, errors.Errorf("unexpected corrupted block:%v", corrupted) + } + + if len(loadable) == 0 { + return nil, errors.New("no blocks found") + } + + sort.Slice(loadable, func(i, j int) bool { + return loadable[i].Meta().MinTime < loadable[j].Meta().MinTime + }) + + blockMetas := make([]BlockMeta, 0, len(loadable)) + for _, b := range loadable { + blockMetas = append(blockMetas, b.Meta()) + } + if overlaps := OverlappingBlocks(blockMetas); len(overlaps) > 0 { + level.Warn(db.logger).Log("msg", "overlapping blocks found during opening", "detail", overlaps.String()) + } + + // Close all previously open readers and add the new ones to the cache. + for _, closer := range db.closers { + closer.Close() + } + + blockClosers := make([]io.Closer, len(loadable)) + blockReaders := make([]BlockReader, len(loadable)) + for i, b := range loadable { + blockClosers[i] = b + blockReaders[i] = b + } + db.closers = blockClosers + + return blockReaders, nil +} + +// Close all block readers. 
+func (db *DBReadOnly) Close() error { + select { + case <-db.closed: + return ErrClosed + default: + } + close(db.closed) + + var merr tsdb_errors.MultiError + + for _, b := range db.closers { + merr.Add(b.Close()) + } + return merr.Err() +} + // Open returns a new DB in the given directory. func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db *DB, err error) { if err := os.MkdirAll(dir, 0777); err != nil { @@ -514,8 +686,10 @@ func (db *DB) compact() (err error) { return nil } -func (db *DB) getBlock(id ulid.ULID) (*Block, bool) { - for _, b := range db.blocks { +// getBlock iterates a given block range to find a block by a given id. +// If found it returns the block itself and a boolean to indicate that it was found. +func getBlock(allBlocks []*Block, id ulid.ULID) (*Block, bool) { + for _, b := range allBlocks { if b.Meta().ULID == id { return b, true } @@ -533,14 +707,14 @@ func (db *DB) reload() (err error) { db.metrics.reloads.Inc() }() - loadable, corrupted, err := db.openBlocks() + loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool) if err != nil { return err } deletable := db.deletableBlocks(loadable) - // Corrupted blocks that have been replaced by parents can be safely ignored and deleted. + // Corrupted blocks that have been superseded by a loadable block can be safely ignored. // This makes it resilient against the process crashing towards the end of a compaction. // Creation of a new block and deletion of its parents cannot happen atomically. // By creating blocks with their parents, we can pick up the deletion where it left off during a crash. @@ -553,7 +727,7 @@ func (db *DB) reload() (err error) { if len(corrupted) > 0 { // Close all new blocks to release the lock for windows. for _, block := range loadable { - if _, loaded := db.getBlock(block.Meta().ULID); !loaded { + if _, open := getBlock(db.blocks, block.Meta().ULID); !open { block.Close() } } @@ -621,24 +795,24 @@ func (db *DB) reload() (err error) { return errors.Wrap(db.head.Truncate(maxt), "head truncate failed") } -func (db *DB) openBlocks() (blocks []*Block, corrupted map[ulid.ULID]error, err error) { - dirs, err := blockDirs(db.dir) +func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool) (blocks []*Block, corrupted map[ulid.ULID]error, err error) { + bDirs, err := blockDirs(dir) if err != nil { return nil, nil, errors.Wrap(err, "find blocks") } corrupted = make(map[ulid.ULID]error) - for _, dir := range dirs { - meta, _, err := readMetaFile(dir) + for _, bDir := range bDirs { + meta, _, err := readMetaFile(bDir) if err != nil { - level.Error(db.logger).Log("msg", "not a block dir", "dir", dir) + level.Error(l).Log("msg", "not a block dir", "dir", bDir) continue } // See if we already have the block in memory or open it otherwise. 
- block, ok := db.getBlock(meta.ULID) - if !ok { - block, err = OpenBlock(db.logger, dir, db.chunkPool) + block, open := getBlock(loaded, meta.ULID) + if !open { + block, err = OpenBlock(l, bDir, chunkPool) if err != nil { corrupted[meta.ULID] = err continue diff --git a/vendor/github.com/prometheus/tsdb/head.go b/vendor/github.com/prometheus/tsdb/head.go index 5e2eae858..0adb8847a 100644 --- a/vendor/github.com/prometheus/tsdb/head.go +++ b/vendor/github.com/prometheus/tsdb/head.go @@ -25,6 +25,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/tsdb/chunkenc" @@ -64,6 +65,7 @@ type Head struct { logger log.Logger appendPool sync.Pool bytesPool sync.Pool + numSeries uint64 minTime, maxTime int64 // Current min and max of the samples included in the head. minValidTime int64 // Mint allowed to be added to the head. It shouldn't be lower than the maxt of the last persisted block. @@ -84,7 +86,7 @@ type Head struct { type headMetrics struct { activeAppenders prometheus.Gauge - series prometheus.Gauge + series prometheus.GaugeFunc seriesCreated prometheus.Counter seriesRemoved prometheus.Counter seriesNotFound prometheus.Counter @@ -112,9 +114,11 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { Name: "prometheus_tsdb_head_active_appenders", Help: "Number of currently active appender transactions", }) - m.series = prometheus.NewGauge(prometheus.GaugeOpts{ + m.series = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Name: "prometheus_tsdb_head_series", Help: "Total number of series in the head block.", + }, func() float64 { + return float64(h.NumSeries()) }) m.seriesCreated = prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_head_series_created_total", @@ -502,6 +506,7 @@ func (h *Head) Init(minValidTime int64) error { return nil } + level.Info(h.logger).Log("msg", "replaying WAL, this may take awhile") // Backfill the checkpoint first if it exists. dir, startFrom, err := LastCheckpoint(h.wal.Dir()) if err != nil && err != ErrNotFound { @@ -525,6 +530,7 @@ func (h *Head) Init(minValidTime int64) error { return errors.Wrap(err, "backfill checkpoint") } startFrom++ + level.Info(h.logger).Log("msg", "WAL checkpoint loaded") } // Find the last segment. @@ -548,6 +554,7 @@ func (h *Head) Init(minValidTime int64) error { if err != nil { return err } + level.Info(h.logger).Log("msg", "WAL segment loaded", "segment", i, "maxSegment", last) } return nil @@ -698,6 +705,21 @@ func (h *rangeHead) MaxTime() int64 { return h.maxt } +func (h *rangeHead) NumSeries() uint64 { + return h.head.NumSeries() +} + +func (h *rangeHead) Meta() BlockMeta { + return BlockMeta{ + MinTime: h.MinTime(), + MaxTime: h.MaxTime(), + ULID: h.head.Meta().ULID, + Stats: BlockStats{ + NumSeries: h.NumSeries(), + }, + } +} + // initAppender is a helper to initialize the time bounds of the head // upon the first sample it receives. type initAppender struct { @@ -1022,9 +1044,11 @@ func (h *Head) gc() { seriesRemoved := len(deleted) h.metrics.seriesRemoved.Add(float64(seriesRemoved)) - h.metrics.series.Sub(float64(seriesRemoved)) h.metrics.chunksRemoved.Add(float64(chunksRemoved)) h.metrics.chunks.Sub(float64(chunksRemoved)) + // Using AddUint64 to substract series removed. + // See: https://golang.org/pkg/sync/atomic/#AddUint64. + atomic.AddUint64(&h.numSeries, ^uint64(seriesRemoved-1)) // Remove deleted series IDs from the postings lists. 
h.postings.Delete(deleted) @@ -1101,6 +1125,26 @@ func (h *Head) chunksRange(mint, maxt int64) *headChunkReader { return &headChunkReader{head: h, mint: mint, maxt: maxt} } +// NumSeries returns the number of active series in the head. +func (h *Head) NumSeries() uint64 { + return atomic.LoadUint64(&h.numSeries) +} + +// Meta returns meta information about the head. +// The head is dynamic so will return dynamic results. +func (h *Head) Meta() BlockMeta { + var id [16]byte + copy(id[:], "______head______") + return BlockMeta{ + MinTime: h.MinTime(), + MaxTime: h.MaxTime(), + ULID: ulid.ULID(id), + Stats: BlockStats{ + NumSeries: h.NumSeries(), + }, + } +} + // MinTime returns the lowest time bound on visible data in the head. func (h *Head) MinTime() int64 { return atomic.LoadInt64(&h.minTime) @@ -1185,9 +1229,9 @@ type safeChunk struct { cid int } -func (c *safeChunk) Iterator() chunkenc.Iterator { +func (c *safeChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator { c.s.Lock() - it := c.s.iterator(c.cid) + it := c.s.iterator(c.cid, reuseIter) c.s.Unlock() return it } @@ -1347,8 +1391,8 @@ func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSerie return s, false } - h.metrics.series.Inc() h.metrics.seriesCreated.Inc() + atomic.AddUint64(&h.numSeries, 1) h.postings.Add(id, lset) @@ -1739,7 +1783,7 @@ func computeChunkEndTime(start, cur, max int64) int64 { return start + (max-start)/a } -func (s *memSeries) iterator(id int) chunkenc.Iterator { +func (s *memSeries) iterator(id int, it chunkenc.Iterator) chunkenc.Iterator { c := s.chunk(id) // TODO(fabxc): Work around! A querier may have retrieved a pointer to a series' chunk, // which got then garbage collected before it got accessed. @@ -1749,17 +1793,23 @@ func (s *memSeries) iterator(id int) chunkenc.Iterator { } if id-s.firstChunkID < len(s.chunks)-1 { - return c.chunk.Iterator() + return c.chunk.Iterator(it) } // Serve the last 4 samples for the last chunk from the sample buffer // as their compressed bytes may be mutated by added samples. - it := &memSafeIterator{ - Iterator: c.chunk.Iterator(), + if msIter, ok := it.(*memSafeIterator); ok { + msIter.Iterator = c.chunk.Iterator(msIter.Iterator) + msIter.i = -1 + msIter.total = c.chunk.NumSamples() + msIter.buf = s.sampleBuf + return msIter + } + return &memSafeIterator{ + Iterator: c.chunk.Iterator(it), i: -1, total: c.chunk.NumSamples(), buf: s.sampleBuf, } - return it } func (s *memSeries) head() *memChunk { diff --git a/vendor/github.com/prometheus/tsdb/index/index.go b/vendor/github.com/prometheus/tsdb/index/index.go index 6b333fa58..1a1e9bf30 100644 --- a/vendor/github.com/prometheus/tsdb/index/index.go +++ b/vendor/github.com/prometheus/tsdb/index/index.go @@ -123,10 +123,10 @@ type Writer struct { buf2 encoding.Encbuf uint32s []uint32 - symbols map[string]uint32 // symbol offsets - seriesOffsets map[uint64]uint64 // offsets of series - labelIndexes []hashEntry // label index offsets - postings []hashEntry // postings lists offsets + symbols map[string]uint32 // symbol offsets + seriesOffsets map[uint64]uint64 // offsets of series + labelIndexes []labelIndexHashEntry // label index offsets + postings []postingsHashEntry // postings lists offsets // Hold last series to validate that clients insert new series in order. 
lastSeries labels.Labels @@ -271,11 +271,11 @@ func (w *Writer) ensureStage(s indexWriterStage) error { case idxStageDone: w.toc.LabelIndicesTable = w.pos - if err := w.writeOffsetTable(w.labelIndexes); err != nil { + if err := w.writeLabelIndexesOffsetTable(); err != nil { return err } w.toc.PostingsTable = w.pos - if err := w.writeOffsetTable(w.postings); err != nil { + if err := w.writePostingsOffsetTable(); err != nil { return err } if err := w.writeTOC(); err != nil { @@ -420,7 +420,7 @@ func (w *Writer) WriteLabelIndex(names []string, values []string) error { return err } - w.labelIndexes = append(w.labelIndexes, hashEntry{ + w.labelIndexes = append(w.labelIndexes, labelIndexHashEntry{ keys: names, offset: w.pos, }) @@ -447,12 +447,12 @@ func (w *Writer) WriteLabelIndex(names []string, values []string) error { return errors.Wrap(err, "write label index") } -// writeOffsetTable writes a sequence of readable hash entries. -func (w *Writer) writeOffsetTable(entries []hashEntry) error { +// writeLabelIndexesOffsetTable writes the label indices offset table. +func (w *Writer) writeLabelIndexesOffsetTable() error { w.buf2.Reset() - w.buf2.PutBE32int(len(entries)) + w.buf2.PutBE32int(len(w.labelIndexes)) - for _, e := range entries { + for _, e := range w.labelIndexes { w.buf2.PutUvarint(len(e.keys)) for _, k := range e.keys { w.buf2.PutUvarintStr(k) @@ -467,6 +467,25 @@ func (w *Writer) writeOffsetTable(entries []hashEntry) error { return w.write(w.buf1.Get(), w.buf2.Get()) } +// writePostingsOffsetTable writes the postings offset table. +func (w *Writer) writePostingsOffsetTable() error { + w.buf2.Reset() + w.buf2.PutBE32int(len(w.postings)) + + for _, e := range w.postings { + w.buf2.PutUvarint(2) + w.buf2.PutUvarintStr(e.name) + w.buf2.PutUvarintStr(e.value) + w.buf2.PutUvarint64(e.offset) + } + + w.buf1.Reset() + w.buf1.PutBE32int(w.buf2.Len()) + w.buf2.PutHash(w.crc32) + + return w.write(w.buf1.Get(), w.buf2.Get()) +} + const indexTOCLen = 6*8 + 4 func (w *Writer) writeTOC() error { @@ -494,8 +513,9 @@ func (w *Writer) WritePostings(name, value string, it Postings) error { return err } - w.postings = append(w.postings, hashEntry{ - keys: []string{name, value}, + w.postings = append(w.postings, postingsHashEntry{ + name: name, + value: value, offset: w.pos, }) @@ -542,11 +562,16 @@ func (s uint32slice) Len() int { return len(s) } func (s uint32slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } func (s uint32slice) Less(i, j int) bool { return s[i] < s[j] } -type hashEntry struct { +type labelIndexHashEntry struct { keys []string offset uint64 } +type postingsHashEntry struct { + name, value string + offset uint64 +} + func (w *Writer) Close() error { if err := w.ensureStage(idxStageDone); err != nil { return err @@ -781,9 +806,13 @@ func ReadOffsetTable(bs ByteSlice, off uint64, f func([]string, uint64) error) e d := encoding.NewDecbufAt(bs, int(off), castagnoliTable) cnt := d.Be32() + // The Postings offset table takes only 2 keys per entry (name and value of label), + // and the LabelIndices offset table takes only 1 key per entry (a label name). + // Hence setting the size to max of both, i.e. 2. 
+ keys := make([]string, 0, 2) for d.Err() == nil && d.Len() > 0 && cnt > 0 { keyCount := d.Uvarint() - keys := make([]string, 0, keyCount) + keys = keys[:0] for i := 0; i < keyCount; i++ { keys = append(keys, d.UvarintStr()) @@ -951,25 +980,30 @@ func (r *Reader) LabelNames() ([]string, error) { type stringTuples struct { length int // tuple length entries []string // flattened tuple entries + swapBuf []string } func NewStringTuples(entries []string, length int) (*stringTuples, error) { if len(entries)%length != 0 { return nil, errors.Wrap(encoding.ErrInvalidSize, "string tuple list") } - return &stringTuples{entries: entries, length: length}, nil + return &stringTuples{ + entries: entries, + length: length, + }, nil } func (t *stringTuples) Len() int { return len(t.entries) / t.length } func (t *stringTuples) At(i int) ([]string, error) { return t.entries[i : i+t.length], nil } func (t *stringTuples) Swap(i, j int) { - c := make([]string, t.length) - copy(c, t.entries[i:i+t.length]) - + if t.swapBuf == nil { + t.swapBuf = make([]string, t.length) + } + copy(t.swapBuf, t.entries[i:i+t.length]) for k := 0; k < t.length; k++ { t.entries[i+k] = t.entries[j+k] - t.entries[j+k] = c[k] + t.entries[j+k] = t.swapBuf[k] } } diff --git a/vendor/github.com/prometheus/tsdb/querier.go b/vendor/github.com/prometheus/tsdb/querier.go index 253102b0e..fbd9493f4 100644 --- a/vendor/github.com/prometheus/tsdb/querier.go +++ b/vendor/github.com/prometheus/tsdb/querier.go @@ -1060,8 +1060,9 @@ func (it *verticalMergeSeriesIterator) Err() error { type chunkSeriesIterator struct { chunks []chunks.Meta - i int - cur chunkenc.Iterator + i int + cur chunkenc.Iterator + bufDelIter *deletedIterator maxt, mint int64 @@ -1069,21 +1070,32 @@ type chunkSeriesIterator struct { } func newChunkSeriesIterator(cs []chunks.Meta, dranges Intervals, mint, maxt int64) *chunkSeriesIterator { - it := cs[0].Chunk.Iterator() - - if len(dranges) > 0 { - it = &deletedIterator{it: it, intervals: dranges} - } - return &chunkSeriesIterator{ + csi := &chunkSeriesIterator{ chunks: cs, i: 0, - cur: it, mint: mint, maxt: maxt, intervals: dranges, } + csi.resetCurIterator() + + return csi +} + +func (it *chunkSeriesIterator) resetCurIterator() { + if len(it.intervals) == 0 { + it.cur = it.chunks[it.i].Chunk.Iterator(it.cur) + return + } + if it.bufDelIter == nil { + it.bufDelIter = &deletedIterator{ + intervals: it.intervals, + } + } + it.bufDelIter.it = it.chunks[it.i].Chunk.Iterator(it.bufDelIter.it) + it.cur = it.bufDelIter } func (it *chunkSeriesIterator) Seek(t int64) (ok bool) { @@ -1102,10 +1114,7 @@ func (it *chunkSeriesIterator) Seek(t int64) (ok bool) { } } - it.cur = it.chunks[it.i].Chunk.Iterator() - if len(it.intervals) > 0 { - it.cur = &deletedIterator{it: it.cur, intervals: it.intervals} - } + it.resetCurIterator() for it.cur.Next() { t0, _ := it.cur.At() @@ -1145,10 +1154,7 @@ func (it *chunkSeriesIterator) Next() bool { } it.i++ - it.cur = it.chunks[it.i].Chunk.Iterator() - if len(it.intervals) > 0 { - it.cur = &deletedIterator{it: it.cur, intervals: it.intervals} - } + it.resetCurIterator() return it.Next() } diff --git a/vendor/github.com/prometheus/tsdb/wal/wal.go b/vendor/github.com/prometheus/tsdb/wal/wal.go index 39daba975..878aae6ba 100644 --- a/vendor/github.com/prometheus/tsdb/wal/wal.go +++ b/vendor/github.com/prometheus/tsdb/wal/wal.go @@ -203,6 +203,48 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi stopc: make(chan chan struct{}), compress: compress, } + registerMetrics(reg, w) 
+ + _, j, err := w.Segments() + // Index of the Segment we want to open and write to. + writeSegmentIndex := 0 + if err != nil { + return nil, errors.Wrap(err, "get segment range") + } + // If some segments already exist create one with a higher index than the last segment. + if j != -1 { + writeSegmentIndex = j + 1 + } + + segment, err := CreateSegment(w.dir, writeSegmentIndex) + if err != nil { + return nil, err + } + + if err := w.setSegment(segment); err != nil { + return nil, err + } + + go w.run() + + return w, nil +} + +// Open an existing WAL. +func Open(logger log.Logger, reg prometheus.Registerer, dir string) (*WAL, error) { + if logger == nil { + logger = log.NewNopLogger() + } + w := &WAL{ + dir: dir, + logger: logger, + } + + registerMetrics(reg, w) + return w, nil +} + +func registerMetrics(reg prometheus.Registerer, w *WAL) { w.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{ Name: "prometheus_tsdb_wal_fsync_duration_seconds", Help: "Duration of WAL fsync.", @@ -231,30 +273,6 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi if reg != nil { reg.MustRegister(w.fsyncDuration, w.pageFlushes, w.pageCompletions, w.truncateFail, w.truncateTotal, w.currentSegment) } - - _, j, err := w.Segments() - // Index of the Segment we want to open and write to. - writeSegmentIndex := 0 - if err != nil { - return nil, errors.Wrap(err, "get segment range") - } - // If some segments already exist create one with a higher index than the last segment. - if j != -1 { - writeSegmentIndex = j + 1 - } - - segment, err := CreateSegment(w.dir, writeSegmentIndex) - if err != nil { - return nil, err - } - - if err := w.setSegment(segment); err != nil { - return nil, err - } - - go w.run() - - return w, nil } // CompressionEnabled returns if compression is enabled on this WAL. @@ -302,7 +320,6 @@ func (w *WAL) Repair(origErr error) error { if cerr.Segment < 0 { return errors.New("corruption error does not specify position") } - level.Warn(w.logger).Log("msg", "starting corruption repair", "segment", cerr.Segment, "offset", cerr.Offset) @@ -487,7 +504,6 @@ func (w *WAL) flushPage(clear bool) error { // First Byte of header format: // [ 4 bits unallocated] [1 bit snappy compression flag] [ 3 bit record type ] - const ( snappyMask = 1 << 3 recTypeMask = snappyMask - 1 diff --git a/vendor/modules.txt b/vendor/modules.txt index 37089dabb..6ddd815ae 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -277,7 +277,7 @@ github.com/prometheus/common/internal/bitbucket.org/ww/goautoneg # github.com/prometheus/procfs v0.0.2 github.com/prometheus/procfs github.com/prometheus/procfs/internal/fs -# github.com/prometheus/tsdb v0.9.1 +# github.com/prometheus/tsdb v0.10.0 github.com/prometheus/tsdb github.com/prometheus/tsdb/fileutil github.com/prometheus/tsdb/labels
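
For context, here is a minimal sketch (not part of the vendored patch) of the two tsdb 0.10.0 changes this vendoring pulls in: the read-only database API (`OpenDBReadOnly`, `DBReadOnly.Blocks`, `DBReadOnly.Querier`) with `BlockReader.Meta()` replacing `MinTime()`/`MaxTime()`, and the reusable chunk iterators (`chunkenc.Chunk.Iterator` now takes an `Iterator` argument for reuse). The directory path, time range, and label matcher below are illustrative assumptions, not values taken from the patch.

// Hedged usage sketch for the vendored tsdb 0.10.0 APIs; assumes an existing
// TSDB directory at "data/" and a series labelled job="demo".
package main

import (
	"fmt"
	"math"

	"github.com/go-kit/kit/log"
	"github.com/prometheus/tsdb"
	"github.com/prometheus/tsdb/chunkenc"
	"github.com/prometheus/tsdb/labels"
)

func main() {
	// Open an existing TSDB directory for read-only operations.
	db, err := tsdb.OpenDBReadOnly("data/", log.NewNopLogger())
	if err != nil {
		panic(err)
	}
	defer db.Close() // closes all block readers opened by Blocks()/Querier()

	// Blocks() returns BlockReaders; block metadata is now exposed via Meta()
	// instead of the removed MinTime()/MaxTime() methods.
	blocks, err := db.Blocks()
	if err != nil {
		panic(err)
	}
	for _, b := range blocks {
		m := b.Meta()
		fmt.Println("block", m.ULID, m.MinTime, m.MaxTime, m.Stats.NumSeries)
	}

	// Querier also replays the WAL when the persisted blocks do not cover maxt.
	q, err := db.Querier(math.MinInt64, math.MaxInt64)
	if err != nil {
		panic(err)
	}
	defer q.Close()

	ss, err := q.Select(labels.NewEqualMatcher("job", "demo")) // assumed label
	if err != nil {
		panic(err)
	}
	for ss.Next() {
		s := ss.At()
		it := s.Iterator()
		for it.Next() {
			t, v := it.At()
			fmt.Println(s.Labels(), t, v)
		}
	}

	// Iterator reuse at the chunk level: passing the previous iterator back in
	// lets the implementation reset and reuse it instead of allocating anew.
	c := chunkenc.NewXORChunk()
	app, _ := c.Appender()
	for i := int64(0); i < 10; i++ {
		app.Append(i, float64(i))
	}
	var reuse chunkenc.Iterator
	for pass := 0; pass < 2; pass++ {
		reuse = c.Iterator(reuse) // nil on the first pass, reused afterwards
		for reuse.Next() {
			reuse.At()
		}
	}
}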