vendor: update TSDB

Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>
pull/3523/head
Goutham Veeramachaneni 2017-11-30 16:04:53 +05:30
parent a1be5db99f
commit 9f92a71053
9 changed files with 146 additions and 42 deletions

View File

@ -77,6 +77,7 @@ type BlockMetaCompaction struct {
Level int `json:"level"` Level int `json:"level"`
// ULIDs of all source head blocks that went into the block. // ULIDs of all source head blocks that went into the block.
Sources []ulid.ULID `json:"sources,omitempty"` Sources []ulid.ULID `json:"sources,omitempty"`
Failed bool `json:"failed,omitempty"`
} }
const ( const (
@ -244,6 +245,11 @@ func (pb *Block) Tombstones() (TombstoneReader, error) {
return blockTombstoneReader{TombstoneReader: pb.tombstones, b: pb}, nil return blockTombstoneReader{TombstoneReader: pb.tombstones, b: pb}, nil
} }
func (pb *Block) setCompactionFailed() error {
pb.meta.Compaction.Failed = true
return writeMetaFile(pb.dir, &pb.meta)
}
type blockIndexReader struct { type blockIndexReader struct {
IndexReader IndexReader
b *Block b *Block
@ -341,6 +347,30 @@ Outer:
return writeMetaFile(pb.dir, &pb.meta) return writeMetaFile(pb.dir, &pb.meta)
} }
// CleanTombstones will rewrite the block if there any tombstones to remove them
// and returns if there was a re-write.
func (pb *Block) CleanTombstones(dest string, c Compactor) (bool, error) {
numStones := 0
pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
for _ = range ivs {
numStones++
}
return nil
})
if numStones == 0 {
return false, nil
}
if _, err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime); err != nil {
return false, err
}
return true, nil
}
// Snapshot creates snapshot of the block into dir. // Snapshot creates snapshot of the block into dir.
func (pb *Block) Snapshot(dir string) error { func (pb *Block) Snapshot(dir string) error {
blockDir := filepath.Join(dir, pb.meta.ULID.String()) blockDir := filepath.Join(dir, pb.meta.ULID.String())

View File

@ -1,12 +0,0 @@
--- contention:
cycles/second=2494255279
80179315716 1 @ 0x10061bb 0x10e008c 0x10e3934 0x10dfd30 0x10e1468 0x10e0431 0x1328cdb 0x102e9fd 0x105cea1
80176248000 1 @ 0x1065c12 0x1313b9d 0x10dfd30 0x105cea1
37792267436 303368 @ 0x10061fb 0x131dc08 0x105cea1
21607828 1098 @ 0x10648fe 0x10650d7 0x1064fca 0x12e5a74 0x12e5df2 0x131d969 0x105cea1
1272473 118 @ 0x10648fe 0x1065232 0x10651c6 0x1064cb0 0x12e5bcc 0x131dc50 0x105cea1
851800 1 @ 0x10061bb 0x1313bc6 0x10dfd30 0x105cea1
818628 59 @ 0x10648fe 0x1065232 0x10651c6 0x1064ebf 0x12e5a74 0x12e5df2 0x131d969 0x105cea1
501203 2 @ 0x1005473 0x12e5ed4 0x131d969 0x105cea1
7738 1 @ 0x10648fe 0x1064d19 0x12e5bcc 0x131dc50 0x105cea1
3846 1 @ 0x1005473 0x10e373b 0x10dfd3a 0x10e1468 0x10e0431 0x1328cdb 0x102e9fd 0x105cea1

View File

@ -205,7 +205,15 @@ func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta {
continue continue
} }
Outer:
for _, p := range parts { for _, p := range parts {
// Donot select the range if it has a block whose compaction failed.
for _, dm := range p {
if dm.meta.Compaction.Failed {
continue Outer
}
}
mint := p[0].meta.MinTime mint := p[0].meta.MinTime
maxt := p[len(p)-1].meta.MaxTime maxt := p[len(p)-1].meta.MaxTime
// Pick the range of blocks if it spans the full range (potentially with gaps) // Pick the range of blocks if it spans the full range (potentially with gaps)
@ -297,6 +305,7 @@ func compactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta {
// provided directories. // provided directories.
func (c *LeveledCompactor) Compact(dest string, dirs ...string) (err error) { func (c *LeveledCompactor) Compact(dest string, dirs ...string) (err error) {
var blocks []BlockReader var blocks []BlockReader
var bs []*Block
var metas []*BlockMeta var metas []*BlockMeta
for _, d := range dirs { for _, d := range dirs {
@ -313,12 +322,27 @@ func (c *LeveledCompactor) Compact(dest string, dirs ...string) (err error) {
metas = append(metas, meta) metas = append(metas, meta)
blocks = append(blocks, b) blocks = append(blocks, b)
bs = append(bs, b)
} }
entropy := rand.New(rand.NewSource(time.Now().UnixNano())) entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
uid := ulid.MustNew(ulid.Now(), entropy) uid := ulid.MustNew(ulid.Now(), entropy)
return c.write(dest, compactBlockMetas(uid, metas...), blocks...) err = c.write(dest, compactBlockMetas(uid, metas...), blocks...)
if err == nil {
return nil
}
var merr MultiError
merr.Add(err)
for _, b := range bs {
if err := b.setCompactionFailed(); err != nil {
merr.Add(errors.Wrapf(err, "setting compaction failed for block: %s", b.Dir()))
}
}
return merr
} }
func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error) { func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error) {
@ -360,17 +384,21 @@ func (w *instrumentedChunkWriter) WriteChunks(chunks ...ChunkMeta) error {
func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) { func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) {
level.Info(c.logger).Log("msg", "compact blocks", "count", len(blocks), "mint", meta.MinTime, "maxt", meta.MaxTime) level.Info(c.logger).Log("msg", "compact blocks", "count", len(blocks), "mint", meta.MinTime, "maxt", meta.MaxTime)
dir := filepath.Join(dest, meta.ULID.String())
tmp := dir + ".tmp"
defer func(t time.Time) { defer func(t time.Time) {
if err != nil { if err != nil {
c.metrics.failed.Inc() c.metrics.failed.Inc()
// TODO(gouthamve): Handle error how?
if err := os.RemoveAll(tmp); err != nil {
level.Error(c.logger).Log("msg", "removed tmp folder after failed compaction", "err", err.Error())
}
} }
c.metrics.ran.Inc() c.metrics.ran.Inc()
c.metrics.duration.Observe(time.Since(t).Seconds()) c.metrics.duration.Observe(time.Since(t).Seconds())
}(time.Now()) }(time.Now())
dir := filepath.Join(dest, meta.ULID.String())
tmp := dir + ".tmp"
if err = os.RemoveAll(tmp); err != nil { if err = os.RemoveAll(tmp); err != nil {
return err return err
} }
@ -525,22 +553,24 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
if len(dranges) > 0 { if len(dranges) > 0 {
// Re-encode the chunk to not have deleted values. // Re-encode the chunk to not have deleted values.
for _, chk := range chks { for i, chk := range chks {
if intervalOverlap(dranges[0].Mint, dranges[len(dranges)-1].Maxt, chk.MinTime, chk.MaxTime) { if !intervalOverlap(dranges[0].Mint, dranges[len(dranges)-1].Maxt, chk.MinTime, chk.MaxTime) {
newChunk := chunks.NewXORChunk() continue
app, err := newChunk.Appender()
if err != nil {
return err
}
it := &deletedIterator{it: chk.Chunk.Iterator(), intervals: dranges}
for it.Next() {
ts, v := it.At()
app.Append(ts, v)
}
chk.Chunk = newChunk
} }
newChunk := chunks.NewXORChunk()
app, err := newChunk.Appender()
if err != nil {
return err
}
it := &deletedIterator{it: chk.Chunk.Iterator(), intervals: dranges}
for it.Next() {
ts, v := it.At()
app.Append(ts, v)
}
chks[i].Chunk = newChunk
} }
} }
if err := chunkw.WriteChunks(chks...); err != nil { if err := chunkw.WriteChunks(chks...); err != nil {
@ -589,7 +619,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
} }
} }
for l := range postings.m { for _, l := range postings.sortedKeys() {
if err := indexw.WritePostings(l.Name, l.Value, postings.get(l.Name, l.Value)); err != nil { if err := indexw.WritePostings(l.Name, l.Value, postings.get(l.Name, l.Value)); err != nil {
return errors.Wrap(err, "write postings") return errors.Wrap(err, "write postings")
} }

View File

View File

View File

@ -122,6 +122,7 @@ type dbMetrics struct {
reloads prometheus.Counter reloads prometheus.Counter
reloadsFailed prometheus.Counter reloadsFailed prometheus.Counter
compactionsTriggered prometheus.Counter compactionsTriggered prometheus.Counter
tombCleanTimer prometheus.Histogram
} }
func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
@ -147,6 +148,10 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
Name: "prometheus_tsdb_compactions_triggered_total", Name: "prometheus_tsdb_compactions_triggered_total",
Help: "Total number of triggered compactions for the partition.", Help: "Total number of triggered compactions for the partition.",
}) })
m.tombCleanTimer = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "prometheus_tsdb_tombstone_cleanup_seconds",
Help: "The time taken to recompact blocks to remove tombstones.",
})
if r != nil { if r != nil {
r.MustRegister( r.MustRegister(
@ -154,6 +159,7 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
m.reloads, m.reloads,
m.reloadsFailed, m.reloadsFailed,
m.compactionsTriggered, m.compactionsTriggered,
m.tombCleanTimer,
) )
} }
return m return m
@ -616,7 +622,7 @@ func (db *DB) Snapshot(dir string) error {
level.Info(db.logger).Log("msg", "snapshotting block", "block", b) level.Info(db.logger).Log("msg", "snapshotting block", "block", b)
if err := b.Snapshot(dir); err != nil { if err := b.Snapshot(dir); err != nil {
return errors.Wrap(err, "error snapshotting headblock") return errors.Wrapf(err, "error snapshotting block: %s", b.Dir())
} }
} }
_, err := db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime()) _, err := db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime())
@ -691,6 +697,37 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
return nil return nil
} }
// CleanTombstones re-writes any blocks with tombstones.
func (db *DB) CleanTombstones() error {
db.cmtx.Lock()
defer db.cmtx.Unlock()
start := time.Now()
defer db.metrics.tombCleanTimer.Observe(float64(time.Since(start).Seconds()))
db.mtx.RLock()
blocks := db.blocks[:]
db.mtx.RUnlock()
deleted := []string{}
for _, b := range blocks {
ok, err := b.CleanTombstones(db.Dir(), db.compactor)
if err != nil {
return errors.Wrapf(err, "clean tombstones: %s", b.Dir())
}
if ok {
deleted = append(deleted, b.Dir())
}
}
if len(deleted) == 0 {
return nil
}
return errors.Wrap(db.reload(deleted...), "reload blocks")
}
func intervalOverlap(amin, amax, bmin, bmax int64) bool { func intervalOverlap(amin, amax, bmin, bmax int64) bool {
// Checks Overlap: http://stackoverflow.com/questions/3269434/ // Checks Overlap: http://stackoverflow.com/questions/3269434/
return amin <= bmax && bmin <= amax return amin <= bmax && bmin <= amax

View File

@ -50,6 +50,25 @@ func newUnorderedMemPostings() *memPostings {
} }
} }
// sortedKeys returns a list of sorted label keys of the postings.
func (p *memPostings) sortedKeys() []labels.Label {
p.mtx.RLock()
keys := make([]labels.Label, 0, len(p.m))
for l := range p.m {
keys = append(keys, l)
}
p.mtx.RUnlock()
sort.Slice(keys, func(i, j int) bool {
if d := strings.Compare(keys[i].Name, keys[j].Name); d != 0 {
return d < 0
}
return keys[i].Value < keys[j].Value
})
return keys
}
// Postings returns an iterator over the postings list for s. // Postings returns an iterator over the postings list for s.
func (p *memPostings) get(name, value string) Postings { func (p *memPostings) get(name, value string) Postings {
p.mtx.RLock() p.mtx.RLock()

Binary file not shown.

18
vendor/vendor.json vendored
View File

@ -794,28 +794,28 @@
"revisionTime": "2016-04-11T19:08:41Z" "revisionTime": "2016-04-11T19:08:41Z"
}, },
{ {
"checksumSHA1": "c3VEi8SL0XmI6BmokMOWrSWmNu8=", "checksumSHA1": "XgGOJ06okm8qd+x/gdRj48RgXsg=",
"path": "github.com/prometheus/tsdb", "path": "github.com/prometheus/tsdb",
"revision": "ad3c4849a99729a9c10a55b3ba4c0ad146d2446a", "revision": "30bbbe34f8787df072cf04563bc98fb8094ba070",
"revisionTime": "2017-11-23T17:41:24Z" "revisionTime": "2017-11-30T09:58:01Z"
}, },
{ {
"checksumSHA1": "C5V8KPHm/gZF0qrNwmIEDdG6rhA=", "checksumSHA1": "C5V8KPHm/gZF0qrNwmIEDdG6rhA=",
"path": "github.com/prometheus/tsdb/chunks", "path": "github.com/prometheus/tsdb/chunks",
"revision": "ad3c4849a99729a9c10a55b3ba4c0ad146d2446a", "revision": "30bbbe34f8787df072cf04563bc98fb8094ba070",
"revisionTime": "2017-11-23T17:41:24Z" "revisionTime": "2017-11-30T09:58:01Z"
}, },
{ {
"checksumSHA1": "7RhNAVcmDmLFqn9nWiudT0B76f8=", "checksumSHA1": "7RhNAVcmDmLFqn9nWiudT0B76f8=",
"path": "github.com/prometheus/tsdb/fileutil", "path": "github.com/prometheus/tsdb/fileutil",
"revision": "ad3c4849a99729a9c10a55b3ba4c0ad146d2446a", "revision": "30bbbe34f8787df072cf04563bc98fb8094ba070",
"revisionTime": "2017-11-23T17:41:24Z" "revisionTime": "2017-11-30T09:58:01Z"
}, },
{ {
"checksumSHA1": "zhmlvc322RH1L3l9DaA9d/HVVWs=", "checksumSHA1": "zhmlvc322RH1L3l9DaA9d/HVVWs=",
"path": "github.com/prometheus/tsdb/labels", "path": "github.com/prometheus/tsdb/labels",
"revision": "ad3c4849a99729a9c10a55b3ba4c0ad146d2446a", "revision": "30bbbe34f8787df072cf04563bc98fb8094ba070",
"revisionTime": "2017-11-23T17:41:24Z" "revisionTime": "2017-11-30T09:58:01Z"
}, },
{ {
"checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=", "checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=",