diff --git a/vendor/github.com/prometheus/tsdb/block.go b/vendor/github.com/prometheus/tsdb/block.go
index 8456cb376..4cb3c048f 100644
--- a/vendor/github.com/prometheus/tsdb/block.go
+++ b/vendor/github.com/prometheus/tsdb/block.go
@@ -77,6 +77,7 @@ type BlockMetaCompaction struct {
 	Level int `json:"level"`
 	// ULIDs of all source head blocks that went into the block.
 	Sources []ulid.ULID `json:"sources,omitempty"`
+	Failed bool `json:"failed,omitempty"`
 }
 
 const (
@@ -244,6 +245,11 @@ func (pb *Block) Tombstones() (TombstoneReader, error) {
 	return blockTombstoneReader{TombstoneReader: pb.tombstones, b: pb}, nil
 }
 
+func (pb *Block) setCompactionFailed() error {
+	pb.meta.Compaction.Failed = true
+	return writeMetaFile(pb.dir, &pb.meta)
+}
+
 type blockIndexReader struct {
 	IndexReader
 	b *Block
@@ -341,6 +347,30 @@ Outer:
 	return writeMetaFile(pb.dir, &pb.meta)
 }
 
+// CleanTombstones rewrites the block to drop the data marked for deletion by
+// its tombstones, and reports whether a rewrite happened.
+func (pb *Block) CleanTombstones(dest string, c Compactor) (bool, error) {
+	numStones := 0
+
+	pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
+		for range ivs {
+			numStones++
+		}
+
+		return nil
+	})
+
+	if numStones == 0 {
+		return false, nil
+	}
+
+	if _, err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime); err != nil {
+		return false, err
+	}
+
+	return true, nil
+}
+
 // Snapshot creates snapshot of the block into dir.
 func (pb *Block) Snapshot(dir string) error {
 	blockDir := filepath.Join(dir, pb.meta.ULID.String())
diff --git a/vendor/github.com/prometheus/tsdb/block.prof b/vendor/github.com/prometheus/tsdb/block.prof
deleted file mode 100644
index 122ef97f3..000000000
--- a/vendor/github.com/prometheus/tsdb/block.prof
+++ /dev/null
@@ -1,12 +0,0 @@
---- contention:
-cycles/second=2494255279
-80179315716 1 @ 0x10061bb 0x10e008c 0x10e3934 0x10dfd30 0x10e1468 0x10e0431 0x1328cdb 0x102e9fd 0x105cea1
-80176248000 1 @ 0x1065c12 0x1313b9d 0x10dfd30 0x105cea1
-37792267436 303368 @ 0x10061fb 0x131dc08 0x105cea1
-21607828 1098 @ 0x10648fe 0x10650d7 0x1064fca 0x12e5a74 0x12e5df2 0x131d969 0x105cea1
-1272473 118 @ 0x10648fe 0x1065232 0x10651c6 0x1064cb0 0x12e5bcc 0x131dc50 0x105cea1
-851800 1 @ 0x10061bb 0x1313bc6 0x10dfd30 0x105cea1
-818628 59 @ 0x10648fe 0x1065232 0x10651c6 0x1064ebf 0x12e5a74 0x12e5df2 0x131d969 0x105cea1
-501203 2 @ 0x1005473 0x12e5ed4 0x131d969 0x105cea1
-7738 1 @ 0x10648fe 0x1064d19 0x12e5bcc 0x131dc50 0x105cea1
-3846 1 @ 0x1005473 0x10e373b 0x10dfd3a 0x10e1468 0x10e0431 0x1328cdb 0x102e9fd 0x105cea1
diff --git a/vendor/github.com/prometheus/tsdb/compact.go b/vendor/github.com/prometheus/tsdb/compact.go
index 35cb36a63..5bb328aa3 100644
--- a/vendor/github.com/prometheus/tsdb/compact.go
+++ b/vendor/github.com/prometheus/tsdb/compact.go
@@ -205,7 +205,15 @@ func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta {
 			continue
 		}
 
+	Outer:
 		for _, p := range parts {
+			// Do not select the range if it has a block whose compaction failed.
+			for _, dm := range p {
+				if dm.meta.Compaction.Failed {
+					continue Outer
+				}
+			}
+
 			mint := p[0].meta.MinTime
 			maxt := p[len(p)-1].meta.MaxTime
 			// Pick the range of blocks if it spans the full range (potentially with gaps)
@@ -297,6 +305,7 @@ func compactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta {
 // provided directories.
 func (c *LeveledCompactor) Compact(dest string, dirs ...string) (err error) {
 	var blocks []BlockReader
+	var bs []*Block
 	var metas []*BlockMeta
 
 	for _, d := range dirs {
@@ -313,12 +322,27 @@ func (c *LeveledCompactor) Compact(dest string, dirs ...string) (err error) {
 
 		metas = append(metas, meta)
 		blocks = append(blocks, b)
+		bs = append(bs, b)
 	}
 
 	entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
 	uid := ulid.MustNew(ulid.Now(), entropy)
 
-	return c.write(dest, compactBlockMetas(uid, metas...), blocks...)
+	err = c.write(dest, compactBlockMetas(uid, metas...), blocks...)
+	if err == nil {
+		return nil
+	}
+
+	var merr MultiError
+	merr.Add(err)
+
+	for _, b := range bs {
+		if err := b.setCompactionFailed(); err != nil {
+			merr.Add(errors.Wrapf(err, "setting compaction failed for block: %s", b.Dir()))
+		}
+	}
+
+	return merr
 }
 
 func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error) {
@@ -360,17 +384,21 @@ func (w *instrumentedChunkWriter) WriteChunks(chunks ...ChunkMeta) error {
 func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) {
 	level.Info(c.logger).Log("msg", "compact blocks", "count", len(blocks), "mint", meta.MinTime, "maxt", meta.MaxTime)
 
+	dir := filepath.Join(dest, meta.ULID.String())
+	tmp := dir + ".tmp"
+
 	defer func(t time.Time) {
 		if err != nil {
 			c.metrics.failed.Inc()
+			// TODO(gouthamve): Handle error how?
+			if err := os.RemoveAll(tmp); err != nil {
+				level.Error(c.logger).Log("msg", "failed to remove tmp folder after failed compaction", "err", err.Error())
+			}
 		}
 		c.metrics.ran.Inc()
 		c.metrics.duration.Observe(time.Since(t).Seconds())
 	}(time.Now())
 
-	dir := filepath.Join(dest, meta.ULID.String())
-	tmp := dir + ".tmp"
-
 	if err = os.RemoveAll(tmp); err != nil {
 		return err
 	}
@@ -525,22 +553,24 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
 
 		if len(dranges) > 0 {
 			// Re-encode the chunk to not have deleted values.
-			for _, chk := range chks {
-				if intervalOverlap(dranges[0].Mint, dranges[len(dranges)-1].Maxt, chk.MinTime, chk.MaxTime) {
-					newChunk := chunks.NewXORChunk()
-					app, err := newChunk.Appender()
-					if err != nil {
-						return err
-					}
-
-					it := &deletedIterator{it: chk.Chunk.Iterator(), intervals: dranges}
-					for it.Next() {
-						ts, v := it.At()
-						app.Append(ts, v)
-					}
-
-					chk.Chunk = newChunk
+			for i, chk := range chks {
+				if !intervalOverlap(dranges[0].Mint, dranges[len(dranges)-1].Maxt, chk.MinTime, chk.MaxTime) {
+					continue
 				}
+
+				newChunk := chunks.NewXORChunk()
+				app, err := newChunk.Appender()
+				if err != nil {
+					return err
+				}
+
+				it := &deletedIterator{it: chk.Chunk.Iterator(), intervals: dranges}
+				for it.Next() {
+					ts, v := it.At()
+					app.Append(ts, v)
+				}
+
+				chks[i].Chunk = newChunk
 			}
 		}
 		if err := chunkw.WriteChunks(chks...); err != nil {
@@ -589,7 +619,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
 			}
 		}
 
-		for l := range postings.m {
+		for _, l := range postings.sortedKeys() {
 			if err := indexw.WritePostings(l.Name, l.Value, postings.get(l.Name, l.Value)); err != nil {
 				return errors.Wrap(err, "write postings")
 			}
diff --git a/vendor/github.com/prometheus/tsdb/cpu.pro b/vendor/github.com/prometheus/tsdb/cpu.pro
deleted file mode 100644
index e69de29bb..000000000
diff --git a/vendor/github.com/prometheus/tsdb/cpu.prof b/vendor/github.com/prometheus/tsdb/cpu.prof
deleted file mode 100644
index e69de29bb..000000000
diff --git a/vendor/github.com/prometheus/tsdb/db.go b/vendor/github.com/prometheus/tsdb/db.go
index a748a6e1f..eeb81e8d6 100644
--- a/vendor/github.com/prometheus/tsdb/db.go
+++ b/vendor/github.com/prometheus/tsdb/db.go
@@ -122,6 +122,7 @@ type dbMetrics struct {
 	reloads              prometheus.Counter
 	reloadsFailed        prometheus.Counter
 	compactionsTriggered prometheus.Counter
+	tombCleanTimer       prometheus.Histogram
 }
 
 func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
@@ -147,6 +148,10 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
 		Name: "prometheus_tsdb_compactions_triggered_total",
 		Help: "Total number of triggered compactions for the partition.",
 	})
+	m.tombCleanTimer = prometheus.NewHistogram(prometheus.HistogramOpts{
+		Name: "prometheus_tsdb_tombstone_cleanup_seconds",
+		Help: "The time taken to recompact blocks to remove tombstones.",
+	})
 
 	if r != nil {
 		r.MustRegister(
@@ -154,6 +159,7 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
 			m.reloads,
 			m.reloadsFailed,
 			m.compactionsTriggered,
+			m.tombCleanTimer,
 		)
 	}
 	return m
@@ -616,7 +622,7 @@ func (db *DB) Snapshot(dir string) error {
 		level.Info(db.logger).Log("msg", "snapshotting block", "block", b)
 
 		if err := b.Snapshot(dir); err != nil {
-			return errors.Wrap(err, "error snapshotting headblock")
+			return errors.Wrapf(err, "error snapshotting block: %s", b.Dir())
 		}
 	}
 	_, err := db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime())
@@ -691,6 +697,39 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
 	return nil
 }
 
+// CleanTombstones re-writes any blocks with tombstones.
+func (db *DB) CleanTombstones() error {
+	db.cmtx.Lock()
+	defer db.cmtx.Unlock()
+
+	start := time.Now()
+	defer func() {
+		db.metrics.tombCleanTimer.Observe(time.Since(start).Seconds())
+	}()
+
+	db.mtx.RLock()
+	blocks := db.blocks[:]
+	db.mtx.RUnlock()
+
+	deleted := []string{}
+	for _, b := range blocks {
+		ok, err := b.CleanTombstones(db.Dir(), db.compactor)
+		if err != nil {
+			return errors.Wrapf(err, "clean tombstones: %s", b.Dir())
+		}
+
+		if ok {
+			deleted = append(deleted, b.Dir())
+		}
+	}
+
+	if len(deleted) == 0 {
+		return nil
+	}
+
+	return errors.Wrap(db.reload(deleted...), "reload blocks")
+}
+
 func intervalOverlap(amin, amax, bmin, bmax int64) bool {
 	// Checks Overlap: http://stackoverflow.com/questions/3269434/
 	return amin <= bmax && bmin <= amax
diff --git a/vendor/github.com/prometheus/tsdb/postings.go b/vendor/github.com/prometheus/tsdb/postings.go
index 63fb1e31a..1ebc7c576 100644
--- a/vendor/github.com/prometheus/tsdb/postings.go
+++ b/vendor/github.com/prometheus/tsdb/postings.go
@@ -50,6 +50,25 @@ func newUnorderedMemPostings() *memPostings {
 	}
 }
 
+// sortedKeys returns a list of sorted label keys of the postings.
+func (p *memPostings) sortedKeys() []labels.Label {
+	p.mtx.RLock()
+	keys := make([]labels.Label, 0, len(p.m))
+
+	for l := range p.m {
+		keys = append(keys, l)
+	}
+	p.mtx.RUnlock()
+
+	sort.Slice(keys, func(i, j int) bool {
+		if d := strings.Compare(keys[i].Name, keys[j].Name); d != 0 {
+			return d < 0
+		}
+		return keys[i].Value < keys[j].Value
+	})
+	return keys
+}
+
 // Postings returns an iterator over the postings list for s.
 func (p *memPostings) get(name, value string) Postings {
 	p.mtx.RLock()
diff --git a/vendor/github.com/prometheus/tsdb/tsdb.test b/vendor/github.com/prometheus/tsdb/tsdb.test
deleted file mode 100755
index 0cd858d36..000000000
Binary files a/vendor/github.com/prometheus/tsdb/tsdb.test and /dev/null differ
diff --git a/vendor/vendor.json b/vendor/vendor.json
index 0f4a8c497..5f4e975af 100644
--- a/vendor/vendor.json
+++ b/vendor/vendor.json
@@ -794,28 +794,28 @@
 			"revisionTime": "2016-04-11T19:08:41Z"
 		},
 		{
-			"checksumSHA1": "c3VEi8SL0XmI6BmokMOWrSWmNu8=",
+			"checksumSHA1": "XgGOJ06okm8qd+x/gdRj48RgXsg=",
 			"path": "github.com/prometheus/tsdb",
-			"revision": "ad3c4849a99729a9c10a55b3ba4c0ad146d2446a",
-			"revisionTime": "2017-11-23T17:41:24Z"
+			"revision": "30bbbe34f8787df072cf04563bc98fb8094ba070",
+			"revisionTime": "2017-11-30T09:58:01Z"
 		},
 		{
 			"checksumSHA1": "C5V8KPHm/gZF0qrNwmIEDdG6rhA=",
 			"path": "github.com/prometheus/tsdb/chunks",
-			"revision": "ad3c4849a99729a9c10a55b3ba4c0ad146d2446a",
-			"revisionTime": "2017-11-23T17:41:24Z"
+			"revision": "30bbbe34f8787df072cf04563bc98fb8094ba070",
+			"revisionTime": "2017-11-30T09:58:01Z"
 		},
 		{
 			"checksumSHA1": "7RhNAVcmDmLFqn9nWiudT0B76f8=",
 			"path": "github.com/prometheus/tsdb/fileutil",
-			"revision": "ad3c4849a99729a9c10a55b3ba4c0ad146d2446a",
-			"revisionTime": "2017-11-23T17:41:24Z"
+			"revision": "30bbbe34f8787df072cf04563bc98fb8094ba070",
+			"revisionTime": "2017-11-30T09:58:01Z"
 		},
 		{
 			"checksumSHA1": "zhmlvc322RH1L3l9DaA9d/HVVWs=",
 			"path": "github.com/prometheus/tsdb/labels",
-			"revision": "ad3c4849a99729a9c10a55b3ba4c0ad146d2446a",
-			"revisionTime": "2017-11-23T17:41:24Z"
+			"revision": "30bbbe34f8787df072cf04563bc98fb8094ba070",
+			"revisionTime": "2017-11-30T09:58:01Z"
 		},
 		{
 			"checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=",
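Usage sketch (not part of the patch above): the new DB.CleanTombstones entry point pairs with DB.Delete, since Delete only records tombstones while CleanTombstones re-writes the affected blocks via the compactor and reloads them, which is what actually frees disk space. The following minimal caller is illustrative only and assumes the tsdb.Open signature, tsdb.DefaultOptions, and the labels.NewEqualMatcher helper as they exist in this vendored revision; the "data" directory and the job="retired" selector are hypothetical.

package main

import (
	"math"

	"github.com/go-kit/kit/log"
	"github.com/prometheus/tsdb"
	"github.com/prometheus/tsdb/labels"
)

func main() {
	// Open (or create) a TSDB instance with default options.
	db, err := tsdb.Open("data", log.NewNopLogger(), nil, tsdb.DefaultOptions)
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// Delete only writes tombstones for the matched series over the given
	// time range; the samples still occupy disk space afterwards.
	matcher := labels.NewEqualMatcher("job", "retired")
	if err := db.Delete(math.MinInt64, math.MaxInt64, matcher); err != nil {
		panic(err)
	}

	// Re-write every block that now carries tombstones and reload the
	// results, reclaiming the space held by the deleted data.
	if err := db.CleanTombstones(); err != nil {
		panic(err)
	}
}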