Filter WAL data in Head, misc fixes

pull/5805/head
Fabian Reinartz 7 years ago
parent 33e9bdf403
commit 81222849bc

@ -214,6 +214,9 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
if err := db.reload(); err != nil {
return nil, err
}
if err := db.head.ReadWAL(); err != nil {
return nil, errors.Wrap(err, "read WAL")
}
go db.run()

@ -164,7 +164,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
l = log.NewNopLogger()
}
if wal == nil {
wal = NopWAL{}
wal = NopWAL()
}
if chunkRange < 1 {
return nil, errors.Errorf("invalid chunk range %d", chunkRange)
@ -173,7 +173,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
wal: wal,
logger: l,
chunkRange: chunkRange,
minTime: math.MaxInt64,
minTime: math.MinInt64,
maxTime: math.MinInt64,
series: newStripeSeries(),
values: map[string]stringset{},
@ -183,11 +183,12 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
}
h.metrics = newHeadMetrics(h, r)
return h, h.readWAL()
return h, nil
}
func (h *Head) readWAL() error {
func (h *Head) ReadWAL() error {
r := h.wal.Reader()
mint := h.MinTime()
seriesFunc := func(series []RefSeries) error {
for _, s := range series {
@ -197,6 +198,9 @@ func (h *Head) readWAL() error {
}
samplesFunc := func(samples []RefSample) error {
for _, s := range samples {
if s.T < mint {
continue
}
ms := h.series.getByID(s.Ref)
if ms == nil {
return errors.Errorf("unknown series reference %d; abort WAL restore", s.Ref)
@ -213,6 +217,9 @@ func (h *Head) readWAL() error {
deletesFunc := func(stones []Stone) error {
for _, s := range stones {
for _, itv := range s.intervals {
if itv.Maxt < mint {
continue
}
h.tombstones.add(s.ref, itv)
}
}
@ -226,12 +233,10 @@ func (h *Head) readWAL() error {
return nil
}
func (h *Head) String() string {
return "<head>"
}
// Truncate removes all data before mint from the head block and truncates its WAL.
func (h *Head) Truncate(mint int64) error {
initialize := h.MinTime() == math.MinInt64
if mint%h.chunkRange != 0 {
return errors.Errorf("truncating at %d not aligned", mint)
}
@ -240,6 +245,12 @@ func (h *Head) Truncate(mint int64) error {
}
atomic.StoreInt64(&h.minTime, mint)
// This was an initial call to Truncate after loading blocks on startup.
// We haven't read back the WAL yet, so do not attempt to truncate it.
if initialize {
return nil
}
start := time.Now()
h.gc()
@ -934,7 +945,7 @@ func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) {
rmChunks = 0
)
// Run through all series and truncate old chunks. Mark those with no
// chunks left as deleted and store their ID and hash.
// chunks left as deleted and store their ID.
for i := 0; i < stripeSize; i++ {
s.locks[i].Lock()

@ -73,15 +73,19 @@ type WAL interface {
}
// NopWAL is a WAL that does nothing.
type NopWAL struct{}
func NopWAL() WAL {
return nopWAL{}
}
type nopWAL struct{}
func (NopWAL) Read(SeriesCB, SamplesCB, DeletesCB) error { return nil }
func (w NopWAL) Reader() WALReader { return w }
func (NopWAL) LogSeries([]RefSeries) error { return nil }
func (NopWAL) LogSamples([]RefSample) error { return nil }
func (NopWAL) LogDeletes([]Stone) error { return nil }
func (NopWAL) Truncate(int64, Postings) error { return nil }
func (NopWAL) Close() error { return nil }
func (nopWAL) Read(SeriesCB, SamplesCB, DeletesCB) error { return nil }
func (w nopWAL) Reader() WALReader { return w }
func (nopWAL) LogSeries([]RefSeries) error { return nil }
func (nopWAL) LogSamples([]RefSample) error { return nil }
func (nopWAL) LogDeletes([]Stone) error { return nil }
func (nopWAL) Truncate(int64, Postings) error { return nil }
func (nopWAL) Close() error { return nil }
// WALReader reads entries from a WAL.
type WALReader interface {
@ -344,6 +348,13 @@ Loop:
return errors.Wrap(r.Err(), "read candidate WAL files")
}
off, err := csf.Seek(0, os.SEEK_CUR)
if err != nil {
return err
}
if err := csf.Truncate(off); err != nil {
return err
}
if err := csf.Close(); err != nil {
return errors.Wrap(err, "close tmp file")
}
@ -351,15 +362,19 @@ Loop:
if err := renameFile(csf.Name(), candidates[len(candidates)-1].Name()); err != nil {
return err
}
if err := w.dirFile.Sync(); err != nil {
return err
}
for _, f := range candidates[1:] {
if err := os.RemoveAll(f.Name()); err != nil {
return errors.Wrap(err, "delete WAL segment file")
}
}
if err := w.dirFile.Sync(); err != nil {
return err
}
w.mtx.Lock()
w.files = append([]*segmentFile{csf}, w.files[len(candidates):]...)
w.mtx.Unlock()
return nil
}

Loading…
Cancel
Save