mirror of https://github.com/prometheus/prometheus
vendor: update prometheus/tsdb
parent
84eca7dfb2
commit
10b2e8c637
|
@ -133,7 +133,7 @@ func writeMetaFile(dir string, meta *BlockMeta) error {
|
||||||
return renameFile(tmp, path)
|
return renameFile(tmp, path)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Block represents a directory of time series data covering a continous time range.
|
// Block represents a directory of time series data covering a continuous time range.
|
||||||
type Block struct {
|
type Block struct {
|
||||||
mtx sync.RWMutex
|
mtx sync.RWMutex
|
||||||
closing bool
|
closing bool
|
||||||
|
@ -142,10 +142,9 @@ type Block struct {
|
||||||
dir string
|
dir string
|
||||||
meta BlockMeta
|
meta BlockMeta
|
||||||
|
|
||||||
chunkr *chunkReader
|
chunkr ChunkReader
|
||||||
indexr *indexReader
|
indexr IndexReader
|
||||||
|
tombstones TombstoneReader
|
||||||
tombstones tombstoneReader
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
|
// OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
|
||||||
|
@ -156,11 +155,11 @@ func OpenBlock(dir string, pool chunks.Pool) (*Block, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
cr, err := newChunkReader(chunkDir(dir), pool)
|
cr, err := NewDirChunkReader(chunkDir(dir), pool)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
ir, err := newIndexReader(dir)
|
ir, err := NewFileIndexReader(filepath.Join(dir, "index"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -284,13 +283,15 @@ func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error {
|
||||||
return ErrClosing
|
return ErrClosing
|
||||||
}
|
}
|
||||||
|
|
||||||
pr := newPostingsReader(pb.indexr)
|
p, absent, err := PostingsForMatchers(pb.indexr, ms...)
|
||||||
p, absent := pr.Select(ms...)
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "select series")
|
||||||
|
}
|
||||||
|
|
||||||
ir := pb.indexr
|
ir := pb.indexr
|
||||||
|
|
||||||
// Choose only valid postings which have chunks in the time-range.
|
// Choose only valid postings which have chunks in the time-range.
|
||||||
stones := map[uint64]Intervals{}
|
stones := memTombstones{}
|
||||||
|
|
||||||
var lset labels.Labels
|
var lset labels.Labels
|
||||||
var chks []ChunkMeta
|
var chks []ChunkMeta
|
||||||
|
@ -322,16 +323,21 @@ Outer:
|
||||||
return p.Err()
|
return p.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Merge the current and new tombstones.
|
err = pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
|
||||||
for k, v := range stones {
|
for _, iv := range ivs {
|
||||||
pb.tombstones.add(k, v[0])
|
stones.add(id, iv)
|
||||||
|
pb.meta.Stats.NumTombstones++
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
pb.tombstones = stones
|
||||||
|
|
||||||
if err := writeTombstoneFile(pb.dir, pb.tombstones); err != nil {
|
if err := writeTombstoneFile(pb.dir, pb.tombstones); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
pb.meta.Stats.NumTombstones = uint64(len(pb.tombstones))
|
|
||||||
return writeMetaFile(pb.dir, &pb.meta)
|
return writeMetaFile(pb.dir, &pb.meta)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,12 @@
|
||||||
|
--- contention:
|
||||||
|
cycles/second=2494255279
|
||||||
|
80179315716 1 @ 0x10061bb 0x10e008c 0x10e3934 0x10dfd30 0x10e1468 0x10e0431 0x1328cdb 0x102e9fd 0x105cea1
|
||||||
|
80176248000 1 @ 0x1065c12 0x1313b9d 0x10dfd30 0x105cea1
|
||||||
|
37792267436 303368 @ 0x10061fb 0x131dc08 0x105cea1
|
||||||
|
21607828 1098 @ 0x10648fe 0x10650d7 0x1064fca 0x12e5a74 0x12e5df2 0x131d969 0x105cea1
|
||||||
|
1272473 118 @ 0x10648fe 0x1065232 0x10651c6 0x1064cb0 0x12e5bcc 0x131dc50 0x105cea1
|
||||||
|
851800 1 @ 0x10061bb 0x1313bc6 0x10dfd30 0x105cea1
|
||||||
|
818628 59 @ 0x10648fe 0x1065232 0x10651c6 0x1064ebf 0x12e5a74 0x12e5df2 0x131d969 0x105cea1
|
||||||
|
501203 2 @ 0x1005473 0x12e5ed4 0x131d969 0x105cea1
|
||||||
|
7738 1 @ 0x10648fe 0x1064d19 0x12e5bcc 0x131dc50 0x105cea1
|
||||||
|
3846 1 @ 0x1005473 0x10e373b 0x10dfd3a 0x10e1468 0x10e0431 0x1328cdb 0x102e9fd 0x105cea1
|
|
@ -298,7 +298,7 @@ type ChunkReader interface {
|
||||||
// of series data.
|
// of series data.
|
||||||
type chunkReader struct {
|
type chunkReader struct {
|
||||||
// The underlying bytes holding the encoded series data.
|
// The underlying bytes holding the encoded series data.
|
||||||
bs [][]byte
|
bs []ByteSlice
|
||||||
|
|
||||||
// Closers for resources behind the byte slices.
|
// Closers for resources behind the byte slices.
|
||||||
cs []io.Closer
|
cs []io.Closer
|
||||||
|
@ -306,8 +306,32 @@ type chunkReader struct {
|
||||||
pool chunks.Pool
|
pool chunks.Pool
|
||||||
}
|
}
|
||||||
|
|
||||||
// newChunkReader returns a new chunkReader based on mmaped files found in dir.
|
func newChunkReader(bs []ByteSlice, cs []io.Closer, pool chunks.Pool) (*chunkReader, error) {
|
||||||
func newChunkReader(dir string, pool chunks.Pool) (*chunkReader, error) {
|
cr := chunkReader{pool: pool, bs: bs, cs: cs}
|
||||||
|
|
||||||
|
for i, b := range cr.bs {
|
||||||
|
if b.Len() < 4 {
|
||||||
|
return nil, errors.Wrapf(errInvalidSize, "validate magic in segment %d", i)
|
||||||
|
}
|
||||||
|
// Verify magic number.
|
||||||
|
if m := binary.BigEndian.Uint32(b.Range(0, 4)); m != MagicChunks {
|
||||||
|
return nil, fmt.Errorf("invalid magic number %x", m)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return &cr, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewChunkReader returns a new chunk reader against the given byte slices.
|
||||||
|
func NewChunkReader(bs []ByteSlice, pool chunks.Pool) (ChunkReader, error) {
|
||||||
|
if pool == nil {
|
||||||
|
pool = chunks.NewPool()
|
||||||
|
}
|
||||||
|
return newChunkReader(bs, nil, pool)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewDirChunkReader returns a new ChunkReader against sequentially numbered files in the
|
||||||
|
// given directory.
|
||||||
|
func NewDirChunkReader(dir string, pool chunks.Pool) (ChunkReader, error) {
|
||||||
files, err := sequenceFiles(dir)
|
files, err := sequenceFiles(dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -315,27 +339,19 @@ func newChunkReader(dir string, pool chunks.Pool) (*chunkReader, error) {
|
||||||
if pool == nil {
|
if pool == nil {
|
||||||
pool = chunks.NewPool()
|
pool = chunks.NewPool()
|
||||||
}
|
}
|
||||||
cr := chunkReader{pool: pool}
|
|
||||||
|
var bs []ByteSlice
|
||||||
|
var cs []io.Closer
|
||||||
|
|
||||||
for _, fn := range files {
|
for _, fn := range files {
|
||||||
f, err := openMmapFile(fn)
|
f, err := openMmapFile(fn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrapf(err, "mmap files")
|
return nil, errors.Wrapf(err, "mmap files")
|
||||||
}
|
}
|
||||||
cr.cs = append(cr.cs, f)
|
cs = append(cs, f)
|
||||||
cr.bs = append(cr.bs, f.b)
|
bs = append(bs, realByteSlice(f.b))
|
||||||
}
|
}
|
||||||
|
return newChunkReader(bs, cs, pool)
|
||||||
for i, b := range cr.bs {
|
|
||||||
if len(b) < 4 {
|
|
||||||
return nil, errors.Wrapf(errInvalidSize, "validate magic in segment %d", i)
|
|
||||||
}
|
|
||||||
// Verify magic number.
|
|
||||||
if m := binary.BigEndian.Uint32(b[:4]); m != MagicChunks {
|
|
||||||
return nil, fmt.Errorf("invalid magic number %x", m)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return &cr, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *chunkReader) Close() error {
|
func (s *chunkReader) Close() error {
|
||||||
|
@ -352,16 +368,18 @@ func (s *chunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
|
||||||
}
|
}
|
||||||
b := s.bs[seq]
|
b := s.bs[seq]
|
||||||
|
|
||||||
if int(off) >= len(b) {
|
if int(off) >= b.Len() {
|
||||||
return nil, errors.Errorf("offset %d beyond data size %d", off, len(b))
|
return nil, errors.Errorf("offset %d beyond data size %d", off, b.Len())
|
||||||
}
|
}
|
||||||
b = b[off:]
|
// With the minimum chunk length this should never cause us reading
|
||||||
|
// over the end of the slice.
|
||||||
|
r := b.Range(off, off+binary.MaxVarintLen32)
|
||||||
|
|
||||||
l, n := binary.Uvarint(b)
|
l, n := binary.Uvarint(r)
|
||||||
if n < 0 {
|
if n < 0 {
|
||||||
return nil, fmt.Errorf("reading chunk length failed")
|
return nil, fmt.Errorf("reading chunk length failed")
|
||||||
}
|
}
|
||||||
b = b[n:]
|
r = b.Range(off+n, off+n+int(l))
|
||||||
|
|
||||||
return s.pool.Get(chunks.Encoding(b[0]), b[1:1+l])
|
return s.pool.Get(chunks.Encoding(r[0]), r[1:1+l])
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,8 +46,7 @@ package chunks
|
||||||
import (
|
import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"math"
|
"math"
|
||||||
|
"math/bits"
|
||||||
bits "github.com/dgryski/go-bits"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// XORChunk holds XOR encoded sample data.
|
// XORChunk holds XOR encoded sample data.
|
||||||
|
@ -197,8 +196,8 @@ func (a *xorAppender) writeVDelta(v float64) {
|
||||||
}
|
}
|
||||||
a.b.writeBit(one)
|
a.b.writeBit(one)
|
||||||
|
|
||||||
leading := uint8(bits.Clz(vDelta))
|
leading := uint8(bits.LeadingZeros64(vDelta))
|
||||||
trailing := uint8(bits.Ctz(vDelta))
|
trailing := uint8(bits.TrailingZeros64(vDelta))
|
||||||
|
|
||||||
// Clamp number of leading zeros to avoid overflow when encoding.
|
// Clamp number of leading zeros to avoid overflow when encoding.
|
||||||
if leading >= 32 {
|
if leading >= 32 {
|
||||||
|
|
|
@ -52,7 +52,7 @@ type Compactor interface {
|
||||||
Plan(dir string) ([]string, error)
|
Plan(dir string) ([]string, error)
|
||||||
|
|
||||||
// Write persists a Block into a directory.
|
// Write persists a Block into a directory.
|
||||||
Write(dest string, b BlockReader, mint, maxt int64) error
|
Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error)
|
||||||
|
|
||||||
// Compact runs compaction against the provided directories. Must
|
// Compact runs compaction against the provided directories. Must
|
||||||
// only be called concurrently with results of Plan().
|
// only be called concurrently with results of Plan().
|
||||||
|
@ -321,7 +321,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs ...string) (err error) {
|
||||||
return c.write(dest, compactBlockMetas(uid, metas...), blocks...)
|
return c.write(dest, compactBlockMetas(uid, metas...), blocks...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) error {
|
func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error) {
|
||||||
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
|
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||||
uid := ulid.MustNew(ulid.Now(), entropy)
|
uid := ulid.MustNew(ulid.Now(), entropy)
|
||||||
|
|
||||||
|
@ -333,7 +333,7 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) e
|
||||||
meta.Compaction.Level = 1
|
meta.Compaction.Level = 1
|
||||||
meta.Compaction.Sources = []ulid.ULID{uid}
|
meta.Compaction.Sources = []ulid.ULID{uid}
|
||||||
|
|
||||||
return c.write(dest, meta, b)
|
return uid, c.write(dest, meta, b)
|
||||||
}
|
}
|
||||||
|
|
||||||
// instrumentedChunkWriter is used for level 1 compactions to record statistics
|
// instrumentedChunkWriter is used for level 1 compactions to record statistics
|
||||||
|
@ -418,7 +418,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create an empty tombstones file.
|
// Create an empty tombstones file.
|
||||||
if err := writeTombstoneFile(tmp, newEmptyTombstoneReader()); err != nil {
|
if err := writeTombstoneFile(tmp, EmptyTombstoneReader()); err != nil {
|
||||||
return errors.Wrap(err, "write new tombstones file")
|
return errors.Wrap(err, "write new tombstones file")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -453,7 +453,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
|
||||||
// of the provided blocks. It returns meta information for the new block.
|
// of the provided blocks. It returns meta information for the new block.
|
||||||
func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) error {
|
func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) error {
|
||||||
var (
|
var (
|
||||||
set compactionSet
|
set ChunkSeriesSet
|
||||||
allSymbols = make(map[string]struct{}, 1<<16)
|
allSymbols = make(map[string]struct{}, 1<<16)
|
||||||
closers = []io.Closer{}
|
closers = []io.Closer{}
|
||||||
)
|
)
|
||||||
|
@ -597,18 +597,11 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type compactionSet interface {
|
|
||||||
Next() bool
|
|
||||||
At() (labels.Labels, []ChunkMeta, Intervals)
|
|
||||||
Err() error
|
|
||||||
}
|
|
||||||
|
|
||||||
type compactionSeriesSet struct {
|
type compactionSeriesSet struct {
|
||||||
p Postings
|
p Postings
|
||||||
index IndexReader
|
index IndexReader
|
||||||
chunks ChunkReader
|
chunks ChunkReader
|
||||||
tombstones TombstoneReader
|
tombstones TombstoneReader
|
||||||
series SeriesSet
|
|
||||||
|
|
||||||
l labels.Labels
|
l labels.Labels
|
||||||
c []ChunkMeta
|
c []ChunkMeta
|
||||||
|
@ -631,7 +624,11 @@ func (c *compactionSeriesSet) Next() bool {
|
||||||
}
|
}
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
c.intervals = c.tombstones.Get(c.p.At())
|
c.intervals, err = c.tombstones.Get(c.p.At())
|
||||||
|
if err != nil {
|
||||||
|
c.err = errors.Wrap(err, "get tombstones")
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
if err = c.index.Series(c.p.At(), &c.l, &c.c); err != nil {
|
if err = c.index.Series(c.p.At(), &c.l, &c.c); err != nil {
|
||||||
c.err = errors.Wrapf(err, "get series %d", c.p.At())
|
c.err = errors.Wrapf(err, "get series %d", c.p.At())
|
||||||
|
@ -675,7 +672,7 @@ func (c *compactionSeriesSet) At() (labels.Labels, []ChunkMeta, Intervals) {
|
||||||
}
|
}
|
||||||
|
|
||||||
type compactionMerger struct {
|
type compactionMerger struct {
|
||||||
a, b compactionSet
|
a, b ChunkSeriesSet
|
||||||
|
|
||||||
aok, bok bool
|
aok, bok bool
|
||||||
l labels.Labels
|
l labels.Labels
|
||||||
|
@ -688,7 +685,7 @@ type compactionSeries struct {
|
||||||
chunks []*ChunkMeta
|
chunks []*ChunkMeta
|
||||||
}
|
}
|
||||||
|
|
||||||
func newCompactionMerger(a, b compactionSet) (*compactionMerger, error) {
|
func newCompactionMerger(a, b ChunkSeriesSet) (*compactionMerger, error) {
|
||||||
c := &compactionMerger{
|
c := &compactionMerger{
|
||||||
a: a,
|
a: a,
|
||||||
b: b,
|
b: b,
|
||||||
|
|
|
@ -52,7 +52,7 @@ var DefaultOptions = &Options{
|
||||||
|
|
||||||
// Options of the DB storage.
|
// Options of the DB storage.
|
||||||
type Options struct {
|
type Options struct {
|
||||||
// The interval at which the write ahead log is flushed to disc.
|
// The interval at which the write ahead log is flushed to disk.
|
||||||
WALFlushInterval time.Duration
|
WALFlushInterval time.Duration
|
||||||
|
|
||||||
// Duration of persisted data to keep.
|
// Duration of persisted data to keep.
|
||||||
|
@ -101,7 +101,6 @@ type DB struct {
|
||||||
opts *Options
|
opts *Options
|
||||||
chunkPool chunks.Pool
|
chunkPool chunks.Pool
|
||||||
compactor Compactor
|
compactor Compactor
|
||||||
wal WAL
|
|
||||||
|
|
||||||
// Mutex for that must be held when modifying the general block layout.
|
// Mutex for that must be held when modifying the general block layout.
|
||||||
mtx sync.RWMutex
|
mtx sync.RWMutex
|
||||||
|
@ -142,7 +141,7 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
|
||||||
})
|
})
|
||||||
m.reloadsFailed = prometheus.NewCounter(prometheus.CounterOpts{
|
m.reloadsFailed = prometheus.NewCounter(prometheus.CounterOpts{
|
||||||
Name: "prometheus_tsdb_reloads_failures_total",
|
Name: "prometheus_tsdb_reloads_failures_total",
|
||||||
Help: "Number of times the database failed to reload black data from disk.",
|
Help: "Number of times the database failed to reload block data from disk.",
|
||||||
})
|
})
|
||||||
m.compactionsTriggered = prometheus.NewCounter(prometheus.CounterOpts{
|
m.compactionsTriggered = prometheus.NewCounter(prometheus.CounterOpts{
|
||||||
Name: "prometheus_tsdb_compactions_triggered_total",
|
Name: "prometheus_tsdb_compactions_triggered_total",
|
||||||
|
@ -278,16 +277,23 @@ func (db *DB) retentionCutoff() (bool, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
db.mtx.RLock()
|
db.mtx.RLock()
|
||||||
defer db.mtx.RUnlock()
|
blocks := db.blocks[:]
|
||||||
|
db.mtx.RUnlock()
|
||||||
|
|
||||||
if len(db.blocks) == 0 {
|
if len(blocks) == 0 {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
last := db.blocks[len(db.blocks)-1]
|
last := blocks[len(db.blocks)-1]
|
||||||
mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration)
|
|
||||||
|
|
||||||
return retentionCutoff(db.dir, mint)
|
mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration)
|
||||||
|
dirs, err := retentionCutoffDirs(db.dir, mint)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// This will close the dirs and then delete the dirs.
|
||||||
|
return len(dirs) > 0, db.reload(dirs...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Appender opens a new appender against the database.
|
// Appender opens a new appender against the database.
|
||||||
|
@ -345,7 +351,7 @@ func (db *DB) compact() (changes bool, err error) {
|
||||||
mint: mint,
|
mint: mint,
|
||||||
maxt: maxt,
|
maxt: maxt,
|
||||||
}
|
}
|
||||||
if err = db.compactor.Write(db.dir, head, mint, maxt); err != nil {
|
if _, err = db.compactor.Write(db.dir, head, mint, maxt); err != nil {
|
||||||
return changes, errors.Wrap(err, "persist head block")
|
return changes, errors.Wrap(err, "persist head block")
|
||||||
}
|
}
|
||||||
changes = true
|
changes = true
|
||||||
|
@ -389,40 +395,37 @@ func (db *DB) compact() (changes bool, err error) {
|
||||||
return changes, nil
|
return changes, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// retentionCutoff deletes all directories of blocks in dir that are strictly
|
// retentionCutoffDirs returns all directories of blocks in dir that are strictly
|
||||||
// before mint.
|
// before mint.
|
||||||
func retentionCutoff(dir string, mint int64) (bool, error) {
|
func retentionCutoffDirs(dir string, mint int64) ([]string, error) {
|
||||||
df, err := fileutil.OpenDir(dir)
|
df, err := fileutil.OpenDir(dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, errors.Wrapf(err, "open directory")
|
return nil, errors.Wrapf(err, "open directory")
|
||||||
}
|
}
|
||||||
defer df.Close()
|
defer df.Close()
|
||||||
|
|
||||||
dirs, err := blockDirs(dir)
|
dirs, err := blockDirs(dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, errors.Wrapf(err, "list block dirs %s", dir)
|
return nil, errors.Wrapf(err, "list block dirs %s", dir)
|
||||||
}
|
}
|
||||||
|
|
||||||
changes := false
|
delDirs := []string{}
|
||||||
|
|
||||||
for _, dir := range dirs {
|
for _, dir := range dirs {
|
||||||
meta, err := readMetaFile(dir)
|
meta, err := readMetaFile(dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return changes, errors.Wrapf(err, "read block meta %s", dir)
|
return nil, errors.Wrapf(err, "read block meta %s", dir)
|
||||||
}
|
}
|
||||||
// The first block we encounter marks that we crossed the boundary
|
// The first block we encounter marks that we crossed the boundary
|
||||||
// of deletable blocks.
|
// of deletable blocks.
|
||||||
if meta.MaxTime >= mint {
|
if meta.MaxTime >= mint {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
changes = true
|
|
||||||
|
|
||||||
if err := os.RemoveAll(dir); err != nil {
|
delDirs = append(delDirs, dir)
|
||||||
return changes, err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return changes, fileutil.Fsync(df)
|
return delDirs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) getBlock(id ulid.ULID) (*Block, bool) {
|
func (db *DB) getBlock(id ulid.ULID) (*Block, bool) {
|
||||||
|
@ -572,6 +575,7 @@ func (db *DB) Close() error {
|
||||||
if db.lockf != nil {
|
if db.lockf != nil {
|
||||||
merr.Add(db.lockf.Unlock())
|
merr.Add(db.lockf.Unlock())
|
||||||
}
|
}
|
||||||
|
merr.Add(db.head.Close())
|
||||||
return merr.Err()
|
return merr.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -615,7 +619,8 @@ func (db *DB) Snapshot(dir string) error {
|
||||||
return errors.Wrap(err, "error snapshotting headblock")
|
return errors.Wrap(err, "error snapshotting headblock")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime())
|
_, err := db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime())
|
||||||
|
return errors.Wrap(err, "snapshot head block")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Querier returns a new querier over the data partition for the given time range.
|
// Querier returns a new querier over the data partition for the given time range.
|
||||||
|
|
|
@ -11,19 +11,20 @@
|
||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
// +build !windows,!plan9,!solaris
|
// +build !windows,!plan9
|
||||||
|
|
||||||
package tsdb
|
package tsdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"os"
|
"os"
|
||||||
"syscall"
|
|
||||||
|
"golang.org/x/sys/unix"
|
||||||
)
|
)
|
||||||
|
|
||||||
func mmap(f *os.File, length int) ([]byte, error) {
|
func mmap(f *os.File, length int) ([]byte, error) {
|
||||||
return syscall.Mmap(int(f.Fd()), 0, length, syscall.PROT_READ, syscall.MAP_SHARED)
|
return unix.Mmap(int(f.Fd()), 0, length, unix.PROT_READ, unix.MAP_SHARED)
|
||||||
}
|
}
|
||||||
|
|
||||||
func munmap(b []byte) (err error) {
|
func munmap(b []byte) (err error) {
|
||||||
return syscall.Munmap(b)
|
return unix.Munmap(b)
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package tsdb
|
||||||
import (
|
import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"hash"
|
"hash"
|
||||||
|
"hash/crc32"
|
||||||
"unsafe"
|
"unsafe"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -77,6 +78,11 @@ func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) }
|
||||||
func (d *decbuf) be32int() int { return int(d.be32()) }
|
func (d *decbuf) be32int() int { return int(d.be32()) }
|
||||||
func (d *decbuf) be64int64() int64 { return int64(d.be64()) }
|
func (d *decbuf) be64int64() int64 { return int64(d.be64()) }
|
||||||
|
|
||||||
|
// crc32 returns a CRC32 checksum over the remaining bytes.
|
||||||
|
func (d *decbuf) crc32() uint32 {
|
||||||
|
return crc32.Checksum(d.b, castagnoliTable)
|
||||||
|
}
|
||||||
|
|
||||||
func (d *decbuf) uvarintStr() string {
|
func (d *decbuf) uvarintStr() string {
|
||||||
l := d.uvarint64()
|
l := d.uvarint64()
|
||||||
if d.e != nil {
|
if d.e != nil {
|
||||||
|
|
|
@ -66,7 +66,7 @@ type Head struct {
|
||||||
|
|
||||||
postings *memPostings // postings lists for terms
|
postings *memPostings // postings lists for terms
|
||||||
|
|
||||||
tombstones tombstoneReader
|
tombstones memTombstones
|
||||||
}
|
}
|
||||||
|
|
||||||
type headMetrics struct {
|
type headMetrics struct {
|
||||||
|
@ -186,7 +186,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
|
||||||
values: map[string]stringset{},
|
values: map[string]stringset{},
|
||||||
symbols: map[string]struct{}{},
|
symbols: map[string]struct{}{},
|
||||||
postings: newUnorderedMemPostings(),
|
postings: newUnorderedMemPostings(),
|
||||||
tombstones: newEmptyTombstoneReader(),
|
tombstones: memTombstones{},
|
||||||
}
|
}
|
||||||
h.metrics = newHeadMetrics(h, r)
|
h.metrics = newHeadMetrics(h, r)
|
||||||
|
|
||||||
|
@ -574,8 +574,10 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error {
|
||||||
|
|
||||||
ir := h.indexRange(mint, maxt)
|
ir := h.indexRange(mint, maxt)
|
||||||
|
|
||||||
pr := newPostingsReader(ir)
|
p, absent, err := PostingsForMatchers(ir, ms...)
|
||||||
p, absent := pr.Select(ms...)
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "select series")
|
||||||
|
}
|
||||||
|
|
||||||
var stones []Stone
|
var stones []Stone
|
||||||
|
|
||||||
|
@ -739,6 +741,11 @@ func (h *Head) MaxTime() int64 {
|
||||||
return atomic.LoadInt64(&h.maxTime)
|
return atomic.LoadInt64(&h.maxTime)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Close flushes the WAL and closes the head.
|
||||||
|
func (h *Head) Close() error {
|
||||||
|
return h.wal.Close()
|
||||||
|
}
|
||||||
|
|
||||||
type headChunkReader struct {
|
type headChunkReader struct {
|
||||||
head *Head
|
head *Head
|
||||||
mint, maxt int64
|
mint, maxt int64
|
||||||
|
|
|
@ -560,7 +560,7 @@ type StringTuples interface {
|
||||||
|
|
||||||
type indexReader struct {
|
type indexReader struct {
|
||||||
// The underlying byte slice holding the encoded series data.
|
// The underlying byte slice holding the encoded series data.
|
||||||
b []byte
|
b ByteSlice
|
||||||
toc indexTOC
|
toc indexTOC
|
||||||
|
|
||||||
// Close that releases the underlying resources of the byte slice.
|
// Close that releases the underlying resources of the byte slice.
|
||||||
|
@ -575,33 +575,62 @@ type indexReader struct {
|
||||||
// prevents memory faults when applications work with read symbols after
|
// prevents memory faults when applications work with read symbols after
|
||||||
// the block has been unmapped.
|
// the block has been unmapped.
|
||||||
symbols map[uint32]string
|
symbols map[uint32]string
|
||||||
|
|
||||||
|
crc32 hash.Hash32
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
errInvalidSize = fmt.Errorf("invalid size")
|
errInvalidSize = fmt.Errorf("invalid size")
|
||||||
errInvalidFlag = fmt.Errorf("invalid flag")
|
errInvalidFlag = fmt.Errorf("invalid flag")
|
||||||
|
errInvalidChecksum = fmt.Errorf("invalid checksum")
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewIndexReader returns a new IndexReader on the given directory.
|
// ByteSlice abstracts a byte slice.
|
||||||
func NewIndexReader(dir string) (IndexReader, error) { return newIndexReader(dir) }
|
type ByteSlice interface {
|
||||||
|
Len() int
|
||||||
|
Range(start, end int) []byte
|
||||||
|
}
|
||||||
|
|
||||||
// newIndexReader returns a new indexReader on the given directory.
|
type realByteSlice []byte
|
||||||
func newIndexReader(dir string) (*indexReader, error) {
|
|
||||||
f, err := openMmapFile(filepath.Join(dir, "index"))
|
func (b realByteSlice) Len() int {
|
||||||
|
return len(b)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b realByteSlice) Range(start, end int) []byte {
|
||||||
|
return b[start:end]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b realByteSlice) Sub(start, end int) ByteSlice {
|
||||||
|
return b[start:end]
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewIndexReader returns a new IndexReader on the given byte slice.
|
||||||
|
func NewIndexReader(b ByteSlice) (IndexReader, error) {
|
||||||
|
return newIndexReader(b, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewFileIndexReader returns a new index reader against the given index file.
|
||||||
|
func NewFileIndexReader(path string) (IndexReader, error) {
|
||||||
|
f, err := openMmapFile(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
r := &indexReader{
|
return newIndexReader(realByteSlice(f.b), f)
|
||||||
b: f.b,
|
}
|
||||||
c: f,
|
|
||||||
symbols: map[uint32]string{},
|
|
||||||
}
|
|
||||||
|
|
||||||
|
func newIndexReader(b ByteSlice, c io.Closer) (*indexReader, error) {
|
||||||
|
r := &indexReader{
|
||||||
|
b: b,
|
||||||
|
c: c,
|
||||||
|
symbols: map[uint32]string{},
|
||||||
|
crc32: newCRC32(),
|
||||||
|
}
|
||||||
// Verify magic number.
|
// Verify magic number.
|
||||||
if len(f.b) < 4 {
|
if b.Len() < 4 {
|
||||||
return nil, errors.Wrap(errInvalidSize, "index header")
|
return nil, errors.Wrap(errInvalidSize, "index header")
|
||||||
}
|
}
|
||||||
if m := binary.BigEndian.Uint32(r.b[:4]); m != MagicIndex {
|
if m := binary.BigEndian.Uint32(r.b.Range(0, 4)); m != MagicIndex {
|
||||||
return nil, errors.Errorf("invalid magic number %x", m)
|
return nil, errors.Errorf("invalid magic number %x", m)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -611,6 +640,7 @@ func newIndexReader(dir string) (*indexReader, error) {
|
||||||
if err := r.readSymbols(int(r.toc.symbols)); err != nil {
|
if err := r.readSymbols(int(r.toc.symbols)); err != nil {
|
||||||
return nil, errors.Wrap(err, "read symbols")
|
return nil, errors.Wrap(err, "read symbols")
|
||||||
}
|
}
|
||||||
|
var err error
|
||||||
|
|
||||||
r.labels, err = r.readOffsetTable(r.toc.labelIndicesTable)
|
r.labels, err = r.readOffsetTable(r.toc.labelIndicesTable)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -621,7 +651,17 @@ func newIndexReader(dir string) (*indexReader, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *indexReader) readTOC() error {
|
func (r *indexReader) readTOC() error {
|
||||||
d := r.decbufAt(len(r.b) - indexTOCLen)
|
if r.b.Len() < indexTOCLen {
|
||||||
|
return errInvalidSize
|
||||||
|
}
|
||||||
|
b := r.b.Range(r.b.Len()-indexTOCLen, r.b.Len())
|
||||||
|
|
||||||
|
expCRC := binary.BigEndian.Uint32(b[len(b)-4:])
|
||||||
|
d := decbuf{b: b[:len(b)-4]}
|
||||||
|
|
||||||
|
if d.crc32() != expCRC {
|
||||||
|
return errInvalidChecksum
|
||||||
|
}
|
||||||
|
|
||||||
r.toc.symbols = d.be64()
|
r.toc.symbols = d.be64()
|
||||||
r.toc.series = d.be64()
|
r.toc.series = d.be64()
|
||||||
|
@ -630,16 +670,61 @@ func (r *indexReader) readTOC() error {
|
||||||
r.toc.postings = d.be64()
|
r.toc.postings = d.be64()
|
||||||
r.toc.postingsTable = d.be64()
|
r.toc.postingsTable = d.be64()
|
||||||
|
|
||||||
// TODO(fabxc): validate checksum.
|
return d.err()
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// decbufAt returns a new decoding buffer. It expects the first 4 bytes
|
||||||
|
// after offset to hold the big endian encoded content length, followed by the contents and the expected
|
||||||
|
// checksum.
|
||||||
func (r *indexReader) decbufAt(off int) decbuf {
|
func (r *indexReader) decbufAt(off int) decbuf {
|
||||||
if len(r.b) < off {
|
if r.b.Len() < off+4 {
|
||||||
return decbuf{e: errInvalidSize}
|
return decbuf{e: errInvalidSize}
|
||||||
}
|
}
|
||||||
return decbuf{b: r.b[off:]}
|
b := r.b.Range(off, off+4)
|
||||||
|
l := int(binary.BigEndian.Uint32(b))
|
||||||
|
|
||||||
|
if r.b.Len() < off+4+l+4 {
|
||||||
|
return decbuf{e: errInvalidSize}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Load bytes holding the contents plus a CRC32 checksum.
|
||||||
|
b = r.b.Range(off+4, off+4+l+4)
|
||||||
|
dec := decbuf{b: b[:len(b)-4]}
|
||||||
|
|
||||||
|
if exp := binary.BigEndian.Uint32(b[len(b)-4:]); dec.crc32() != exp {
|
||||||
|
return decbuf{e: errInvalidChecksum}
|
||||||
|
}
|
||||||
|
return dec
|
||||||
|
}
|
||||||
|
|
||||||
|
// decbufUvarintAt returns a new decoding buffer. It expects the first bytes
|
||||||
|
// after offset to hold the uvarint-encoded buffers length, followed by the contents and the expected
|
||||||
|
// checksum.
|
||||||
|
func (r *indexReader) decbufUvarintAt(off int) decbuf {
|
||||||
|
// We never have to access this method at the far end of the byte slice. Thus just checking
|
||||||
|
// against the MaxVarintLen32 is sufficient.
|
||||||
|
if r.b.Len() < off+binary.MaxVarintLen32 {
|
||||||
|
return decbuf{e: errInvalidSize}
|
||||||
|
}
|
||||||
|
b := r.b.Range(off, off+binary.MaxVarintLen32)
|
||||||
|
|
||||||
|
l, n := binary.Uvarint(b)
|
||||||
|
if n > binary.MaxVarintLen32 {
|
||||||
|
return decbuf{e: errors.New("invalid uvarint")}
|
||||||
|
}
|
||||||
|
|
||||||
|
if r.b.Len() < off+n+int(l)+4 {
|
||||||
|
return decbuf{e: errInvalidSize}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Load bytes holding the contents plus a CRC32 checksum.
|
||||||
|
b = r.b.Range(off+n, off+n+int(l)+4)
|
||||||
|
dec := decbuf{b: b[:len(b)-4]}
|
||||||
|
|
||||||
|
if dec.crc32() != binary.BigEndian.Uint32(b[len(b)-4:]) {
|
||||||
|
return decbuf{e: errInvalidChecksum}
|
||||||
|
}
|
||||||
|
return dec
|
||||||
}
|
}
|
||||||
|
|
||||||
// readSymbols reads the symbol table fully into memory and allocates proper strings for them.
|
// readSymbols reads the symbol table fully into memory and allocates proper strings for them.
|
||||||
|
@ -649,22 +734,22 @@ func (r *indexReader) readSymbols(off int) error {
|
||||||
if off == 0 {
|
if off == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
d := r.decbufAt(off)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
d1 = r.decbufAt(int(off))
|
origLen = d.len()
|
||||||
d2 = d1.decbuf(d1.be32int())
|
cnt = d.be32int()
|
||||||
origLen = d2.len()
|
|
||||||
cnt = d2.be32int()
|
|
||||||
basePos = uint32(off) + 4
|
basePos = uint32(off) + 4
|
||||||
nextPos = basePos + uint32(origLen-d2.len())
|
nextPos = basePos + uint32(origLen-d.len())
|
||||||
)
|
)
|
||||||
for d2.err() == nil && d2.len() > 0 && cnt > 0 {
|
for d.err() == nil && d.len() > 0 && cnt > 0 {
|
||||||
s := d2.uvarintStr()
|
s := d.uvarintStr()
|
||||||
r.symbols[uint32(nextPos)] = s
|
r.symbols[uint32(nextPos)] = s
|
||||||
|
|
||||||
nextPos = basePos + uint32(origLen-d2.len())
|
nextPos = basePos + uint32(origLen-d.len())
|
||||||
cnt--
|
cnt--
|
||||||
}
|
}
|
||||||
return d2.err()
|
return d.err()
|
||||||
}
|
}
|
||||||
|
|
||||||
// readOffsetTable reads an offset table at the given position and returns a map
|
// readOffsetTable reads an offset table at the given position and returns a map
|
||||||
|
@ -672,53 +757,29 @@ func (r *indexReader) readSymbols(off int) error {
|
||||||
func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) {
|
func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) {
|
||||||
const sep = "\xff"
|
const sep = "\xff"
|
||||||
|
|
||||||
var (
|
d := r.decbufAt(int(off))
|
||||||
d1 = r.decbufAt(int(off))
|
cnt := d.be32()
|
||||||
d2 = d1.decbuf(d1.be32int())
|
|
||||||
cnt = d2.be32()
|
|
||||||
)
|
|
||||||
|
|
||||||
res := make(map[string]uint32, 512)
|
res := make(map[string]uint32, cnt)
|
||||||
|
|
||||||
for d2.err() == nil && d2.len() > 0 && cnt > 0 {
|
for d.err() == nil && d.len() > 0 && cnt > 0 {
|
||||||
keyCount := int(d2.uvarint())
|
keyCount := int(d.uvarint())
|
||||||
keys := make([]string, 0, keyCount)
|
keys := make([]string, 0, keyCount)
|
||||||
|
|
||||||
for i := 0; i < keyCount; i++ {
|
for i := 0; i < keyCount; i++ {
|
||||||
keys = append(keys, d2.uvarintStr())
|
keys = append(keys, d.uvarintStr())
|
||||||
}
|
}
|
||||||
res[strings.Join(keys, sep)] = uint32(d2.uvarint())
|
res[strings.Join(keys, sep)] = uint32(d.uvarint())
|
||||||
|
|
||||||
cnt--
|
cnt--
|
||||||
}
|
}
|
||||||
|
return res, d.err()
|
||||||
// TODO(fabxc): verify checksum from remainer of d1.
|
|
||||||
return res, d2.err()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *indexReader) Close() error {
|
func (r *indexReader) Close() error {
|
||||||
return r.c.Close()
|
return r.c.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *indexReader) section(o uint32) (byte, []byte, error) {
|
|
||||||
b := r.b[o:]
|
|
||||||
|
|
||||||
if len(b) < 5 {
|
|
||||||
return 0, nil, errors.Wrap(errInvalidSize, "read header")
|
|
||||||
}
|
|
||||||
|
|
||||||
flag := b[0]
|
|
||||||
l := binary.BigEndian.Uint32(b[1:5])
|
|
||||||
|
|
||||||
b = b[5:]
|
|
||||||
|
|
||||||
// b must have the given length plus 4 bytes for the CRC32 checksum.
|
|
||||||
if len(b) < int(l)+4 {
|
|
||||||
return 0, nil, errors.Wrap(errInvalidSize, "section content")
|
|
||||||
}
|
|
||||||
return flag, b[:l], nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *indexReader) lookupSymbol(o uint32) (string, error) {
|
func (r *indexReader) lookupSymbol(o uint32) (string, error) {
|
||||||
s, ok := r.symbols[o]
|
s, ok := r.symbols[o]
|
||||||
if !ok {
|
if !ok {
|
||||||
|
@ -748,21 +809,17 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
|
||||||
//return nil, fmt.Errorf("label index doesn't exist")
|
//return nil, fmt.Errorf("label index doesn't exist")
|
||||||
}
|
}
|
||||||
|
|
||||||
d1 := r.decbufAt(int(off))
|
d := r.decbufAt(int(off))
|
||||||
d2 := d1.decbuf(d1.be32int())
|
|
||||||
|
|
||||||
nc := d2.be32int()
|
nc := d.be32int()
|
||||||
d2.be32() // consume unused value entry count.
|
d.be32() // consume unused value entry count.
|
||||||
|
|
||||||
if d2.err() != nil {
|
if d.err() != nil {
|
||||||
return nil, errors.Wrap(d2.err(), "read label value index")
|
return nil, errors.Wrap(d.err(), "read label value index")
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(fabxc): verify checksum in 4 remaining bytes of d1.
|
|
||||||
|
|
||||||
st := &serializedStringTuples{
|
st := &serializedStringTuples{
|
||||||
l: nc,
|
l: nc,
|
||||||
b: d2.get(),
|
b: d.get(),
|
||||||
lookup: r.lookupSymbol,
|
lookup: r.lookupSymbol,
|
||||||
}
|
}
|
||||||
return st, nil
|
return st, nil
|
||||||
|
@ -785,20 +842,19 @@ func (r *indexReader) LabelIndices() ([][]string, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) error {
|
func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) error {
|
||||||
d1 := r.decbufAt(int(ref))
|
d := r.decbufUvarintAt(int(ref))
|
||||||
d2 := d1.decbuf(int(d1.uvarint()))
|
|
||||||
|
|
||||||
*lbls = (*lbls)[:0]
|
*lbls = (*lbls)[:0]
|
||||||
*chks = (*chks)[:0]
|
*chks = (*chks)[:0]
|
||||||
|
|
||||||
k := int(d2.uvarint())
|
k := int(d.uvarint())
|
||||||
|
|
||||||
for i := 0; i < k; i++ {
|
for i := 0; i < k; i++ {
|
||||||
lno := uint32(d2.uvarint())
|
lno := uint32(d.uvarint())
|
||||||
lvo := uint32(d2.uvarint())
|
lvo := uint32(d.uvarint())
|
||||||
|
|
||||||
if d2.err() != nil {
|
if d.err() != nil {
|
||||||
return errors.Wrap(d2.err(), "read series label offsets")
|
return errors.Wrap(d.err(), "read series label offsets")
|
||||||
}
|
}
|
||||||
|
|
||||||
ln, err := r.lookupSymbol(lno)
|
ln, err := r.lookupSymbol(lno)
|
||||||
|
@ -814,15 +870,15 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read the chunks meta data.
|
// Read the chunks meta data.
|
||||||
k = int(d2.uvarint())
|
k = int(d.uvarint())
|
||||||
|
|
||||||
if k == 0 {
|
if k == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
t0 := d2.varint64()
|
t0 := d.varint64()
|
||||||
maxt := int64(d2.uvarint64()) + t0
|
maxt := int64(d.uvarint64()) + t0
|
||||||
ref0 := int64(d2.uvarint64())
|
ref0 := int64(d.uvarint64())
|
||||||
|
|
||||||
*chks = append(*chks, ChunkMeta{
|
*chks = append(*chks, ChunkMeta{
|
||||||
Ref: uint64(ref0),
|
Ref: uint64(ref0),
|
||||||
|
@ -832,14 +888,14 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta)
|
||||||
t0 = maxt
|
t0 = maxt
|
||||||
|
|
||||||
for i := 1; i < k; i++ {
|
for i := 1; i < k; i++ {
|
||||||
mint := int64(d2.uvarint64()) + t0
|
mint := int64(d.uvarint64()) + t0
|
||||||
maxt := int64(d2.uvarint64()) + mint
|
maxt := int64(d.uvarint64()) + mint
|
||||||
|
|
||||||
ref0 += d2.varint64()
|
ref0 += d.varint64()
|
||||||
t0 = maxt
|
t0 = maxt
|
||||||
|
|
||||||
if d2.err() != nil {
|
if d.err() != nil {
|
||||||
return errors.Wrapf(d2.err(), "read meta for chunk %d", i)
|
return errors.Wrapf(d.err(), "read meta for chunk %d", i)
|
||||||
}
|
}
|
||||||
|
|
||||||
*chks = append(*chks, ChunkMeta{
|
*chks = append(*chks, ChunkMeta{
|
||||||
|
@ -848,10 +904,7 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta)
|
||||||
MaxTime: maxt,
|
MaxTime: maxt,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
return d.err()
|
||||||
// TODO(fabxc): verify CRC32.
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *indexReader) Postings(name, value string) (Postings, error) {
|
func (r *indexReader) Postings(name, value string) (Postings, error) {
|
||||||
|
@ -862,19 +915,10 @@ func (r *indexReader) Postings(name, value string) (Postings, error) {
|
||||||
if !ok {
|
if !ok {
|
||||||
return emptyPostings, nil
|
return emptyPostings, nil
|
||||||
}
|
}
|
||||||
|
d := r.decbufAt(int(off))
|
||||||
|
d.be32() // consume unused postings list length.
|
||||||
|
|
||||||
d1 := r.decbufAt(int(off))
|
return newBigEndianPostings(d.get()), errors.Wrap(d.err(), "get postings bytes")
|
||||||
d2 := d1.decbuf(d1.be32int())
|
|
||||||
|
|
||||||
d2.be32() // consume unused postings list length.
|
|
||||||
|
|
||||||
if d2.err() != nil {
|
|
||||||
return nil, errors.Wrap(d2.err(), "get postings bytes")
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO(fabxc): read checksum from 4 remainer bytes of d1 and verify.
|
|
||||||
|
|
||||||
return newBigEndianPostings(d2.get()), nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *indexReader) SortedPostings(p Postings) Postings {
|
func (r *indexReader) SortedPostings(p Postings) Postings {
|
||||||
|
|
|
@ -165,6 +165,11 @@ func (e errPostings) Err() error { return e.err }
|
||||||
|
|
||||||
var emptyPostings = errPostings{}
|
var emptyPostings = errPostings{}
|
||||||
|
|
||||||
|
// EmptyPostings returns a postings list that's always empty.
|
||||||
|
func EmptyPostings() Postings {
|
||||||
|
return emptyPostings
|
||||||
|
}
|
||||||
|
|
||||||
// Intersect returns a new postings list over the intersection of the
|
// Intersect returns a new postings list over the intersection of the
|
||||||
// input postings.
|
// input postings.
|
||||||
func Intersect(its ...Postings) Postings {
|
func Intersect(its ...Postings) Postings {
|
||||||
|
|
|
@ -27,7 +27,7 @@ import (
|
||||||
// time range.
|
// time range.
|
||||||
type Querier interface {
|
type Querier interface {
|
||||||
// Select returns a set of series that matches the given label matchers.
|
// Select returns a set of series that matches the given label matchers.
|
||||||
Select(...labels.Matcher) SeriesSet
|
Select(...labels.Matcher) (SeriesSet, error)
|
||||||
|
|
||||||
// LabelValues returns all potential values for a label name.
|
// LabelValues returns all potential values for a label name.
|
||||||
LabelValues(string) ([]string, error)
|
LabelValues(string) ([]string, error)
|
||||||
|
@ -81,20 +81,29 @@ func (q *querier) LabelValuesFor(string, labels.Label) ([]string, error) {
|
||||||
return nil, fmt.Errorf("not implemented")
|
return nil, fmt.Errorf("not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *querier) Select(ms ...labels.Matcher) SeriesSet {
|
func (q *querier) Select(ms ...labels.Matcher) (SeriesSet, error) {
|
||||||
return q.sel(q.blocks, ms)
|
return q.sel(q.blocks, ms)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *querier) sel(qs []Querier, ms []labels.Matcher) SeriesSet {
|
func (q *querier) sel(qs []Querier, ms []labels.Matcher) (SeriesSet, error) {
|
||||||
if len(qs) == 0 {
|
if len(qs) == 0 {
|
||||||
return nopSeriesSet{}
|
return EmptySeriesSet(), nil
|
||||||
}
|
}
|
||||||
if len(qs) == 1 {
|
if len(qs) == 1 {
|
||||||
return qs[0].Select(ms...)
|
return qs[0].Select(ms...)
|
||||||
}
|
}
|
||||||
l := len(qs) / 2
|
l := len(qs) / 2
|
||||||
return newMergedSeriesSet(q.sel(qs[:l], ms), q.sel(qs[l:], ms))
|
|
||||||
|
a, err := q.sel(qs[:l], ms)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
b, err := q.sel(qs[l:], ms)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return newMergedSeriesSet(a, b), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *querier) Close() error {
|
func (q *querier) Close() error {
|
||||||
|
@ -141,20 +150,14 @@ type blockQuerier struct {
|
||||||
mint, maxt int64
|
mint, maxt int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet {
|
func (q *blockQuerier) Select(ms ...labels.Matcher) (SeriesSet, error) {
|
||||||
pr := newPostingsReader(q.index)
|
base, err := LookupChunkSeries(q.index, q.tombstones, ms...)
|
||||||
|
if err != nil {
|
||||||
p, absent := pr.Select(ms...)
|
return nil, err
|
||||||
|
}
|
||||||
return &blockSeriesSet{
|
return &blockSeriesSet{
|
||||||
set: &populatedChunkSeries{
|
set: &populatedChunkSeries{
|
||||||
set: &baseChunkSeries{
|
set: base,
|
||||||
p: p,
|
|
||||||
index: q.index,
|
|
||||||
absent: absent,
|
|
||||||
|
|
||||||
tombstones: q.tombstones,
|
|
||||||
},
|
|
||||||
chunks: q.chunks,
|
chunks: q.chunks,
|
||||||
mint: q.mint,
|
mint: q.mint,
|
||||||
maxt: q.maxt,
|
maxt: q.maxt,
|
||||||
|
@ -162,7 +165,7 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet {
|
||||||
|
|
||||||
mint: q.mint,
|
mint: q.mint,
|
||||||
maxt: q.maxt,
|
maxt: q.maxt,
|
||||||
}
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *blockQuerier) LabelValues(name string) ([]string, error) {
|
func (q *blockQuerier) LabelValues(name string) ([]string, error) {
|
||||||
|
@ -196,16 +199,10 @@ func (q *blockQuerier) Close() error {
|
||||||
return merr.Err()
|
return merr.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
// postingsReader is used to select matching postings from an IndexReader.
|
// PostingsForMatchers assembles a single postings iterator against the index reader
|
||||||
type postingsReader struct {
|
// based on the given matchers. It returns a list of label names that must be manually
|
||||||
index IndexReader
|
// checked to not exist in series the postings list points to.
|
||||||
}
|
func PostingsForMatchers(index IndexReader, ms ...labels.Matcher) (Postings, []string, error) {
|
||||||
|
|
||||||
func newPostingsReader(i IndexReader) *postingsReader {
|
|
||||||
return &postingsReader{index: i}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *postingsReader) Select(ms ...labels.Matcher) (Postings, []string) {
|
|
||||||
var (
|
var (
|
||||||
its []Postings
|
its []Postings
|
||||||
absent []string
|
absent []string
|
||||||
|
@ -217,12 +214,13 @@ func (r *postingsReader) Select(ms ...labels.Matcher) (Postings, []string) {
|
||||||
absent = append(absent, m.Name())
|
absent = append(absent, m.Name())
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
its = append(its, r.selectSingle(m))
|
it, err := postingsForMatcher(index, m)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
its = append(its, it)
|
||||||
}
|
}
|
||||||
|
return index.SortedPostings(Intersect(its...)), absent, nil
|
||||||
p := Intersect(its...)
|
|
||||||
|
|
||||||
return r.index.SortedPostings(p), absent
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// tuplesByPrefix uses binary search to find prefix matches within ts.
|
// tuplesByPrefix uses binary search to find prefix matches within ts.
|
||||||
|
@ -256,33 +254,33 @@ func tuplesByPrefix(m *labels.PrefixMatcher, ts StringTuples) ([]string, error)
|
||||||
return matches, nil
|
return matches, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *postingsReader) selectSingle(m labels.Matcher) Postings {
|
func postingsForMatcher(index IndexReader, m labels.Matcher) (Postings, error) {
|
||||||
// Fast-path for equal matching.
|
// Fast-path for equal matching.
|
||||||
if em, ok := m.(*labels.EqualMatcher); ok {
|
if em, ok := m.(*labels.EqualMatcher); ok {
|
||||||
it, err := r.index.Postings(em.Name(), em.Value())
|
it, err := index.Postings(em.Name(), em.Value())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errPostings{err: err}
|
return nil, err
|
||||||
}
|
}
|
||||||
return it
|
return it, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
tpls, err := r.index.LabelValues(m.Name())
|
tpls, err := index.LabelValues(m.Name())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errPostings{err: err}
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var res []string
|
var res []string
|
||||||
if pm, ok := m.(*labels.PrefixMatcher); ok {
|
if pm, ok := m.(*labels.PrefixMatcher); ok {
|
||||||
res, err = tuplesByPrefix(pm, tpls)
|
res, err = tuplesByPrefix(pm, tpls)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errPostings{err: err}
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
for i := 0; i < tpls.Len(); i++ {
|
for i := 0; i < tpls.Len(); i++ {
|
||||||
vals, err := tpls.At(i)
|
vals, err := tpls.At(i)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errPostings{err: err}
|
return nil, err
|
||||||
}
|
}
|
||||||
if m.Matches(vals[0]) {
|
if m.Matches(vals[0]) {
|
||||||
res = append(res, vals[0])
|
res = append(res, vals[0])
|
||||||
|
@ -291,20 +289,20 @@ func (r *postingsReader) selectSingle(m labels.Matcher) Postings {
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(res) == 0 {
|
if len(res) == 0 {
|
||||||
return emptyPostings
|
return EmptyPostings(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var rit []Postings
|
var rit []Postings
|
||||||
|
|
||||||
for _, v := range res {
|
for _, v := range res {
|
||||||
it, err := r.index.Postings(m.Name(), v)
|
it, err := index.Postings(m.Name(), v)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errPostings{err: err}
|
return nil, err
|
||||||
}
|
}
|
||||||
rit = append(rit, it)
|
rit = append(rit, it)
|
||||||
}
|
}
|
||||||
|
|
||||||
return Merge(rit...)
|
return Merge(rit...), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func mergeStrings(a, b []string) []string {
|
func mergeStrings(a, b []string) []string {
|
||||||
|
@ -342,11 +340,12 @@ type SeriesSet interface {
|
||||||
Err() error
|
Err() error
|
||||||
}
|
}
|
||||||
|
|
||||||
type nopSeriesSet struct{}
|
var emptySeriesSet = errSeriesSet{}
|
||||||
|
|
||||||
func (nopSeriesSet) Next() bool { return false }
|
// EmptySeriesSet returns a series set that's always empty.
|
||||||
func (nopSeriesSet) At() Series { return nil }
|
func EmptySeriesSet() SeriesSet {
|
||||||
func (nopSeriesSet) Err() error { return nil }
|
return emptySeriesSet
|
||||||
|
}
|
||||||
|
|
||||||
// mergedSeriesSet takes two series sets as a single series set. The input series sets
|
// mergedSeriesSet takes two series sets as a single series set. The input series sets
|
||||||
// must be sorted and sequential in time, i.e. if they have the same label set,
|
// must be sorted and sequential in time, i.e. if they have the same label set,
|
||||||
|
@ -418,7 +417,7 @@ func (s *mergedSeriesSet) Next() bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
type chunkSeriesSet interface {
|
type ChunkSeriesSet interface {
|
||||||
Next() bool
|
Next() bool
|
||||||
At() (labels.Labels, []ChunkMeta, Intervals)
|
At() (labels.Labels, []ChunkMeta, Intervals)
|
||||||
Err() error
|
Err() error
|
||||||
|
@ -438,6 +437,24 @@ type baseChunkSeries struct {
|
||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// LookupChunkSeries retrieves all series for the given matchers and returns a ChunkSeriesSet
|
||||||
|
// over them. It drops chunks based on tombstones in the given reader.
|
||||||
|
func LookupChunkSeries(ir IndexReader, tr TombstoneReader, ms ...labels.Matcher) (ChunkSeriesSet, error) {
|
||||||
|
if tr == nil {
|
||||||
|
tr = EmptyTombstoneReader()
|
||||||
|
}
|
||||||
|
p, absent, err := PostingsForMatchers(ir, ms...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &baseChunkSeries{
|
||||||
|
p: p,
|
||||||
|
index: ir,
|
||||||
|
tombstones: tr,
|
||||||
|
absent: absent,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *baseChunkSeries) At() (labels.Labels, []ChunkMeta, Intervals) {
|
func (s *baseChunkSeries) At() (labels.Labels, []ChunkMeta, Intervals) {
|
||||||
return s.lset, s.chks, s.intervals
|
return s.lset, s.chks, s.intervals
|
||||||
}
|
}
|
||||||
|
@ -448,6 +465,7 @@ func (s *baseChunkSeries) Next() bool {
|
||||||
var (
|
var (
|
||||||
lset labels.Labels
|
lset labels.Labels
|
||||||
chunks []ChunkMeta
|
chunks []ChunkMeta
|
||||||
|
err error
|
||||||
)
|
)
|
||||||
Outer:
|
Outer:
|
||||||
for s.p.Next() {
|
for s.p.Next() {
|
||||||
|
@ -470,7 +488,11 @@ Outer:
|
||||||
|
|
||||||
s.lset = lset
|
s.lset = lset
|
||||||
s.chks = chunks
|
s.chks = chunks
|
||||||
s.intervals = s.tombstones.Get(s.p.At())
|
s.intervals, err = s.tombstones.Get(s.p.At())
|
||||||
|
if err != nil {
|
||||||
|
s.err = errors.Wrap(err, "get tombstones")
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
if len(s.intervals) > 0 {
|
if len(s.intervals) > 0 {
|
||||||
// Only those chunks that are not entirely deleted.
|
// Only those chunks that are not entirely deleted.
|
||||||
|
@ -496,7 +518,7 @@ Outer:
|
||||||
// with known chunk references. It filters out chunks that do not fit the
|
// with known chunk references. It filters out chunks that do not fit the
|
||||||
// given time range.
|
// given time range.
|
||||||
type populatedChunkSeries struct {
|
type populatedChunkSeries struct {
|
||||||
set chunkSeriesSet
|
set ChunkSeriesSet
|
||||||
chunks ChunkReader
|
chunks ChunkReader
|
||||||
mint, maxt int64
|
mint, maxt int64
|
||||||
|
|
||||||
|
@ -553,7 +575,7 @@ func (s *populatedChunkSeries) Next() bool {
|
||||||
|
|
||||||
// blockSeriesSet is a set of series from an inverted index query.
|
// blockSeriesSet is a set of series from an inverted index query.
|
||||||
type blockSeriesSet struct {
|
type blockSeriesSet struct {
|
||||||
set chunkSeriesSet
|
set ChunkSeriesSet
|
||||||
err error
|
err error
|
||||||
cur Series
|
cur Series
|
||||||
|
|
||||||
|
|
|
@ -35,12 +35,17 @@ const (
|
||||||
|
|
||||||
// TombstoneReader gives access to tombstone intervals by series reference.
|
// TombstoneReader gives access to tombstone intervals by series reference.
|
||||||
type TombstoneReader interface {
|
type TombstoneReader interface {
|
||||||
Get(ref uint64) Intervals
|
// Get returns deletion intervals for the series with the given reference.
|
||||||
|
Get(ref uint64) (Intervals, error)
|
||||||
|
|
||||||
|
// Iter calls the given function for each encountered interval.
|
||||||
|
Iter(func(uint64, Intervals) error) error
|
||||||
|
|
||||||
|
// Close any underlying resources
|
||||||
Close() error
|
Close() error
|
||||||
}
|
}
|
||||||
|
|
||||||
func writeTombstoneFile(dir string, tr tombstoneReader) error {
|
func writeTombstoneFile(dir string, tr TombstoneReader) error {
|
||||||
path := filepath.Join(dir, tombstoneFilename)
|
path := filepath.Join(dir, tombstoneFilename)
|
||||||
tmp := path + ".tmp"
|
tmp := path + ".tmp"
|
||||||
hash := newCRC32()
|
hash := newCRC32()
|
||||||
|
@ -67,19 +72,21 @@ func writeTombstoneFile(dir string, tr tombstoneReader) error {
|
||||||
|
|
||||||
mw := io.MultiWriter(f, hash)
|
mw := io.MultiWriter(f, hash)
|
||||||
|
|
||||||
for k, v := range tr {
|
tr.Iter(func(ref uint64, ivs Intervals) error {
|
||||||
for _, itv := range v {
|
for _, iv := range ivs {
|
||||||
buf.reset()
|
buf.reset()
|
||||||
buf.putUvarint64(k)
|
|
||||||
buf.putVarint64(itv.Mint)
|
buf.putUvarint64(ref)
|
||||||
buf.putVarint64(itv.Maxt)
|
buf.putVarint64(iv.Mint)
|
||||||
|
buf.putVarint64(iv.Maxt)
|
||||||
|
|
||||||
_, err = mw.Write(buf.get())
|
_, err = mw.Write(buf.get())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
_, err = f.Write(hash.Sum(nil))
|
_, err = f.Write(hash.Sum(nil))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -100,7 +107,7 @@ type Stone struct {
|
||||||
intervals Intervals
|
intervals Intervals
|
||||||
}
|
}
|
||||||
|
|
||||||
func readTombstones(dir string) (tombstoneReader, error) {
|
func readTombstones(dir string) (memTombstones, error) {
|
||||||
b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename))
|
b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -131,7 +138,8 @@ func readTombstones(dir string) (tombstoneReader, error) {
|
||||||
return nil, errors.New("checksum did not match")
|
return nil, errors.New("checksum did not match")
|
||||||
}
|
}
|
||||||
|
|
||||||
stonesMap := newEmptyTombstoneReader()
|
stonesMap := memTombstones{}
|
||||||
|
|
||||||
for d.len() > 0 {
|
for d.len() > 0 {
|
||||||
k := d.uvarint64()
|
k := d.uvarint64()
|
||||||
mint := d.varint64()
|
mint := d.varint64()
|
||||||
|
@ -143,28 +151,36 @@ func readTombstones(dir string) (tombstoneReader, error) {
|
||||||
stonesMap.add(k, Interval{mint, maxt})
|
stonesMap.add(k, Interval{mint, maxt})
|
||||||
}
|
}
|
||||||
|
|
||||||
return newTombstoneReader(stonesMap), nil
|
return stonesMap, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type tombstoneReader map[uint64]Intervals
|
type memTombstones map[uint64]Intervals
|
||||||
|
|
||||||
func newTombstoneReader(ts map[uint64]Intervals) tombstoneReader {
|
var emptyTombstoneReader = memTombstones{}
|
||||||
return tombstoneReader(ts)
|
|
||||||
|
// EmptyTombstoneReader returns a TombstoneReader that is always empty.
|
||||||
|
func EmptyTombstoneReader() TombstoneReader {
|
||||||
|
return emptyTombstoneReader
|
||||||
}
|
}
|
||||||
|
|
||||||
func newEmptyTombstoneReader() tombstoneReader {
|
func (t memTombstones) Get(ref uint64) (Intervals, error) {
|
||||||
return tombstoneReader(make(map[uint64]Intervals))
|
return t[ref], nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t tombstoneReader) Get(ref uint64) Intervals {
|
func (t memTombstones) Iter(f func(uint64, Intervals) error) error {
|
||||||
return t[ref]
|
for ref, ivs := range t {
|
||||||
|
if err := f(ref, ivs); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t tombstoneReader) add(ref uint64, itv Interval) {
|
func (t memTombstones) add(ref uint64, itv Interval) {
|
||||||
t[ref] = t[ref].add(itv)
|
t[ref] = t[ref].add(itv)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tombstoneReader) Close() error {
|
func (memTombstones) Close() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Binary file not shown.
|
@ -63,11 +63,11 @@ func newWalMetrics(wal *SegmentWAL, r prometheus.Registerer) *walMetrics {
|
||||||
m := &walMetrics{}
|
m := &walMetrics{}
|
||||||
|
|
||||||
m.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{
|
m.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{
|
||||||
Name: "tsdb_wal_fsync_duration_seconds",
|
Name: "prometheus_tsdb_wal_fsync_duration_seconds",
|
||||||
Help: "Duration of WAL fsync.",
|
Help: "Duration of WAL fsync.",
|
||||||
})
|
})
|
||||||
m.corruptions = prometheus.NewCounter(prometheus.CounterOpts{
|
m.corruptions = prometheus.NewCounter(prometheus.CounterOpts{
|
||||||
Name: "tsdb_wal_corruptions_total",
|
Name: "prometheus_tsdb_wal_corruptions_total",
|
||||||
Help: "Total number of WAL corruptions.",
|
Help: "Total number of WAL corruptions.",
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
@ -782,28 +782,28 @@
|
||||||
"revisionTime": "2016-04-11T19:08:41Z"
|
"revisionTime": "2016-04-11T19:08:41Z"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"checksumSHA1": "Bty/r75M8kM+GA80eMM5p0cLTi0=",
|
"checksumSHA1": "c3VEi8SL0XmI6BmokMOWrSWmNu8=",
|
||||||
"path": "github.com/prometheus/tsdb",
|
"path": "github.com/prometheus/tsdb",
|
||||||
"revision": "706602daed1487f7849990678b4ece4599745905",
|
"revision": "ad3c4849a99729a9c10a55b3ba4c0ad146d2446a",
|
||||||
"revisionTime": "2017-11-04T07:45:56Z"
|
"revisionTime": "2017-11-23T17:41:24Z"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"checksumSHA1": "uy6ySJ6EZqof+yMD2wTkYob8BeU=",
|
"checksumSHA1": "C5V8KPHm/gZF0qrNwmIEDdG6rhA=",
|
||||||
"path": "github.com/prometheus/tsdb/chunks",
|
"path": "github.com/prometheus/tsdb/chunks",
|
||||||
"revision": "706602daed1487f7849990678b4ece4599745905",
|
"revision": "ad3c4849a99729a9c10a55b3ba4c0ad146d2446a",
|
||||||
"revisionTime": "2017-11-04T07:45:56Z"
|
"revisionTime": "2017-11-23T17:41:24Z"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"checksumSHA1": "7RhNAVcmDmLFqn9nWiudT0B76f8=",
|
"checksumSHA1": "7RhNAVcmDmLFqn9nWiudT0B76f8=",
|
||||||
"path": "github.com/prometheus/tsdb/fileutil",
|
"path": "github.com/prometheus/tsdb/fileutil",
|
||||||
"revision": "706602daed1487f7849990678b4ece4599745905",
|
"revision": "ad3c4849a99729a9c10a55b3ba4c0ad146d2446a",
|
||||||
"revisionTime": "2017-11-04T07:45:56Z"
|
"revisionTime": "2017-11-23T17:41:24Z"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"checksumSHA1": "zhmlvc322RH1L3l9DaA9d/HVVWs=",
|
"checksumSHA1": "zhmlvc322RH1L3l9DaA9d/HVVWs=",
|
||||||
"path": "github.com/prometheus/tsdb/labels",
|
"path": "github.com/prometheus/tsdb/labels",
|
||||||
"revision": "706602daed1487f7849990678b4ece4599745905",
|
"revision": "ad3c4849a99729a9c10a55b3ba4c0ad146d2446a",
|
||||||
"revisionTime": "2017-11-04T07:45:56Z"
|
"revisionTime": "2017-11-23T17:41:24Z"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=",
|
"checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=",
|
||||||
|
|
Loading…
Reference in New Issue