Merge branch 'locks' of github.com:prometheus/tsdb into locks

pull/5805/head
Fabian Reinartz 7 years ago
commit 3d8be398d6

@ -17,7 +17,6 @@ import (
"math/rand" "math/rand"
"os" "os"
"path/filepath" "path/filepath"
"runtime"
"sort" "sort"
"time" "time"
@ -365,10 +364,6 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
} }
c.metrics.ran.Inc() c.metrics.ran.Inc()
c.metrics.duration.Observe(time.Since(t).Seconds()) c.metrics.duration.Observe(time.Since(t).Seconds())
// We might have done quite a few allocs. Enforce a GC so they do not accumulate
// with subsequent compactions or head GCs.
runtime.GC()
}(time.Now()) }(time.Now())
dir := filepath.Join(dest, meta.ULID.String()) dir := filepath.Join(dest, meta.ULID.String())
@ -570,14 +565,6 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
return errors.Wrap(err, "write postings") return errors.Wrap(err, "write postings")
} }
} }
// Write a postings list containing all series.
all := make([]uint64, i)
for i := range all {
all[i] = uint64(i)
}
if err := indexw.WritePostings("", "", newListPostings(all)); err != nil {
return errors.Wrap(err, "write 'all' postings")
}
return nil return nil
} }

@ -21,6 +21,7 @@ import (
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"runtime"
"sort" "sort"
"strconv" "strconv"
"sync" "sync"
@ -349,9 +350,12 @@ func (db *DB) compact() (changes bool, err error) {
} }
changes = true changes = true
runtime.GC()
if err := db.reload(); err != nil { if err := db.reload(); err != nil {
return changes, errors.Wrap(err, "reload blocks") return changes, errors.Wrap(err, "reload blocks")
} }
runtime.GC()
} }
// Check for compactions of multiple blocks. // Check for compactions of multiple blocks.
@ -380,10 +384,12 @@ func (db *DB) compact() (changes bool, err error) {
return changes, errors.Wrap(err, "delete compacted block") return changes, errors.Wrap(err, "delete compacted block")
} }
} }
runtime.GC()
if err := db.reload(); err != nil { if err := db.reload(); err != nil {
return changes, errors.Wrap(err, "reload blocks") return changes, errors.Wrap(err, "reload blocks")
} }
runtime.GC()
} }
return changes, nil return changes, nil

@ -77,6 +77,22 @@ func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) }
func (d *decbuf) be32int() int { return int(d.be32()) } func (d *decbuf) be32int() int { return int(d.be32()) }
func (d *decbuf) be64int64() int64 { return int64(d.be64()) } func (d *decbuf) be64int64() int64 { return int64(d.be64()) }
// uvarintTempStr decodes like uvarintStr but the returned string is
// not safe to use if the underyling buffer changes.
func (d *decbuf) uvarintTempStr() string {
l := d.uvarint64()
if d.e != nil {
return ""
}
if len(d.b) < int(l) {
d.e = errInvalidSize
return ""
}
s := yoloString(d.b[:l])
d.b = d.b[l:]
return s
}
func (d *decbuf) uvarintStr() string { func (d *decbuf) uvarintStr() string {
l := d.uvarint64() l := d.uvarint64()
if d.e != nil { if d.e != nil {

@ -15,7 +15,6 @@ package tsdb
import ( import (
"math" "math"
"runtime"
"sort" "sort"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -515,8 +514,6 @@ Outer:
// gc removes data before the minimum timestmap from the head. // gc removes data before the minimum timestmap from the head.
func (h *Head) gc() { func (h *Head) gc() {
defer runtime.GC()
// Only data strictly lower than this timestamp must be deleted. // Only data strictly lower than this timestamp must be deleted.
mint := h.MinTime() mint := h.MinTime()

@ -335,10 +335,6 @@ func (w *indexWriter) AddSymbols(sym map[string]struct{}) error {
for _, s := range symbols { for _, s := range symbols {
w.symbols[s] = uint32(w.pos) + headerSize + uint32(w.buf2.len()) w.symbols[s] = uint32(w.pos) + headerSize + uint32(w.buf2.len())
// NOTE: len(s) gives the number of runes, not the number of bytes.
// Therefore the read-back length for strings with unicode characters will
// be off when not using putUvarintStr.
w.buf2.putUvarintStr(s) w.buf2.putUvarintStr(s)
} }
@ -636,7 +632,7 @@ func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) {
keys := make([]string, 0, keyCount) keys := make([]string, 0, keyCount)
for i := 0; i < keyCount; i++ { for i := 0; i < keyCount; i++ {
keys = append(keys, d2.uvarintStr()) keys = append(keys, d2.uvarintTempStr())
} }
res[strings.Join(keys, sep)] = uint32(d2.uvarint()) res[strings.Join(keys, sep)] = uint32(d2.uvarint())
@ -673,7 +669,7 @@ func (r *indexReader) section(o uint32) (byte, []byte, error) {
func (r *indexReader) lookupSymbol(o uint32) (string, error) { func (r *indexReader) lookupSymbol(o uint32) (string, error) {
d := r.decbufAt(int(o)) d := r.decbufAt(int(o))
s := d.uvarintStr() s := d.uvarintTempStr()
if d.err() != nil { if d.err() != nil {
return "", errors.Wrapf(d.err(), "read symbol at %d", o) return "", errors.Wrapf(d.err(), "read symbol at %d", o)
} }
@ -688,7 +684,7 @@ func (r *indexReader) Symbols() (map[string]struct{}, error) {
sym := make(map[string]struct{}, count) sym := make(map[string]struct{}, count)
for ; count > 0; count-- { for ; count > 0; count-- {
s := d2.uvarintStr() s := d2.uvarintTempStr()
sym[s] = struct{}{} sym[s] = struct{}{}
} }

@ -399,6 +399,9 @@ func TestWALRestoreCorrupted(t *testing.T) {
require.NoError(t, w.Close()) require.NoError(t, w.Close())
// cut() truncates and fsyncs the first segment async. If it happens after
// the corruption we apply below, the corruption will be overwritten again.
// Fire and forget a sync to avoid flakyness.
w.files[0].Sync() w.files[0].Sync()
// Corrupt the second entry in the first file. // Corrupt the second entry in the first file.
// After re-opening we must be able to read the first entry // After re-opening we must be able to read the first entry

Loading…
Cancel
Save