mirror of https://github.com/prometheus/prometheus
Fix compaction selection after creating new heads
This fixes the case where between block creations no compaction plans are ran. We were not compacting anything in these cases since the on creation the most recent head block always had a high timestamp of 0.pull/5805/head
parent
5d7b5994d6
commit
9c4235532e
|
@ -112,8 +112,8 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) {
|
||||||
|
|
||||||
st, err := tsdb.Open(dir, nil, nil, &tsdb.Options{
|
st, err := tsdb.Open(dir, nil, nil, &tsdb.Options{
|
||||||
WALFlushInterval: 200 * time.Millisecond,
|
WALFlushInterval: 200 * time.Millisecond,
|
||||||
RetentionDuration: 2 * 24 * 60 * 60 * 1000, // 1 days in milliseconds
|
RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds
|
||||||
BlockRanges: tsdb.ExponentialBlockRanges(int64(2*time.Hour), 3, 5),
|
BlockRanges: tsdb.ExponentialBlockRanges(3*60*60*1000, 3, 5),
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
exitWithError(err)
|
exitWithError(err)
|
||||||
|
@ -188,6 +188,7 @@ func (b *writeBenchmark) ingestScrapes(lbls []labels.Labels, scrapeCount int) (u
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
fmt.Println("ingestion completed")
|
||||||
|
|
||||||
return total, nil
|
return total, nil
|
||||||
}
|
}
|
||||||
|
|
79
db.go
79
db.go
|
@ -325,37 +325,52 @@ func headFullness(h headBlock) float64 {
|
||||||
return a / b
|
return a / b
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// appendableHeads returns a copy of a slice of HeadBlocks that can still be appended to.
|
||||||
|
func (db *DB) appendableHeads() (r []headBlock) {
|
||||||
|
switch l := len(db.heads); l {
|
||||||
|
case 0:
|
||||||
|
case 1:
|
||||||
|
r = append(r, db.heads[0])
|
||||||
|
default:
|
||||||
|
if headFullness(db.heads[l-1]) < 0.5 {
|
||||||
|
r = append(r, db.heads[l-2])
|
||||||
|
}
|
||||||
|
r = append(r, db.heads[l-1])
|
||||||
|
}
|
||||||
|
return r
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *DB) completedHeads() (r []headBlock) {
|
||||||
|
db.headmtx.RLock()
|
||||||
|
defer db.headmtx.RUnlock()
|
||||||
|
|
||||||
|
if len(db.heads) < 2 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Select all old heads unless they still have pending appenders.
|
||||||
|
for _, h := range db.heads[:len(db.heads)-2] {
|
||||||
|
if h.ActiveWriters() > 0 {
|
||||||
|
return r
|
||||||
|
}
|
||||||
|
r = append(r, h)
|
||||||
|
}
|
||||||
|
// Add the 2nd last head if the last head is more than 50% filled.
|
||||||
|
// Compacting it early allows us to free its memory before allocating
|
||||||
|
// more for the next block and thus reduces spikes.
|
||||||
|
if h2 := db.heads[len(db.heads)-2]; headFullness(h2) >= 0.5 && h2.ActiveWriters() == 0 {
|
||||||
|
r = append(r, h2)
|
||||||
|
}
|
||||||
|
return r
|
||||||
|
}
|
||||||
|
|
||||||
func (db *DB) compact() (changes bool, err error) {
|
func (db *DB) compact() (changes bool, err error) {
|
||||||
db.cmtx.Lock()
|
db.cmtx.Lock()
|
||||||
defer db.cmtx.Unlock()
|
defer db.cmtx.Unlock()
|
||||||
|
|
||||||
db.headmtx.RLock()
|
|
||||||
|
|
||||||
// Check whether we have pending head blocks that are ready to be persisted.
|
// Check whether we have pending head blocks that are ready to be persisted.
|
||||||
// They have the highest priority.
|
// They have the highest priority.
|
||||||
var singles []Block
|
for _, h := range db.completedHeads() {
|
||||||
|
|
||||||
// Collect head blocks that are ready for compaction. Write them after
|
|
||||||
// returning the lock to not block Appenders.
|
|
||||||
// Selected blocks are semantically ensured to not be written to afterwards
|
|
||||||
// by appendable().
|
|
||||||
if len(db.heads) > 1 {
|
|
||||||
f := headFullness(db.heads[len(db.heads)-1])
|
|
||||||
|
|
||||||
for _, h := range db.heads[:len(db.heads)-1] {
|
|
||||||
// Blocks that won't be appendable when instantiating a new appender
|
|
||||||
// might still have active appenders on them.
|
|
||||||
// Abort at the first one we encounter.
|
|
||||||
if h.ActiveWriters() > 0 || f < 0.5 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
singles = append(singles, h)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
db.headmtx.RUnlock()
|
|
||||||
|
|
||||||
for _, h := range singles {
|
|
||||||
select {
|
select {
|
||||||
case <-db.stopc:
|
case <-db.stopc:
|
||||||
return changes, nil
|
return changes, nil
|
||||||
|
@ -677,7 +692,7 @@ func (a *dbAppender) appenderAt(t int64) (*metaAppender, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
var hb headBlock
|
var hb headBlock
|
||||||
for _, h := range a.db.appendable() {
|
for _, h := range a.db.appendableHeads() {
|
||||||
m := h.Meta()
|
m := h.Meta()
|
||||||
|
|
||||||
if intervalContains(m.MinTime, m.MaxTime-1, t) {
|
if intervalContains(m.MinTime, m.MaxTime-1, t) {
|
||||||
|
@ -809,18 +824,6 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// appendable returns a copy of a slice of HeadBlocks that can still be appended to.
|
|
||||||
func (db *DB) appendable() (r []headBlock) {
|
|
||||||
switch len(db.heads) {
|
|
||||||
case 0:
|
|
||||||
case 1:
|
|
||||||
r = append(r, db.heads[0])
|
|
||||||
default:
|
|
||||||
r = append(r, db.heads[len(db.heads)-2:]...)
|
|
||||||
}
|
|
||||||
return r
|
|
||||||
}
|
|
||||||
|
|
||||||
func intervalOverlap(amin, amax, bmin, bmax int64) bool {
|
func intervalOverlap(amin, amax, bmin, bmax int64) bool {
|
||||||
// Checks Overlap: http://stackoverflow.com/questions/3269434/
|
// Checks Overlap: http://stackoverflow.com/questions/3269434/
|
||||||
return amin <= bmax && bmin <= amax
|
return amin <= bmax && bmin <= amax
|
||||||
|
|
Loading…
Reference in New Issue