diff --git a/encoding_helpers.go b/encoding_helpers.go
index 25ff32d00..9aa4ba409 100644
--- a/encoding_helpers.go
+++ b/encoding_helpers.go
@@ -86,7 +86,7 @@ func (d *decbuf) uvarintStr() string {
 		d.e = errInvalidSize
 		return ""
 	}
-	s := yoloString(d.b[:l])
+	s := string(d.b[:l])
 	d.b = d.b[l:]
 	return s
 }
diff --git a/head.go b/head.go
index 1759a7d4c..807690605 100644
--- a/head.go
+++ b/head.go
@@ -187,7 +187,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
 }
 
 func (h *Head) readWAL() error {
-	r := h.wal.Reader(h.MinTime())
+	r := h.wal.Reader()
 
 	seriesFunc := func(series []RefSeries) error {
 		for _, s := range series {
diff --git a/wal.go b/wal.go
index 51f52e876..073be2d7d 100644
--- a/wal.go
+++ b/wal.go
@@ -16,11 +16,14 @@ package tsdb
 import (
 	"bufio"
 	"encoding/binary"
+	"fmt"
 	"hash"
 	"hash/crc32"
 	"io"
 	"math"
 	"os"
+	"path/filepath"
+	"sort"
 	"sync"
 	"time"
 
@@ -58,33 +61,10 @@ type SeriesCB func([]RefSeries) error
 // DeletesCB is the callback after reading deletes.
 type DeletesCB func([]Stone) error
 
-// SegmentWAL is a write ahead log for series data.
-type SegmentWAL struct {
-	mtx sync.Mutex
-
-	dirFile *os.File
-	files   []*segmentFile
-
-	logger        log.Logger
-	flushInterval time.Duration
-	segmentSize   int64
-
-	crc32 hash.Hash32
-	cur   *bufio.Writer
-	curN  int64
-
-	// The max time of samples committed last/being committed. Not global or current
-	// segment values.
-	maxt int64
-
-	stopc chan struct{}
-	donec chan struct{}
-}
-
 // WAL is a write ahead log that can log new series labels and samples.
 // It must be completely read before new entries are logged.
 type WAL interface {
-	Reader(mint int64) WALReader
+	Reader() WALReader
 	LogSeries([]RefSeries) error
 	LogSamples([]RefSample) error
 	LogDeletes([]Stone) error
@@ -92,10 +72,11 @@ type WAL interface {
 	Close() error
 }
 
+// NopWAL is a WAL that does nothing.
 type NopWAL struct{}
 
 func (NopWAL) Read(SeriesCB, SamplesCB, DeletesCB) error { return nil }
-func (w NopWAL) Reader(int64) WALReader                  { return w }
+func (w NopWAL) Reader() WALReader                       { return w }
 func (NopWAL) LogSeries([]RefSeries) error               { return nil }
 func (NopWAL) LogSamples([]RefSample) error              { return nil }
 func (NopWAL) LogDeletes([]Stone) error                  { return nil }
@@ -125,13 +106,21 @@ type RefSample struct {
 	series *memSeries
 }
 
+// segmentFile wraps a file object of a segment and tracks the highest timestamp
+// it contains. During WAL truncation, all segments with no higher timestamp than
+// the truncation threshold can be compacted.
 type segmentFile struct {
-	f    *os.File
-	maxt int64
+	*os.File
+	maxTime   int64  // highest tombstone or sample timestamp in segment
+	minSeries uint64 // lowest series ID in segment
 }
 
-func (f segmentFile) Close() error {
-	return f.f.Close()
+func newSegmentFile(f *os.File) *segmentFile {
+	return &segmentFile{
+		File:      f,
+		maxTime:   math.MinInt64,
+		minSeries: math.MaxUint64,
+	}
 }
 
 const (
@@ -153,6 +142,26 @@ func newCRC32() hash.Hash32 {
 	return crc32.New(castagnoliTable)
 }
 
+// SegmentWAL is a write ahead log for series data.
+type SegmentWAL struct {
+	mtx sync.Mutex
+
+	dirFile *os.File
+	files   []*segmentFile
+
+	logger        log.Logger
+	flushInterval time.Duration
+	segmentSize   int64
+
+	crc32 hash.Hash32
+	cur   *bufio.Writer
+	curN  int64
+
+	stopc   chan struct{}
+	donec   chan struct{}
+	buffers sync.Pool
+}
+
 // OpenSegmentWAL opens or creates a write ahead log in the given directory.
 // The WAL must be read completely before new data is written.
func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration) (*SegmentWAL, error) {
@@ -176,205 +185,328 @@ func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration)
 		segmentSize:   walSegmentSizeBytes,
 		crc32:         newCRC32(),
 	}
-	if err := w.initSegments(); err != nil {
+
+	fns, err := sequenceFiles(w.dirFile.Name())
+	if err != nil {
 		return nil, err
 	}
+	for _, fn := range fns {
+		f, err := w.openSegmentFile(fn)
+		if err != nil {
+			return nil, err
+		}
+		w.files = append(w.files, newSegmentFile(f))
+	}
 
 	go w.run(flushInterval)
 
 	return w, nil
}
 
+// repairingWALReader wraps a WAL reader and truncates its underlying SegmentWAL after the last
+// valid entry if it encounters corruption.
+type repairingWALReader struct {
+	wal *SegmentWAL
+	r   WALReader
+}
+
+func (r *repairingWALReader) Read(series SeriesCB, samples SamplesCB, deletes DeletesCB) error {
+	err := r.r.Read(series, samples, deletes)
+	if err == nil {
+		return nil
+	}
+	cerr, ok := err.(walCorruptionErr)
+	if !ok {
+		return err
+	}
+	return r.wal.truncate(cerr.err, cerr.file, cerr.lastOffset)
+}
+
+// truncate the WAL after the last valid entry.
+func (w *SegmentWAL) truncate(err error, file int, lastOffset int64) error {
+	w.logger.Log("msg", "WAL corruption detected; truncating",
+		"err", err, "file", w.files[file].Name(), "pos", lastOffset)
+
+	// Close and delete all files after the current one.
+	for _, f := range w.files[file+1:] {
+		if err := f.Close(); err != nil {
+			return err
+		}
+		if err := os.Remove(f.Name()); err != nil {
+			return err
+		}
+	}
+	w.mtx.Lock()
+	defer w.mtx.Unlock()
+
+	w.files = w.files[:file+1]
+
+	// Seek the current file to the last valid offset where we continue writing from.
+	_, err = w.files[file].Seek(lastOffset, os.SEEK_SET)
+	return err
+}
+
 // Reader returns a new reader over the write ahead log data.
 // It must be completely consumed before writing to the WAL.
-func (w *SegmentWAL) Reader(mint int64) WALReader {
-	return newWALReader(w, mint, w.logger)
+func (w *SegmentWAL) Reader() WALReader {
+	return &repairingWALReader{
+		wal: w,
+		r:   newWALReader(w.files, w.logger),
+	}
+}
+
+func (w *SegmentWAL) getBuffer() *encbuf {
+	b := w.buffers.Get()
+	if b == nil {
+		return &encbuf{b: make([]byte, 0, 64*1024)}
+	}
+	return b.(*encbuf)
+}
+
+func (w *SegmentWAL) putBuffer(b *encbuf) {
+	b.reset()
+	w.buffers.Put(b)
 }
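An aside on the getBuffer/putBuffer helpers above: they are the standard sync.Pool recycling idiom, with a nil check standing in for a New function. A minimal, self-contained sketch of the same pattern, using bytes.Buffer as a stand-in for the WAL's encbuf (names and sizes here are illustrative, not part of the patch):

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// buffers recycles encode buffers across calls so steady-state logging
// allocates almost nothing. Get returns nil until Put has stocked the pool,
// mirroring the nil check in getBuffer above.
var buffers sync.Pool

func getBuffer() *bytes.Buffer {
	if b := buffers.Get(); b != nil {
		return b.(*bytes.Buffer)
	}
	return bytes.NewBuffer(make([]byte, 0, 64*1024))
}

func putBuffer(b *bytes.Buffer) {
	b.Reset() // reset before returning it so the next caller starts clean
	buffers.Put(b)
}

func main() {
	b := getBuffer()
	b.WriteString("entry payload")
	fmt.Println(b.Len(), "bytes staged")
	putBuffer(b)
}
```

The important design point the patch relies on is that a pooled buffer is reset before it is returned, so a corrupt or partial encode can never leak into the next log call.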
 
 // Truncate deletes the values prior to mint and the series entries not in p.
 func (w *SegmentWAL) Truncate(mint int64, p Postings) error {
-	// TODO(gouthamve): Handle the deletes too.
-	delFiles := make([]*segmentFile, 0)
+	// The last segment is always active.
+	if len(w.files) < 2 {
+		return nil
+	}
+	var candidates []*segmentFile
 
 	// All files have to be traversed as there could be two segments for a block
 	// with the first one having times (10000, 20000) and the second one having (0, 10000).
-	for _, f := range w.files {
-		if f.maxt < mint {
-			delFiles = append(delFiles, f)
+	for _, sf := range w.files[:len(w.files)-1] {
+		if sf.maxTime >= mint {
+			break
 		}
+		// Past WAL files are closed. We have to reopen them for another read.
+		f, err := w.openSegmentFile(sf.Name())
+		if err != nil {
+			return errors.Wrap(err, "open old WAL segment for read")
+		}
+		candidates = append(candidates, &segmentFile{
+			File:      f,
+			minSeries: sf.minSeries,
+			maxTime:   sf.maxTime,
+		})
 	}
-
-	if len(delFiles) == 0 {
+	if len(candidates) == 0 {
 		return nil
 	}
 
-	tempWAL := &SegmentWAL{
-		logger: w.logger,
-		files:  delFiles,
-	}
-	wr := newWALReader(tempWAL, 0, tempWAL.logger)
+	r := newWALReader(candidates, w.logger)
 
 	// Create a new tmp file.
-	// TODO: Do it properly.
-	newF, err := os.Create(delFiles[0].f.Name() + ".tmp")
+	f, err := w.createSegmentFile(filepath.Join(w.dirFile.Name(), "compact.tmp"))
 	if err != nil {
-		return errors.Wrap(err, "create tmp series dump file")
-	}
-	// Write header metadata for new file.
-	metab := make([]byte, 8)
-	binary.BigEndian.PutUint32(metab[:4], WALMagic)
-	metab[4] = WALFormatDefault
-	if _, err := newF.Write(metab); err != nil {
-		return err
+		return errors.Wrap(err, "create compaction segment")
 	}
+	csf := newSegmentFile(f)
 
-WRLoop:
-	for wr.next() {
-		rt, flag, byt := wr.at()
+	activeSeries := []RefSeries{}
+
+Loop:
+	for r.next() {
+		rt, flag, byt := r.at()
 
 		if rt != WALEntrySeries {
 			continue
 		}
-
-		series, err := wr.decodeSeries(flag, byt)
+		series, err := r.decodeSeries(flag, byt)
 		if err != nil {
 			return errors.Wrap(err, "decode series while truncating")
 		}
+		activeSeries = activeSeries[:0]
 
-		activeSeries := make([]RefSeries, 0, len(series))
 		for _, s := range series {
 			if !p.Seek(s.Ref) {
-				break WRLoop
+				break Loop
 			}
-
 			if p.At() == s.Ref {
 				activeSeries = append(activeSeries, s)
 			}
 		}
 
-		if len(activeSeries) == 0 {
-			continue
-		}
-
-		buf := getWALBuffer()
-		buf = encodeSeries(buf, activeSeries)
-		_, err = newF.Write(buf)
+		buf := w.getBuffer()
+		flag = w.encodeSeries(buf, activeSeries)
+
+		_, err = w.writeTo(csf, WALEntrySeries, flag, buf.get())
+		w.putBuffer(buf)
+
 		if err != nil {
-			return errors.Wrap(err, "write series")
+			return err
 		}
 	}
+	if r.Err() != nil {
+		return errors.Wrap(r.Err(), "read candidate WAL files")
+	}
 
-	if err := newF.Close(); err != nil {
+	if err := csf.Close(); err != nil {
 		return errors.Wrap(err, "close tmp file")
 	}
-	if err := renameFile(newF.Name(), w.files[0].f.Name()); err != nil {
+	if err := renameFile(csf.Name(), candidates[0].Name()); err != nil {
+		return err
+	}
+	if err := w.dirFile.Sync(); err != nil {
 		return err
 	}
-	delFiles = delFiles[1:]
 
-	for _, f := range delFiles {
-		if err := os.RemoveAll(f.f.Name()); err != nil {
+	for _, f := range candidates[1:] {
+		if err := os.RemoveAll(f.Name()); err != nil {
			return errors.Wrap(err, "delete WAL segment file")
 		}
 	}
-	// TODO: sync parent directory.
 	return nil
 }
 
 // LogSeries writes a batch of new series labels to the log.
 // The series have to be ordered.
 func (w *SegmentWAL) LogSeries(series []RefSeries) error {
-	if err := w.encodeSeries(series); err != nil {
-		return err
+	buf := w.getBuffer()
+
+	flag := w.encodeSeries(buf, series)
+	err := w.write(WALEntrySeries, flag, buf.get())
+
+	w.putBuffer(buf)
+
+	if err != nil {
+		return errors.Wrap(err, "log series")
+	}
+
+	tf := w.head()
+
+	for _, s := range series {
+		if tf.minSeries > s.Ref {
+			tf.minSeries = s.Ref
+		}
 	}
 
 	if w.flushInterval <= 0 {
-		return w.Sync()
+		return errors.Wrap(w.Sync(), "sync")
 	}
 	return nil
 }
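The rewrite sequence in Truncate above is the classic write-temp-then-rename pattern: compacted entries go to compact.tmp, the file is closed, renamed over the oldest candidate, and the directory is fsynced so the rename itself is durable. A hedged standalone sketch of that sequence; note it adds an explicit File.Sync before the rename, which the patch leaves to its own segment sync logic, and all paths here are illustrative:

```go
package main

import (
	"io/ioutil"
	"os"
	"path/filepath"
)

// rewriteAtomically replaces dst with data so readers observe either the old
// file or the complete new one, never a partially written file.
func rewriteAtomically(dir, dst string, data []byte) error {
	tmp := filepath.Join(dir, "compact.tmp")

	f, err := os.Create(tmp)
	if err != nil {
		return err
	}
	if _, err := f.Write(data); err != nil {
		f.Close()
		return err
	}
	// Flush the contents before the rename makes them reachable under dst.
	if err := f.Sync(); err != nil {
		f.Close()
		return err
	}
	if err := f.Close(); err != nil {
		return err
	}
	if err := os.Rename(tmp, filepath.Join(dir, dst)); err != nil {
		return err
	}
	// Sync the directory so the rename itself survives a crash.
	d, err := os.Open(dir)
	if err != nil {
		return err
	}
	defer d.Close()
	return d.Sync()
}

func main() {
	dir, err := ioutil.TempDir("", "atomic_rewrite")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	if err := rewriteAtomically(dir, "000001", []byte("compacted segment")); err != nil {
		panic(err)
	}
}
```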
 
 // LogSamples writes a batch of new samples to the log.
 func (w *SegmentWAL) LogSamples(samples []RefSample) error {
-	if err := w.encodeSamples(samples); err != nil {
-		return err
+	buf := w.getBuffer()
+
+	flag := w.encodeSamples(buf, samples)
+	err := w.write(WALEntrySamples, flag, buf.get())
+
+	w.putBuffer(buf)
+
+	if err != nil {
+		return errors.Wrap(err, "log samples")
+	}
+	tf := w.head()
+
+	for _, s := range samples {
+		if tf.maxTime < s.T {
+			tf.maxTime = s.T
+		}
 	}
 
 	if w.flushInterval <= 0 {
-		return w.Sync()
+		return errors.Wrap(w.Sync(), "sync")
 	}
 	return nil
 }
 
 // LogDeletes writes a batch of new deletes to the log.
 func (w *SegmentWAL) LogDeletes(stones []Stone) error {
-	if err := w.encodeDeletes(stones); err != nil {
-		return err
+	buf := w.getBuffer()
+
+	flag := w.encodeDeletes(buf, stones)
+	err := w.write(WALEntryDeletes, flag, buf.get())
+
+	w.putBuffer(buf)
+
+	if err != nil {
+		return errors.Wrap(err, "log deletes")
+	}
+	tf := w.head()
+
+	for _, s := range stones {
+		for _, iv := range s.intervals {
+			if tf.maxTime < iv.Maxt {
+				tf.maxTime = iv.Maxt
+			}
+		}
 	}
 
 	if w.flushInterval <= 0 {
-		return w.Sync()
+		return errors.Wrap(w.Sync(), "sync")
 	}
 	return nil
 }
 
-// initSegments finds all existing segment files and opens them in the
-// appropriate file modes.
-func (w *SegmentWAL) initSegments() error {
-	fns, err := sequenceFiles(w.dirFile.Name())
-	if err != nil {
-		return err
-	}
-	if len(fns) == 0 {
-		return nil
-	}
+// openSegmentFile opens the given segment file and consumes and validates its header.
+func (w *SegmentWAL) openSegmentFile(name string) (*os.File, error) {
 	// We must open all files in read/write mode as we may have to truncate along
-	// the way and any file may become the tail.
-	for _, fn := range fns {
-		f, err := os.OpenFile(fn, os.O_RDWR, 0666)
-		if err != nil {
-			return err
-		}
-		w.files = append(w.files, &segmentFile{f: f})
+	// the way and any file may become the head.
+	f, err := os.OpenFile(name, os.O_RDWR, 0666)
+	if err != nil {
+		return nil, err
 	}
+	metab := make([]byte, 8)
 
-	// Consume and validate meta headers.
-	for _, sf := range w.files {
-		metab := make([]byte, 8)
+	if n, err := f.Read(metab); err != nil {
+		return nil, errors.Wrapf(err, "validate meta %q", f.Name())
+	} else if n != 8 {
+		return nil, errors.Errorf("invalid header size %d in %q", n, f.Name())
+	}
 
-		f := sf.f
-		if n, err := f.Read(metab); err != nil {
-			return errors.Wrapf(err, "validate meta %q", f.Name())
-		} else if n != 8 {
-			return errors.Errorf("invalid header size %d in %q", n, f.Name())
-		}
+	if m := binary.BigEndian.Uint32(metab[:4]); m != WALMagic {
+		return nil, errors.Errorf("invalid magic header %x in %q", m, f.Name())
+	}
+	if metab[4] != WALFormatDefault {
+		return nil, errors.Errorf("unknown WAL segment format %d in %q", metab[4], f.Name())
+	}
+	return f, nil
+}
 
-		if m := binary.BigEndian.Uint32(metab[:4]); m != WALMagic {
-			return errors.Errorf("invalid magic header %x in %q", m, f.Name())
-		}
-		if metab[4] != WALFormatDefault {
-			return errors.Errorf("unknown WAL segment format %d in %q", metab[4], f.Name())
-		}
+// createSegmentFile creates a new segment file with the given name. It preallocates
+// the standard segment size if possible and writes the header.
+func (w *SegmentWAL) createSegmentFile(name string) (*os.File, error) {
+	f, err := os.Create(name)
+	if err != nil {
+		return nil, err
 	}
+	if err = fileutil.Preallocate(f, w.segmentSize, true); err != nil {
+		return nil, err
+	}
+	// Write header metadata for new file.
+	metab := make([]byte, 8)
+	binary.BigEndian.PutUint32(metab[:4], WALMagic)
+	metab[4] = WALFormatDefault
 
-	return nil
+	if _, err := f.Write(metab); err != nil {
+		return nil, err
+	}
+	return f, err
 }
 
 // cut finishes the currently active segment and opens the next one.
 // The encoder is reset to point to the new segment.
 func (w *SegmentWAL) cut() error {
-	// Sync current tail to disk and close.
-	if tf := w.tail(); tf != nil {
+	// Sync current head to disk and close.
+	if hf := w.head(); hf != nil {
 		if err := w.sync(); err != nil {
 			return err
 		}
-		off, err := tf.Seek(0, os.SEEK_CUR)
+		off, err := hf.Seek(0, os.SEEK_CUR)
 		if err != nil {
 			return err
 		}
-		if err := tf.Truncate(off); err != nil {
+		if err := hf.Truncate(off); err != nil {
 			return err
 		}
-		if err := tf.Close(); err != nil {
+		if err := hf.Close(); err != nil {
 			return err
 		}
 	}
@@ -383,70 +515,60 @@ func (w *SegmentWAL) cut() error {
 	if err != nil {
 		return err
 	}
-	f, err := os.Create(p)
+	f, err := w.createSegmentFile(p)
 	if err != nil {
 		return err
 	}
-	if err = fileutil.Preallocate(f, w.segmentSize, true); err != nil {
-		return err
-	}
-	if err = w.dirFile.Sync(); err != nil {
-		return err
-	}
-
-	// Write header metadata for new file.
-	metab := make([]byte, 8)
-	binary.BigEndian.PutUint32(metab[:4], WALMagic)
-	metab[4] = WALFormatDefault
-	if _, err := f.Write(metab); err != nil {
+	if err = w.dirFile.Sync(); err != nil {
 		return err
 	}
 
-	w.files = append(w.files, &segmentFile{f: f})
+	w.files = append(w.files, newSegmentFile(f))
 
 	// TODO(gouthamve): make the buffer size a constant.
-	w.cur = bufio.NewWriterSize(f, 4*1024*1024)
+	w.cur = bufio.NewWriterSize(f, 8*1024*1024)
 	w.curN = 8
 
 	return nil
 }
 
-func (w *SegmentWAL) tail() *os.File {
+func (w *SegmentWAL) head() *segmentFile {
 	if len(w.files) == 0 {
 		return nil
 	}
-	return w.files[len(w.files)-1].f
+	return w.files[len(w.files)-1]
 }
 
 // Sync flushes the changes to disk.
 func (w *SegmentWAL) Sync() error {
-	var tail *os.File
+	var head *segmentFile
 	var err error
 
-	// Flush the writer and retrieve the reference to the tail segment under mutex lock
+	// Flush the writer and retrieve the reference to the head segment under mutex lock.
 	func() {
 		w.mtx.Lock()
 		defer w.mtx.Unlock()
 
 		if err = w.flush(); err != nil {
 			return
 		}
-		tail = w.tail()
+		head = w.head()
 	}()
-
 	if err != nil {
-		return err
+		return errors.Wrap(err, "flush buffer")
 	}
-
-	// But only fsync the tail segment after releasing the mutex as it will block on disk I/O
-	return fileutil.Fdatasync(tail)
+	if head != nil {
+		// But only fsync the head segment after releasing the mutex as it will block on disk I/O.
+		return fileutil.Fdatasync(head.File)
+	}
+	return nil
 }
 
 func (w *SegmentWAL) sync() error {
 	if err := w.flush(); err != nil {
 		return err
 	}
-	return fileutil.Fdatasync(w.tail())
+	return fileutil.Fdatasync(w.head().File)
 }
 
 func (w *SegmentWAL) flush() error {
@@ -491,8 +613,8 @@ func (w *SegmentWAL) Close() error {
 	}
 	// On opening, a WAL must be fully consumed once. Afterwards
 	// only the current segment will still be open.
-	if tf := w.tail(); tf != nil {
-		return errors.Wrapf(tf.Close(), "closing WAL tail %s", tf.Name())
+	if hf := w.head(); hf != nil {
+		return errors.Wrapf(hf.Close(), "closing WAL head %s", hf.Name())
 	}
 	return nil
 }
@@ -504,19 +626,16 @@ const (
 	// It should be a multiple of the minimum sector size so that WAL can safely
 	// distinguish between torn writes and ordinary data corruption.
 	walPageBytes = 16 * minSectorSize
-	// TODO(gouthamve): What is this?
 )
 
-func (w *SegmentWAL) entry(et WALEntryType, flag byte, buf []byte) error {
+func (w *SegmentWAL) write(t WALEntryType, flag uint8, buf []byte) error {
 	w.mtx.Lock()
 	defer w.mtx.Unlock()
-
 	// Cut to the next segment if the entry exceeds the file size unless it would also
 	// exceed the size of a new segment.
 	// TODO(gouthamve): Add a test for this case where the commit is greater than segmentSize.
 	var (
-		// 6-byte header + 4-byte CRC32 + buf.
-		sz    = int64(6 + 4 + len(buf))
+		sz    = int64(len(buf)) + 6
 		newsz = w.curN + sz
 	)
 	// XXX(fabxc): this currently cuts a new file whenever the WAL was newly opened.
@@ -526,38 +645,37 @@ func (w *SegmentWAL) entry(et WALEntryType, flag byte, buf []byte) error {
 			return err
 		}
 	}
+	n, err := w.writeTo(w.cur, t, flag, buf)
+
+	w.curN += int64(n)
+	return err
+}
+
+func (w *SegmentWAL) writeTo(wr io.Writer, t WALEntryType, flag uint8, buf []byte) (int, error) {
+	if len(buf) == 0 {
+		return 0, nil
+	}
 	w.crc32.Reset()
-	wr := io.MultiWriter(w.crc32, w.cur)
+	wr = io.MultiWriter(w.crc32, wr)
 
-	b := make([]byte, 6)
-	b[0] = byte(et)
+	var b [6]byte
+	b[0] = byte(t)
 	b[1] = flag
 
 	binary.BigEndian.PutUint32(b[2:], uint32(len(buf)))
 
-	if _, err := wr.Write(b); err != nil {
-		return err
-	}
-	if _, err := wr.Write(buf); err != nil {
-		return err
+	n1, err := wr.Write(b[:])
+	if err != nil {
+		return n1, err
 	}
-	if _, err := w.cur.Write(w.crc32.Sum(nil)); err != nil {
-		return err
+	n2, err := wr.Write(buf)
+	if err != nil {
+		return n1 + n2, err
 	}
+	n3, err := wr.Write(w.crc32.Sum(b[:0]))
 
-	w.curN += sz
-
-	putWALBuffer(buf)
-
-	// set the file's maxt.
-	if len(w.files) > 0 {
-		cf := w.files[len(w.files)-1]
-		if cf.maxt < w.maxt {
-			cf.maxt = w.maxt
-		}
-	}
-	return nil
+	return n1 + n2 + n3, err
 }
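writeTo above frames every entry as a 6-byte header (entry type, flag, big-endian payload length), the payload, and a trailing CRC32 over header and payload, accumulating the checksum on the fly by teeing writes through io.MultiWriter. A self-contained sketch of that framing, assuming the same Castagnoli polynomial the WAL uses:

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"hash/crc32"
	"io"
)

var castagnoli = crc32.MakeTable(crc32.Castagnoli)

// writeEntry frames buf as: type(1) | flag(1) | length(4, big endian) | buf | crc32(4).
// Teeing through io.MultiWriter lets the checksum accumulate as bytes are written.
func writeEntry(w io.Writer, typ, flag byte, buf []byte) error {
	crc := crc32.New(castagnoli)
	tw := io.MultiWriter(crc, w) // every byte written also feeds the checksum

	var hdr [6]byte
	hdr[0] = typ
	hdr[1] = flag
	binary.BigEndian.PutUint32(hdr[2:], uint32(len(buf)))

	if _, err := tw.Write(hdr[:]); err != nil {
		return err
	}
	if _, err := tw.Write(buf); err != nil {
		return err
	}
	// The checksum trailer sits outside the checksummed region.
	_, err := w.Write(crc.Sum(nil))
	return err
}

func main() {
	var out bytes.Buffer
	if err := writeEntry(&out, 1, 0, []byte("payload")); err != nil {
		panic(err)
	}
	fmt.Printf("% x\n", out.Bytes())
}
```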
 
 const (
@@ -566,142 +684,75 @@
 	walDeletesSimple = 1
 )
 
-var walBuffers = sync.Pool{}
-
-func getWALBuffer() []byte {
-	b := walBuffers.Get()
-	if b == nil {
-		return make([]byte, 0, 64*1024)
-	}
-	return b.([]byte)
-}
-
-func putWALBuffer(b []byte) {
-	b = b[:0]
-	walBuffers.Put(b)
-}
-
-func (w *SegmentWAL) encodeSeries(series []RefSeries) error {
-	if len(series) == 0 {
-		return nil
-	}
-
-	buf := getWALBuffer()
-	buf = encodeSeries(buf, series)
-
-	return w.entry(WALEntrySeries, walSeriesSimple, buf)
-}
-
-func encodeSeries(buf []byte, series []RefSeries) []byte {
-	b := make([]byte, binary.MaxVarintLen64)
-	// Store the base reference number of the first series.
-	// All series encode their ref as a delta to the first.
-	first := series[0]
-	binary.BigEndian.PutUint64(b, first.Ref)
-	buf = append(buf, b[:8]...)
-
+func (w *SegmentWAL) encodeSeries(buf *encbuf, series []RefSeries) uint8 {
 	for _, s := range series {
-		binary.BigEndian.PutUint64(b, s.Ref)
-		buf = append(buf, b[:8]...)
-
-		lset := s.Labels
-		n := binary.PutUvarint(b, uint64(len(lset)))
-		buf = append(buf, b[:n]...)
-
-		for _, l := range lset {
-			n = binary.PutUvarint(b, uint64(len(l.Name)))
-			buf = append(buf, b[:n]...)
-			buf = append(buf, l.Name...)
+		buf.putBE64(s.Ref)
+		buf.putUvarint(len(s.Labels))
 
-			n = binary.PutUvarint(b, uint64(len(l.Value)))
-			buf = append(buf, b[:n]...)
-			buf = append(buf, l.Value...)
+		for _, l := range s.Labels {
+			buf.putUvarintStr(l.Name)
+			buf.putUvarintStr(l.Value)
 		}
 	}
-
-	return buf
+	return walSeriesSimple
 }
 
-func (w *SegmentWAL) encodeSamples(samples []RefSample) error {
+func (w *SegmentWAL) encodeSamples(buf *encbuf, samples []RefSample) uint8 {
 	if len(samples) == 0 {
-		return nil
+		return walSamplesSimple
 	}
-
-	b := make([]byte, binary.MaxVarintLen64)
-	buf := getWALBuffer()
-
 	// Store base timestamp and base reference number of first sample.
 	// All samples encode their timestamp and ref as delta to those.
 	//
 	// TODO(fabxc): optimize for all samples having the same timestamp.
 	first := samples[0]
-	binary.BigEndian.PutUint64(b, first.Ref)
-	buf = append(buf, b[:8]...)
-	binary.BigEndian.PutUint64(b, uint64(first.T))
-	buf = append(buf, b[:8]...)
+	buf.putBE64(first.Ref)
+	buf.putBE64int64(first.T)
 
-	w.maxt = 0
 	for _, s := range samples {
-		if w.maxt < s.T {
-			w.maxt = s.T
-		}
-
-		n := binary.PutVarint(b, int64(s.Ref)-int64(first.Ref))
-		buf = append(buf, b[:n]...)
-
-		n = binary.PutVarint(b, s.T-first.T)
-		buf = append(buf, b[:n]...)
-
-		binary.BigEndian.PutUint64(b, math.Float64bits(s.V))
-		buf = append(buf, b[:8]...)
+		buf.putVarint64(int64(s.Ref) - int64(first.Ref))
+		buf.putVarint64(s.T - first.T)
+		buf.putBE64(math.Float64bits(s.V))
 	}
-
-	return w.entry(WALEntrySamples, walSamplesSimple, buf)
+	return walSamplesSimple
 }
 
-func (w *SegmentWAL) encodeDeletes(stones []Stone) error {
-	b := make([]byte, 2*binary.MaxVarintLen64)
-	eb := &encbuf{b: b}
-	buf := getWALBuffer()
+func (w *SegmentWAL) encodeDeletes(buf *encbuf, stones []Stone) uint8 {
 	for _, s := range stones {
-		for _, itv := range s.intervals {
-			eb.reset()
-			eb.putUvarint64(s.ref)
-			eb.putVarint64(itv.Mint)
-			eb.putVarint64(itv.Maxt)
-			buf = append(buf, eb.get()...)
+		for _, iv := range s.intervals {
+			buf.putBE64(s.ref)
+			buf.putVarint64(iv.Mint)
+			buf.putVarint64(iv.Maxt)
 		}
 	}
-
-	return w.entry(WALEntryDeletes, walDeletesSimple, buf)
+	return walDeletesSimple
 }
 
 // walReader decodes and emits write ahead log entries.
 type walReader struct {
 	logger log.Logger
 
-	mint int64
-	wal  *SegmentWAL
+	files []*segmentFile
 	cur   int
 	buf   []byte
 
 	crc32 hash.Hash32
 
-	curType WALEntryType
-	curFlag byte
-	curBuf  []byte
+	curType    WALEntryType
+	curFlag    byte
+	curBuf     []byte
+	lastOffset int64 // offset after last successfully read entry
 
 	err error
 }
 
-func newWALReader(w *SegmentWAL, mint int64, l log.Logger) *walReader {
+func newWALReader(files []*segmentFile, l log.Logger) *walReader {
 	if l == nil {
 		l = log.NewNopLogger()
 	}
 	return &walReader{
 		logger: l,
-		mint:   mint,
-		wal:    w,
+		files:  files,
 		buf:    make([]byte, 0, 128*4096),
 		crc32:  newCRC32(),
 	}
@@ -713,46 +764,69 @@ func (r *walReader) Err() error {
 }
 
 func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesCB) error {
+	if seriesf == nil {
+		seriesf = func([]RefSeries) error { return nil }
+	}
+	if samplesf == nil {
+		samplesf = func([]RefSample) error { return nil }
+	}
+	if deletesf == nil {
+		deletesf = func([]Stone) error { return nil }
+	}
+
 	for r.next() {
 		et, flag, b := r.at()
 
 		// In decoding below we never return a walCorruptionErr for now.
 		// Those should generally be caught by entry decoding before.
 		switch et {
 		case WALEntrySeries:
-			s, err := r.decodeSeries(flag, b)
+			series, err := r.decodeSeries(flag, b)
 			if err != nil {
-				return err
+				return errors.Wrap(err, "decode series entry")
 			}
-			seriesf(s)
+			seriesf(series)
+
+			cf := r.current()
+
+			for _, s := range series {
+				if cf.minSeries > s.Ref {
+					cf.minSeries = s.Ref
+				}
+			}
+
 		case WALEntrySamples:
-			s, err := r.decodeSamples(flag, b)
+			samples, err := r.decodeSamples(flag, b)
 			if err != nil {
-				return err
+				return errors.Wrap(err, "decode samples entry")
 			}
+			samplesf(samples)
 
-			// Update the times for the wal segment file and select only valid samples.
-			cf := r.wal.files[r.cur]
-			validSamples := make([]RefSample, 0, len(s))
+			// Update the times for the WAL segment file.
+			cf := r.current()
 
-			for _, smpl := range s {
-				if smpl.T < r.mint {
-					continue
+			for _, s := range samples {
+				if cf.maxTime < s.T {
+					cf.maxTime = s.T
 				}
-
-				if cf.maxt < smpl.T {
-					cf.maxt = smpl.T
-				}
-
-				validSamples = append(validSamples, smpl)
 			}
-			samplesf(validSamples)
 		case WALEntryDeletes:
-			s, err := r.decodeDeletes(flag, b)
+			stones, err := r.decodeDeletes(flag, b)
 			if err != nil {
-				return err
+				return errors.Wrap(err, "decode delete entry")
+			}
+			deletesf(stones)
+			// Update the times for the WAL segment file.
+
+			cf := r.current()
+
+			for _, s := range stones {
+				for _, iv := range s.intervals {
+					if cf.maxTime < iv.Maxt {
+						cf.maxTime = iv.Maxt
+					}
+				}
 			}
-			deletesf(s)
 		}
 	}
 
@@ -761,20 +835,17 @@ func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesC
 
 // nextEntry retrieves the next entry. It is also used as a testing hook.
 func (r *walReader) nextEntry() (WALEntryType, byte, []byte, error) {
-	if r.cur >= len(r.wal.files) {
+	if r.cur >= len(r.files) {
 		return 0, 0, nil, io.EOF
 	}
-	cf := r.wal.files[r.cur].f
+	cf := r.current()
 
 	et, flag, b, err := r.entry(cf)
-	// If we reached the end of the reader, advance to the next one
-	// and close.
+	// If we reached the end of the reader, advance to the next one and close.
 	// Do not close on the last one as it will still be appended to.
-	if err == io.EOF && r.cur < len(r.wal.files)-1 {
-		// Current reader completed, close and move to the next one.
-		if err := cf.Close(); err != nil {
-			return 0, 0, nil, err
-		}
+	if err == io.EOF && r.cur < len(r.files)-1 {
+		// Current reader completed. Leave the file open for later reads
+		// for truncating.
 		r.cur++
 		return r.nextEntry()
 	}
@@ -788,15 +859,15 @@ func (r *walReader) at() (WALEntryType, byte, []byte) {
 
 // next decodes the next entry pair and returns true
 // if it was successful.
 func (r *walReader) next() bool {
-	if r.cur >= len(r.wal.files) {
+	if r.cur >= len(r.files) {
 		return false
 	}
-	cf := r.wal.files[r.cur].f
+	cf := r.files[r.cur]
 
-	// Save position after last valid entry if we have to truncate the WAL.
-	lastOffset, err := cf.Seek(0, os.SEEK_CUR)
-	if err != nil {
-		r.err = err
+	// Remember the offset after the last correctly read entry. If the next one
+	// is corrupted, this is where we can safely truncate.
+	r.lastOffset, r.err = cf.Seek(0, os.SEEK_CUR)
+	if r.err != nil {
 		return false
 	}
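The hunks below replace the old opaque `type walCorruptionErr error` alias with a struct that carries the failing file index and the offset after the last good entry, which is exactly what repairingWALReader needs to decide where to cut. A reduced sketch of the pattern, with names shortened from the patch and the failure hard-coded for illustration:

```go
package main

import (
	"errors"
	"fmt"
)

// corruptionErr records where a log became unreadable so a caller can repair it.
type corruptionErr struct {
	err        error
	file       int
	lastOffset int64
}

func (e corruptionErr) Error() string {
	return fmt.Sprintf("%s <file: %d, lastOffset: %d>", e.err, e.file, e.lastOffset)
}

// read simulates a reader that fails partway through segment 2.
func read() error {
	return corruptionErr{err: errors.New("unexpected CRC32 checksum"), file: 2, lastOffset: 4096}
}

// readAndRepair mirrors repairingWALReader.Read: plain errors propagate,
// corruption errors trigger a truncation at the recorded position.
func readAndRepair() error {
	err := read()
	if err == nil {
		return nil
	}
	cerr, ok := err.(corruptionErr)
	if !ok {
		return err
	}
	fmt.Printf("truncating file %d at offset %d: %v\n", cerr.file, cerr.lastOffset, cerr.err)
	return nil
}

func main() {
	if err := readAndRepair(); err != nil {
		fmt.Println("unrecoverable:", err)
	}
}
```

The design choice worth noting is that the reader only reports the corruption position; the decision to truncate, and the mutation of the segment list, stays with the SegmentWAL that owns the files.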
@@ -805,7 +876,7 @@ func (r *walReader) next() bool {
 	// and close.
 	// Do not close on the last one as it will still be appended to.
 	if err == io.EOF {
-		if r.cur == len(r.wal.files)-1 {
+		if r.cur == len(r.files)-1 {
 			return false
 		}
 		// Current reader completed, close and move to the next one.
@@ -818,10 +889,6 @@ func (r *walReader) next() bool {
 	}
 	if err != nil {
 		r.err = err
-
-		if _, ok := err.(walCorruptionErr); ok {
-			r.err = r.truncate(lastOffset)
-		}
 		return false
 	}
@@ -831,38 +898,28 @@ func (r *walReader) next() bool {
 
 	return r.err == nil
 }
 
-func (r *walReader) current() *os.File {
-	return r.wal.files[r.cur].f
-}
-
-// truncate the WAL after the last valid entry.
-func (r *walReader) truncate(lastOffset int64) error {
-	r.logger.Log("msg", "WAL corruption detected; truncating",
-		"err", r.err, "file", r.current().Name(), "pos", lastOffset)
-
-	// Close and delete all files after the current one.
-	for _, sf := range r.wal.files[r.cur+1:] {
-		f := sf.f
-		if err := f.Close(); err != nil {
-			return err
-		}
-		if err := os.Remove(f.Name()); err != nil {
-			return err
-		}
-	}
-	r.wal.files = r.wal.files[:r.cur+1]
-
-	// Seek the current file to the last valid offset where we continue writing from.
-	_, err := r.current().Seek(lastOffset, os.SEEK_SET)
-	return err
+func (r *walReader) current() *segmentFile {
+	return r.files[r.cur]
 }
 
 // walCorruptionErr is a type wrapper for errors that indicate WAL corruption
 // and trigger a truncation.
-type walCorruptionErr error
+type walCorruptionErr struct {
+	err        error
+	file       int
+	lastOffset int64
+}
 
-func walCorruptionErrf(s string, args ...interface{}) error {
-	return walCorruptionErr(errors.Errorf(s, args...))
+func (e walCorruptionErr) Error() string {
+	return fmt.Sprintf("%s <file: %d, lastOffset: %d>", e.err, e.file, e.lastOffset)
+}
+
+func (r *walReader) corruptionErr(s string, args ...interface{}) error {
+	return walCorruptionErr{
+		err:        errors.Errorf(s, args...),
+		file:       r.cur,
+		lastOffset: r.lastOffset,
+	}
 }
 
 func (r *walReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) {
@@ -873,7 +930,7 @@ func (r *walReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) {
 	if n, err := tr.Read(b); err != nil {
 		return 0, 0, nil, err
 	} else if n != 6 {
-		return 0, 0, nil, walCorruptionErrf("invalid entry header size %d", n)
+		return 0, 0, nil, r.corruptionErr("invalid entry header size %d", n)
 	}
 
 	var (
@@ -886,7 +943,7 @@ func (r *walReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) {
 		return 0, 0, nil, io.EOF
 	}
 	if etype != WALEntrySeries && etype != WALEntrySamples && etype != WALEntryDeletes {
-		return 0, 0, nil, walCorruptionErrf("invalid entry type %d", etype)
+		return 0, 0, nil, r.corruptionErr("invalid entry type %d", etype)
 	}
 
 	if length > len(r.buf) {
@@ -897,16 +954,16 @@ func (r *walReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) {
 	if n, err := tr.Read(buf); err != nil {
 		return 0, 0, nil, err
 	} else if n != length {
-		return 0, 0, nil, walCorruptionErrf("invalid entry body size %d", n)
+		return 0, 0, nil, r.corruptionErr("invalid entry body size %d", n)
 	}
 
 	if n, err := cr.Read(b[:4]); err != nil {
 		return 0, 0, nil, err
 	} else if n != 4 {
-		return 0, 0, nil, walCorruptionErrf("invalid checksum length %d", n)
+		return 0, 0, nil, r.corruptionErr("invalid checksum length %d", n)
 	}
 	if exp, has := binary.BigEndian.Uint32(b[:4]), r.crc32.Sum32(); has != exp {
-		return 0, 0, nil, walCorruptionErrf("unexpected CRC32 checksum %x, want %x", has, exp)
+		return 0, 0, nil, r.corruptionErr("unexpected CRC32 checksum %x, want %x", has, exp)
 	}
 
 	return etype, flag, buf, nil
 }
 
 func (r *walReader) decodeSeries(flag byte, b []byte) ([]RefSeries, error) {
 	series := []RefSeries{}
-	if len(b) < 8 {
-		return nil, errors.Wrap(errInvalidSize, "header length")
-	}
+	dec := decbuf{b: b}
 
-	b = b[8:]
-
-	for len(b) > 0 {
-		var ser RefSeries
-		// TODO: Check again.
-		if len(b) < 8 {
-			return nil, errors.Wrap(errInvalidSize, "series ref")
-		}
-		ser.Ref = binary.BigEndian.Uint64(b)
-		b = b[8:]
-
-		l, n := binary.Uvarint(b)
-		if n < 1 {
-			return nil, errors.Wrap(errInvalidSize, "number of labels")
-		}
-		b = b[n:]
-		lset := make(labels.Labels, l)
+	for len(dec.b) > 0 && dec.err() == nil {
+		ref := dec.be64()
 
-		for i := 0; i < int(l); i++ {
-			nl, n := binary.Uvarint(b)
-			if n < 1 || len(b) < n+int(nl) {
-				return nil, errors.Wrap(errInvalidSize, "label name")
-			}
-			lset[i].Name = string(b[n : n+int(nl)])
-			b = b[n+int(nl):]
+		lset := make(labels.Labels, dec.uvarint())
 
-			vl, n := binary.Uvarint(b)
-			if n < 1 || len(b) < n+int(vl) {
-				return nil, errors.Wrap(errInvalidSize, "label value")
-			}
-			lset[i].Value = string(b[n : n+int(vl)])
-			b = b[n+int(vl):]
+		for i := range lset {
+			lset[i].Name = dec.uvarintStr()
+			lset[i].Value = dec.uvarintStr()
 		}
-		ser.Labels = lset
+		sort.Sort(lset)
 
-		series = append(series, ser)
+		series = append(series, RefSeries{
+			Ref:    ref,
+			Labels: lset,
+		})
+	}
+	if dec.err() != nil {
+		return nil, dec.err()
+	}
+	if len(dec.b) > 0 {
+		return series, errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
 	}
 	return series, nil
 }
 
 func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) {
+	if len(b) == 0 {
+		return nil, nil
+	}
 	samples := []RefSample{}
+	dec := decbuf{b: b}
 
-	if len(b) < 16 {
-		return nil, errors.Wrap(errInvalidSize, "header length")
-	}
 	var (
-		baseRef  = binary.BigEndian.Uint64(b)
-		baseTime = int64(binary.BigEndian.Uint64(b[8:]))
+		baseRef  = dec.be64()
+		baseTime = dec.be64int64()
 	)
-	b = b[16:]
-
-	for len(b) > 0 {
-		var smpl RefSample
-
-		dref, n := binary.Varint(b)
-		if n < 1 {
-			return nil, errors.Wrap(errInvalidSize, "sample ref delta")
-		}
-		b = b[n:]
 
-		smpl.Ref = uint64(int64(baseRef) + dref)
+	for len(dec.b) > 0 && dec.err() == nil {
+		dref := dec.varint64()
+		dtime := dec.varint64()
+		val := dec.be64()
 
-		dtime, n := binary.Varint(b)
-		if n < 1 {
-			return nil, errors.Wrap(errInvalidSize, "sample timestamp delta")
-		}
-		b = b[n:]
-
-		smpl.T = baseTime + dtime
-
-		if len(b) < 8 {
-			return nil, errors.Wrapf(errInvalidSize, "sample value bits %d", len(b))
-		}
-		smpl.V = float64(math.Float64frombits(binary.BigEndian.Uint64(b)))
-		b = b[8:]
+		samples = append(samples, RefSample{
+			Ref: uint64(int64(baseRef) + dref),
+			T:   baseTime + dtime,
+			V:   math.Float64frombits(val),
+		})
+	}
 
-		samples = append(samples, smpl)
+	if dec.err() != nil {
+		return nil, errors.Wrapf(dec.err(), "decode error after %d samples", len(samples))
+	}
+	if len(dec.b) > 0 {
+		return samples, errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
 	}
 	return samples, nil
 }
 
 func (r *walReader) decodeDeletes(flag byte, b []byte) ([]Stone, error) {
-	db := &decbuf{b: b}
-	stones := []Stone{}
-
-	for db.len() > 0 {
-		var s Stone
-		s.ref = db.uvarint64()
-		s.intervals = Intervals{{db.varint64(), db.varint64()}}
-		if db.err() != nil {
-			return nil, db.err()
-		}
+	dec := &decbuf{b: b}
+	var stones []Stone
 
-		stones = append(stones, s)
+	for dec.len() > 0 && dec.err() == nil {
+		stones = append(stones, Stone{
+			ref: dec.be64(),
+			intervals: Intervals{
+				{Mint: dec.varint64(), Maxt: dec.varint64()},
+			},
+		})
+	}
+	if dec.err() != nil {
+		return nil, dec.err()
+	}
+	if len(dec.b) > 0 {
+		return stones, errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
 	}
-
 	return stones, nil
 }
diff --git a/wal_test.go b/wal_test.go
index 15b64943e..65ed3bc5f 100644
--- a/wal_test.go
+++ b/wal_test.go
@@ -21,21 +21,15 @@ import (
 	"os"
 	"testing"
 
-	"github.com/coreos/etcd/pkg/fileutil"
 	"github.com/go-kit/kit/log"
 	"github.com/stretchr/testify/require"
 )
 
-func TestSegmentWAL_initSegments(t *testing.T) {
+func TestSegmentWAL_Open(t *testing.T) {
 	tmpdir, err := ioutil.TempDir("", "test_wal_open")
 	require.NoError(t, err)
 	defer os.RemoveAll(tmpdir)
 
-	df, err := fileutil.OpenDir(tmpdir)
-	require.NoError(t, err)
-
-	w := &SegmentWAL{dirFile: df}
-
 	// Create segment files with an appropriate header.
 	for i := 1; i <= 5; i++ {
 		metab := make([]byte, 8)
@@ -50,13 +44,14 @@ func TestSegmentWAL_Open(t *testing.T) {
 	}
 
 	// Initialize 5 correct segment files.
-	require.NoError(t, w.initSegments())
+	w, err := OpenSegmentWAL(tmpdir, nil, 0)
+	require.NoError(t, err)
 
 	require.Equal(t, 5, len(w.files), "unexpected number of segments loaded")
 
 	// Validate that files are locked properly.
 	for _, of := range w.files {
-		f, err := os.Open(of.f.Name())
+		f, err := os.Open(of.Name())
 		require.NoError(t, err, "open locked segment %s", f.Name())
 
 		_, err = f.Read([]byte{0})
@@ -73,18 +68,14 @@ func TestSegmentWAL_Open(t *testing.T) {
 	}
 
 	// Make initialization fail by corrupting the header of one file.
-	f, err := os.OpenFile(w.files[3].f.Name(), os.O_WRONLY, 0666)
+	f, err := os.OpenFile(w.files[3].Name(), os.O_WRONLY, 0666)
 	require.NoError(t, err)
 
 	_, err = f.WriteAt([]byte{0}, 4)
 	require.NoError(t, err)
 
-	w = &SegmentWAL{dirFile: df}
-	require.Error(t, w.initSegments(), "init corrupted segments")
-
-	for _, f := range w.files {
-		require.NoError(t, f.Close())
-	}
+	w, err = OpenSegmentWAL(tmpdir, nil, 0)
+	require.Error(t, err, "open with corrupted segments")
 }
 
 func TestSegmentWAL_cut(t *testing.T) {
@@ -96,7 +87,7 @@ func TestSegmentWAL_cut(t *testing.T) {
 	w, err := OpenSegmentWAL(tmpdir, nil, 0)
 	require.NoError(t, err)
 
-	require.NoError(t, w.entry(WALEntrySeries, 1, []byte("Hello World!!")))
+	require.NoError(t, w.write(WALEntrySeries, 1, []byte("Hello World!!")))
 
 	require.NoError(t, w.cut(), "cut failed")
 
@@ -104,12 +95,12 @@ func TestSegmentWAL_cut(t *testing.T) {
 	require.Equal(t, 2, len(w.files))
 	require.Error(t, w.files[0].Close())
 
-	require.NoError(t, w.entry(WALEntrySeries, 1, []byte("Hello World!!")))
+	require.NoError(t, w.write(WALEntrySeries, 1, []byte("Hello World!!")))
 
 	require.NoError(t, w.Close())
 
 	for _, of := range w.files {
-		f, err := os.Open(of.f.Name())
+		f, err := os.Open(of.Name())
 		require.NoError(t, err)
 
 		// Verify header data.
@@ -121,7 +112,7 @@ func TestSegmentWAL_cut(t *testing.T) {
 
 		// We cannot actually check for correct pre-allocation as it is
 		// optional per filesystem and handled transparently.
-		et, flag, b, err := newWALReader(nil, 0, nil).entry(f)
+		et, flag, b, err := newWALReader(nil, nil).entry(f)
 		require.NoError(t, err)
 		require.Equal(t, WALEntrySeries, et)
 		require.Equal(t, flag, byte(walSeriesSimple))
@@ -129,12 +120,78 @@ func TestSegmentWAL_cut(t *testing.T) {
 	}
 }
 
+func TestSegmentWAL_Truncate(t *testing.T) {
+	const (
+		numMetrics = 250
+		batch      = 50
+	)
+	series, err := readPrometheusLabels("testdata/20k.series", numMetrics)
+	require.NoError(t, err)
+
+	dir, err := ioutil.TempDir("", "test_wal_log_truncate")
+	require.NoError(t, err)
+	defer os.RemoveAll(dir)
+
+	w, err := OpenSegmentWAL(dir, nil, 0)
+	require.NoError(t, err)
+	w.segmentSize = 1000
+
+	for i := 0; i < numMetrics; i += batch {
+		var rs []RefSeries
+
+		for j, s := range series[i : i+batch] {
+			rs = append(rs, RefSeries{Labels: s, Ref: uint64(i+j) + 1})
+		}
+		err := w.LogSeries(rs)
+		require.NoError(t, err)
+	}
+
+	// We mark the 2nd half of the files with a max timestamp that should exclude
+	// them from the selection of compactable files.
+	for i, f := range w.files[len(w.files)/2:] {
+		f.maxTime = int64(1000 + i)
+	}
+	// All series in those files must be preserved regardless of the provided postings list.
+	boundarySeries := w.files[len(w.files)/2].minSeries
+
+	// We truncate while keeping every 2nd series.
+	keep := []uint64{}
+	for i := 1; i <= numMetrics; i += 2 {
+		keep = append(keep, uint64(i))
+	}
+
+	err = w.Truncate(1000, newListPostings(keep))
+	require.NoError(t, err)
+	require.NoError(t, w.Close())
+
+	w, err = OpenSegmentWAL(dir, nil, 0)
+	require.NoError(t, err)
+
+	var readSeries []RefSeries
+	r := w.Reader()
+
+	r.Read(func(s []RefSeries) error {
+		readSeries = append(readSeries, s...)
+		return nil
+	}, nil, nil)
+
+	var expected []RefSeries
+
+	for i := 1; i <= numMetrics; i++ {
+		if i%2 == 1 || uint64(i) >= boundarySeries {
+			expected = append(expected, RefSeries{Ref: uint64(i), Labels: series[i-1]})
+		}
+	}
+
+	require.Equal(t, len(expected), len(readSeries))
+}
+
 // Symmetrical test of reading and writing to the WAL via its main interface.
 func TestSegmentWAL_Log_Restore(t *testing.T) {
 	const (
-		numMetrics = 5000
+		numMetrics = 50
 		iterations = 5
-		stepSize   = 100
+		stepSize   = 1
 	)
 	// Generate testing data. It does not make semantic sense but
 	// serves the purpose of this test.
@@ -161,7 +218,7 @@ func TestSegmentWAL_Log_Restore(t *testing.T) {
 
 	// Set smaller segment size so we can actually write several files.
 	w.segmentSize = 1000 * 1000
 
-	r := w.Reader(0)
+	r := w.Reader()
 
 	var (
 		resultSeries [][]RefSeries
@@ -260,7 +317,7 @@ func TestWALRestoreCorrupted(t *testing.T) {
 		{
 			name: "truncate_checksum",
 			f: func(t *testing.T, w *SegmentWAL) {
-				f, err := os.OpenFile(w.files[0].f.Name(), os.O_WRONLY, 0666)
+				f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0666)
 				require.NoError(t, err)
 				defer f.Close()
 
@@ -273,7 +330,7 @@ func TestWALRestoreCorrupted(t *testing.T) {
 		{
 			name: "truncate_body",
 			f: func(t *testing.T, w *SegmentWAL) {
-				f, err := os.OpenFile(w.files[0].f.Name(), os.O_WRONLY, 0666)
+				f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0666)
 				require.NoError(t, err)
 				defer f.Close()
 
@@ -286,7 +343,7 @@ func TestWALRestoreCorrupted(t *testing.T) {
 		{
 			name: "body_content",
 			f: func(t *testing.T, w *SegmentWAL) {
-				f, err := os.OpenFile(w.files[0].f.Name(), os.O_WRONLY, 0666)
+				f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0666)
 				require.NoError(t, err)
 				defer f.Close()
 
@@ -301,7 +358,7 @@ func TestWALRestoreCorrupted(t *testing.T) {
 		{
 			name: "checksum",
 			f: func(t *testing.T, w *SegmentWAL) {
-				f, err := os.OpenFile(w.files[0].f.Name(), os.O_WRONLY, 0666)
+				f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0666)
 				require.NoError(t, err)
 				defer f.Close()
 
@@ -346,7 +403,7 @@ func TestWALRestoreCorrupted(t *testing.T) {
 			w2, err := OpenSegmentWAL(dir, logger, 0)
 			require.NoError(t, err)
 
-			r := w2.Reader(0)
+			r := w2.Reader()
 
 			serf := func(l []RefSeries) error {
 				require.Equal(t, 0, len(l))
@@ -377,7 +434,7 @@ func TestWALRestoreCorrupted(t *testing.T) {
 			w3, err := OpenSegmentWAL(dir, logger, 0)
 			require.NoError(t, err)
 
-			r = w3.Reader(0)
+			r = w3.Reader()
 
 			i = 0
 			require.NoError(t, r.Read(serf, samplf, delf))
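For reference, the sample encoding that encodeSamples and decodeSamples implement stores the first sample's ref and timestamp in full, then only signed varint deltas against that base plus the raw IEEE 754 bits of each value. A runnable sketch of the scheme without the WAL's encbuf/decbuf helpers (the `sample` type is illustrative, and the decoder assumes well-formed input):

```go
package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

type sample struct {
	ref uint64
	t   int64
	v   float64
}

// encode stores the first sample's ref and timestamp verbatim; every sample
// then records varint deltas against that base, plus the raw float bits.
func encode(samples []sample) []byte {
	if len(samples) == 0 {
		return nil
	}
	buf := make([]byte, 16)
	first := samples[0]
	binary.BigEndian.PutUint64(buf[:8], first.ref)
	binary.BigEndian.PutUint64(buf[8:], uint64(first.t))

	var b [binary.MaxVarintLen64]byte
	for _, s := range samples {
		buf = append(buf, b[:binary.PutVarint(b[:], int64(s.ref)-int64(first.ref))]...)
		buf = append(buf, b[:binary.PutVarint(b[:], s.t-first.t)]...)
		binary.BigEndian.PutUint64(b[:8], math.Float64bits(s.v))
		buf = append(buf, b[:8]...)
	}
	return buf
}

func decode(buf []byte) []sample {
	baseRef := binary.BigEndian.Uint64(buf[:8])
	baseTime := int64(binary.BigEndian.Uint64(buf[8:16]))
	buf = buf[16:]

	var out []sample
	for len(buf) > 0 {
		dref, n := binary.Varint(buf)
		buf = buf[n:]
		dt, n := binary.Varint(buf)
		buf = buf[n:]
		v := math.Float64frombits(binary.BigEndian.Uint64(buf[:8]))
		buf = buf[8:]
		out = append(out, sample{ref: uint64(int64(baseRef) + dref), t: baseTime + dt, v: v})
	}
	return out
}

func main() {
	in := []sample{{ref: 100, t: 1000, v: 1.5}, {ref: 101, t: 1001, v: 2.5}}
	fmt.Printf("%+v\n", decode(encode(in)))
}
```

Delta-plus-varint keeps entries small because refs and timestamps within a batch are clustered, while values keep full 8-byte precision since floats rarely compress well under deltas.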