diff --git a/db.go b/db.go index d7c5649fb..dfb62ea02 100644 --- a/db.go +++ b/db.go @@ -493,6 +493,7 @@ func (db *DB) reload() (err error) { return nil } maxt := blocks[len(db.blocks)-1].Meta().MaxTime + return errors.Wrap(db.head.Truncate(maxt), "head truncate failed") } diff --git a/head.go b/head.go index 309489119..c1c25bbd4 100644 --- a/head.go +++ b/head.go @@ -193,9 +193,9 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) ( func (h *Head) readWAL() error { r := h.wal.Reader(h.MinTime()) - seriesFunc := func(series []labels.Labels) error { - for _, lset := range series { - h.create(lset.Hash(), lset) + seriesFunc := func(series []RefSeries) error { + for _, s := range series { + h.create(s.Labels.Hash(), s.Labels) } return nil } @@ -252,7 +252,12 @@ func (h *Head) Truncate(mint int64) error { start = time.Now() - if err := h.wal.Truncate(mint); err == nil { + p, err := h.indexRange(mint, math.MaxInt64).Postings("", "") + if err != nil { + return err + } + + if err := h.wal.Truncate(mint, p); err == nil { h.logger.Log("msg", "WAL truncation completed", "duration", time.Since(start)) } else { h.logger.Log("msg", "WAL truncation failed", "err", err, "duration", time.Since(start)) @@ -356,9 +361,9 @@ type headAppender struct { head *Head mint int64 - newSeries []*hashedLabels - newLabels []labels.Labels - newHashes map[uint64]uint64 + newSeries []*hashedLabels + createdSeries []RefSeries + newHashes map[uint64]uint64 samples []RefSample highTimestamp int64 @@ -461,7 +466,7 @@ func (a *headAppender) createSeries() error { if len(a.newSeries) == 0 { return nil } - a.newLabels = make([]labels.Labels, 0, len(a.newSeries)) + a.createdSeries = make([]RefSeries, 0, len(a.newSeries)) base0 := len(a.head.series) a.head.mtx.RUnlock() @@ -481,14 +486,14 @@ func (a *headAppender) createSeries() error { } } // Series is still new. - a.newLabels = append(a.newLabels, l.labels) - s := a.head.create(l.hash, l.labels) l.ref = uint64(s.ref) + + a.createdSeries = append(a.createdSeries, RefSeries{Ref: l.ref, Labels: l.labels}) } // Write all new series to the WAL. - if err := a.head.wal.LogSeries(a.newLabels); err != nil { + if err := a.head.wal.LogSeries(a.createdSeries); err != nil { return errors.Wrap(err, "WAL log series") } diff --git a/wal.go b/wal.go index dcf3fc9e9..3714c8800 100644 --- a/wal.go +++ b/wal.go @@ -53,7 +53,7 @@ const ( type SamplesCB func([]RefSample) error // SeriesCB is the callback after reading series. -type SeriesCB func([]labels.Labels) error +type SeriesCB func([]RefSeries) error // DeletesCB is the callback after reading deletes. type DeletesCB func([]Stone) error @@ -63,7 +63,7 @@ type SegmentWAL struct { mtx sync.Mutex dirFile *os.File - files []*os.File + files []*segmentFile logger log.Logger flushInterval time.Duration @@ -73,6 +73,10 @@ type SegmentWAL struct { cur *bufio.Writer curN int64 + // The max time of samples committed last/being committed. Not global or current + // segment values. + maxt int64 + stopc chan struct{} donec chan struct{} } @@ -81,10 +85,10 @@ type SegmentWAL struct { // It must be completely read before new entries are logged. type WAL interface { Reader(mint int64) WALReader - LogSeries([]labels.Labels) error + LogSeries([]RefSeries) error LogSamples([]RefSample) error LogDeletes([]Stone) error - Truncate(maxt int64) error + Truncate(int64, Postings) error Close() error } @@ -92,10 +96,10 @@ type NopWAL struct{} func (NopWAL) Read(SeriesCB, SamplesCB, DeletesCB) error { return nil } func (w NopWAL) Reader(int64) WALReader { return w } -func (NopWAL) LogSeries([]labels.Labels) error { return nil } +func (NopWAL) LogSeries([]RefSeries) error { return nil } func (NopWAL) LogSamples([]RefSample) error { return nil } func (NopWAL) LogDeletes([]Stone) error { return nil } -func (NopWAL) Truncate(int64) error { return nil } +func (NopWAL) Truncate(int64, Postings) error { return nil } func (NopWAL) Close() error { return nil } // WALReader reads entries from a WAL. @@ -103,6 +107,12 @@ type WALReader interface { Read(SeriesCB, SamplesCB, DeletesCB) error } +// RefSeries is the series labels with the series ID. +type RefSeries struct { + Ref uint64 + Labels labels.Labels +} + // RefSample is a timestamp/value pair associated with a reference to a series. type RefSample struct { Ref uint64 @@ -110,6 +120,15 @@ type RefSample struct { V float64 } +type segmentFile struct { + f *os.File + maxt int64 +} + +func (f segmentFile) Close() error { + return f.f.Close() +} + const ( walSegmentSizeBytes = 256 * 1024 * 1024 // 256 MB ) @@ -163,17 +182,102 @@ func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration) // Reader returns a new reader over the the write ahead log data. // It must be completely consumed before writing to the WAL. -func (w *SegmentWAL) Reader(int64) WALReader { - return newWALReader(w, w.logger) +func (w *SegmentWAL) Reader(mint int64) WALReader { + return newWALReader(w, mint, w.logger) } -// Log writes a batch of new series labels and samples to the log. -//func (w *SegmentWAL) Log(series []labels.Labels, samples []RefSample) error { -//return nil -//} +// Truncate deletes the values prior to mint and the series entries not in p. +func (w *SegmentWAL) Truncate(mint int64, p Postings) error { + // TODO(gouthamve): Handle the deletes too. + delFiles := make([]*segmentFile, 0) + + // All files have to be traversed as there could be two segments for a block + // with first block having times (10000, 20000) and SECOND one having (0, 10000). + for _, f := range w.files { + if f.maxt < mint { + delFiles = append(delFiles, f) + } + } + + if len(delFiles) == 0 { + return nil + } + + tempWAL := &SegmentWAL{ + logger: w.logger, + files: delFiles, + } + wr := newWALReader(tempWAL, 0, tempWAL.logger) + + // Create a new tmp file. + // TODO: Do it properly. + newF, err := os.Create(delFiles[0].f.Name() + ".tmp") + if err != nil { + return errors.Wrap(err, "create tmp series dump file") + } + // Write header metadata for new file. + metab := make([]byte, 8) + binary.BigEndian.PutUint32(metab[:4], WALMagic) + metab[4] = WALFormatDefault + if _, err := newF.Write(metab); err != nil { + return err + } + +WRLoop: + for wr.next() { + rt, flag, byt := wr.at() + + if rt != WALEntrySeries { + continue + } + + series, err := wr.decodeSeries(flag, byt) + if err != nil { + return errors.Wrap(err, "decode samples while truncating") + } + + activeSeries := make([]RefSeries, 0, len(series)) + for _, s := range series { + if !p.Seek(uint32(s.Ref)) { + break WRLoop + } + + if p.At() == uint32(s.Ref) { + activeSeries = append(activeSeries, s) + } + } + + if len(activeSeries) == 0 { + continue + } + buf := getWALBuffer() + buf = encodeSeries(buf, activeSeries) + _, err = newF.Write(buf) + if err != nil { + return errors.Wrap(err, "write series") + } + } + + if err := newF.Close(); err != nil { + return errors.Wrap(err, "close tmp file") + } + + if err := renameFile(newF.Name(), w.files[0].f.Name()); err != nil { + return err + } + delFiles = delFiles[1:] + for _, f := range delFiles { + if err := os.RemoveAll(f.f.Name()); err != nil { + return errors.Wrap(err, "delete WAL segment file") + } + } + // TODO: sync parent directory. + return nil +} // LogSeries writes a batch of new series labels to the log. -func (w *SegmentWAL) LogSeries(series []labels.Labels) error { +// The series have to be ordered. +func (w *SegmentWAL) LogSeries(series []RefSeries) error { if err := w.encodeSeries(series); err != nil { return err } @@ -225,13 +329,14 @@ func (w *SegmentWAL) initSegments() error { if err != nil { return err } - w.files = append(w.files, f) + w.files = append(w.files, &segmentFile{f: f}) } // Consume and validate meta headers. - for _, f := range w.files { + for _, sf := range w.files { metab := make([]byte, 8) + f := sf.f if n, err := f.Read(metab); err != nil { return errors.Wrapf(err, "validate meta %q", f.Name()) } else if n != 8 { @@ -293,7 +398,9 @@ func (w *SegmentWAL) cut() error { return err } - w.files = append(w.files, f) + w.files = append(w.files, &segmentFile{f: f}) + + // TODO(gouthamve): make the buffer size a constant. w.cur = bufio.NewWriterSize(f, 4*1024*1024) w.curN = 8 @@ -304,7 +411,7 @@ func (w *SegmentWAL) tail() *os.File { if len(w.files) == 0 { return nil } - return w.files[len(w.files)-1] + return w.files[len(w.files)-1].f } // Sync flushes the changes to disk. @@ -330,10 +437,6 @@ func (w *SegmentWAL) Sync() error { return fileutil.Fdatasync(tail) } -func (w *SegmentWAL) Truncate(maxt int64) error { - return nil -} - func (w *SegmentWAL) sync() error { if err := w.flush(); err != nil { return err @@ -396,6 +499,7 @@ const ( // It should be a multiple of the minimum sector size so that WAL can safely // distinguish between torn writes and ordinary data corruption. walPageBytes = 16 * minSectorSize + // TODO(gouthamve): What is this? ) func (w *SegmentWAL) entry(et WALEntryType, flag byte, buf []byte) error { @@ -404,6 +508,7 @@ func (w *SegmentWAL) entry(et WALEntryType, flag byte, buf []byte) error { // Cut to the next segment if the entry exceeds the file size unless it would also // exceed the size of a new segment. + // TODO(gouthamve): Add a test for this case where the commit is greater than segmentSize. var ( // 6-byte header + 4-byte CRC32 + buf. sz = int64(6 + 4 + len(buf)) @@ -439,6 +544,14 @@ func (w *SegmentWAL) entry(et WALEntryType, flag byte, buf []byte) error { w.curN += sz putWALBuffer(buf) + + // set the file's maxt. + if len(w.files) > 0 { + cf := w.files[len(w.files)-1] + if cf.maxt < w.maxt { + cf.maxt = w.maxt + } + } return nil } @@ -463,16 +576,31 @@ func putWALBuffer(b []byte) { walBuffers.Put(b) } -func (w *SegmentWAL) encodeSeries(series []labels.Labels) error { +func (w *SegmentWAL) encodeSeries(series []RefSeries) error { if len(series) == 0 { return nil } - b := make([]byte, binary.MaxVarintLen32) buf := getWALBuffer() + buf = encodeSeries(buf, series) + + return w.entry(WALEntrySeries, walSeriesSimple, buf) +} + +func encodeSeries(buf []byte, series []RefSeries) []byte { + b := make([]byte, binary.MaxVarintLen64) + // Store the base reference number of the first series. + // All series encode their ref as a delta to the first. + first := series[0] + binary.BigEndian.PutUint64(b, first.Ref) + buf = append(buf, b[:8]...) - for _, lset := range series { - n := binary.PutUvarint(b, uint64(len(lset))) + for _, s := range series { + n := binary.PutVarint(b, int64(s.Ref)-int64(first.Ref)) + buf = append(buf, b[:n]...) + + lset := s.Labels + n = binary.PutUvarint(b, uint64(len(lset))) buf = append(buf, b[:n]...) for _, l := range lset { @@ -486,7 +614,7 @@ func (w *SegmentWAL) encodeSeries(series []labels.Labels) error { } } - return w.entry(WALEntrySeries, walSeriesSimple, buf) + return buf } func (w *SegmentWAL) encodeSamples(samples []RefSample) error { @@ -508,7 +636,12 @@ func (w *SegmentWAL) encodeSamples(samples []RefSample) error { binary.BigEndian.PutUint64(b, uint64(first.T)) buf = append(buf, b[:8]...) + w.maxt = 0 for _, s := range samples { + if w.maxt < s.T { + w.maxt = s.T + } + n := binary.PutVarint(b, int64(s.Ref)-int64(first.Ref)) buf = append(buf, b[:n]...) @@ -543,6 +676,7 @@ func (w *SegmentWAL) encodeDeletes(stones []Stone) error { type walReader struct { logger log.Logger + mint int64 wal *SegmentWAL cur int buf []byte @@ -555,12 +689,13 @@ type walReader struct { err error } -func newWALReader(w *SegmentWAL, l log.Logger) *walReader { +func newWALReader(w *SegmentWAL, mint int64, l log.Logger) *walReader { if l == nil { l = log.NewNopLogger() } return &walReader{ logger: l, + mint: mint, wal: w, buf: make([]byte, 0, 128*4096), crc32: newCRC32(), @@ -589,7 +724,24 @@ func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesC if err != nil { return err } - samplesf(s) + + // Update the times for the wal segment file and select only valid samples. + cf := r.wal.files[r.cur] + validSamples := make([]RefSample, 0, len(s)) + + for _, smpl := range s { + if smpl.T < r.mint { + continue + } + + if cf.maxt < smpl.T { + cf.maxt = smpl.T + } + + validSamples = append(validSamples, smpl) + } + + samplesf(validSamples) case WALEntryDeletes: s, err := r.decodeDeletes(flag, b) if err != nil { @@ -607,7 +759,7 @@ func (r *walReader) nextEntry() (WALEntryType, byte, []byte, error) { if r.cur >= len(r.wal.files) { return 0, 0, nil, io.EOF } - cf := r.wal.files[r.cur] + cf := r.wal.files[r.cur].f et, flag, b, err := r.entry(cf) // If we reached the end of the reader, advance to the next one @@ -634,7 +786,7 @@ func (r *walReader) next() bool { if r.cur >= len(r.wal.files) { return false } - cf := r.wal.files[r.cur] + cf := r.wal.files[r.cur].f // Save position after last valid entry if we have to truncate the WAL. lastOffset, err := cf.Seek(0, os.SEEK_CUR) @@ -675,7 +827,7 @@ func (r *walReader) next() bool { } func (r *walReader) current() *os.File { - return r.wal.files[r.cur] + return r.wal.files[r.cur].f } // truncate the WAL after the last valid entry. @@ -684,7 +836,8 @@ func (r *walReader) truncate(lastOffset int64) error { "err", r.err, "file", r.current().Name(), "pos", lastOffset) // Close and delete all files after the current one. - for _, f := range r.wal.files[r.cur+1:] { + for _, sf := range r.wal.files[r.cur+1:] { + f := sf.f if err := f.Close(); err != nil { return err } @@ -754,9 +907,25 @@ func (r *walReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) { return etype, flag, buf, nil } -func (r *walReader) decodeSeries(flag byte, b []byte) ([]labels.Labels, error) { - series := []labels.Labels{} +func (r *walReader) decodeSeries(flag byte, b []byte) ([]RefSeries, error) { + series := []RefSeries{} + if len(b) < 8 { + return nil, errors.Wrap(errInvalidSize, "header length") + } + + baseRef := binary.BigEndian.Uint64(b) + b = b[8:] + for len(b) > 0 { + var ser RefSeries + // TODO: Check again. + dref, n := binary.Varint(b) + if n < 1 { + return nil, errors.Wrap(errInvalidSize, "series ref delta") + } + b = b[n:] + ser.Ref = uint64(int64(baseRef) + dref) + l, n := binary.Uvarint(b) if n < 1 { return nil, errors.Wrap(errInvalidSize, "number of labels") @@ -779,8 +948,9 @@ func (r *walReader) decodeSeries(flag byte, b []byte) ([]labels.Labels, error) { lset[i].Value = string(b[n : n+int(vl)]) b = b[n+int(vl):] } + ser.Labels = lset - series = append(series, lset) + series = append(series, ser) } return series, nil } diff --git a/wal_test.go b/wal_test.go index 4f1d3f046..dd2f4cf55 100644 --- a/wal_test.go +++ b/wal_test.go @@ -23,7 +23,6 @@ import ( "github.com/coreos/etcd/pkg/fileutil" "github.com/go-kit/kit/log" - "github.com/prometheus/tsdb/labels" "github.com/stretchr/testify/require" ) @@ -57,7 +56,7 @@ func TestSegmentWAL_initSegments(t *testing.T) { // Validate that files are locked properly. for _, of := range w.files { - f, err := os.Open(of.Name()) + f, err := os.Open(of.f.Name()) require.NoError(t, err, "open locked segment %s", f.Name()) _, err = f.Read([]byte{0}) @@ -74,7 +73,7 @@ func TestSegmentWAL_initSegments(t *testing.T) { } // Make initialization fail by corrupting the header of one file. - f, err := os.OpenFile(w.files[3].Name(), os.O_WRONLY, 0666) + f, err := os.OpenFile(w.files[3].f.Name(), os.O_WRONLY, 0666) require.NoError(t, err) _, err = f.WriteAt([]byte{0}, 4) @@ -110,7 +109,7 @@ func TestSegmentWAL_cut(t *testing.T) { require.NoError(t, w.Close()) for _, of := range w.files { - f, err := os.Open(of.Name()) + f, err := os.Open(of.f.Name()) require.NoError(t, err) // Verify header data. @@ -122,7 +121,7 @@ func TestSegmentWAL_cut(t *testing.T) { // We cannot actually check for correct pre-allocation as it is // optional per filesystem and handled transparently. - et, flag, b, err := newWALReader(nil, nil).entry(f) + et, flag, b, err := newWALReader(nil, 0, nil).entry(f) require.NoError(t, err) require.Equal(t, WALEntrySeries, et) require.Equal(t, flag, byte(walSeriesSimple)) @@ -147,7 +146,7 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { defer os.RemoveAll(dir) var ( - recordedSeries [][]labels.Labels + recordedSeries [][]RefSeries recordedSamples [][]RefSample recordedDeletes [][]Stone ) @@ -165,16 +164,16 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { r := w.Reader(0) var ( - resultSeries [][]labels.Labels + resultSeries [][]RefSeries resultSamples [][]RefSample resultDeletes [][]Stone ) - serf := func(lsets []labels.Labels) error { - if len(lsets) > 0 { - clsets := make([]labels.Labels, len(lsets)) - copy(clsets, lsets) - resultSeries = append(resultSeries, clsets) + serf := func(series []RefSeries) error { + if len(series) > 0 { + clsets := make([]RefSeries, len(series)) + copy(clsets, series) + resultSeries = append(resultSeries, series) } return nil @@ -224,13 +223,20 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { } lbls := series[i : i+stepSize] + series := make([]RefSeries, 0, len(series)) + for j, l := range lbls { + series = append(series, RefSeries{ + Ref: uint64(i + j), + Labels: l, + }) + } - require.NoError(t, w.LogSeries(lbls)) + require.NoError(t, w.LogSeries(series)) require.NoError(t, w.LogSamples(samples)) require.NoError(t, w.LogDeletes(stones)) if len(lbls) > 0 { - recordedSeries = append(recordedSeries, lbls) + recordedSeries = append(recordedSeries, series) } if len(samples) > 0 { recordedSamples = append(recordedSamples, samples) @@ -254,7 +260,7 @@ func TestWALRestoreCorrupted(t *testing.T) { { name: "truncate_checksum", f: func(t *testing.T, w *SegmentWAL) { - f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0666) + f, err := os.OpenFile(w.files[0].f.Name(), os.O_WRONLY, 0666) require.NoError(t, err) defer f.Close() @@ -267,7 +273,7 @@ func TestWALRestoreCorrupted(t *testing.T) { { name: "truncate_body", f: func(t *testing.T, w *SegmentWAL) { - f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0666) + f, err := os.OpenFile(w.files[0].f.Name(), os.O_WRONLY, 0666) require.NoError(t, err) defer f.Close() @@ -280,7 +286,7 @@ func TestWALRestoreCorrupted(t *testing.T) { { name: "body_content", f: func(t *testing.T, w *SegmentWAL) { - f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0666) + f, err := os.OpenFile(w.files[0].f.Name(), os.O_WRONLY, 0666) require.NoError(t, err) defer f.Close() @@ -295,7 +301,7 @@ func TestWALRestoreCorrupted(t *testing.T) { { name: "checksum", f: func(t *testing.T, w *SegmentWAL) { - f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0666) + f, err := os.OpenFile(w.files[0].f.Name(), os.O_WRONLY, 0666) require.NoError(t, err) defer f.Close() @@ -341,7 +347,8 @@ func TestWALRestoreCorrupted(t *testing.T) { require.NoError(t, err) r := w2.Reader(0) - serf := func(l []labels.Labels) error { + + serf := func(l []RefSeries) error { require.Equal(t, 0, len(l)) return nil }