From 77d5a7d47a5292e86fd9dfc3010ae33f56a858de Mon Sep 17 00:00:00 2001
From: Tom Wilkie
Date: Tue, 19 Feb 2019 14:33:57 +0000
Subject: [PATCH] LiveReader can get into an infinite loop on corrupt WALs. (#524)

Make the WAL live tailer return EOF when there is a half-written record at
the end of the file. Previously this caused an infinite loop, as we ignored
EOFs when filling the buffer. We now differentiate between EOFs that read
>0 bytes and EOFs that read nothing.

Add some more unit tests for tailing a corrupt WAL, and unify the Reader and
LiveReader interfaces for the purposes of testing.

Signed-off-by: Tom Wilkie
---
 wal/live_reader.go | 284 +++++++++++++++++++++++++
 wal/reader.go      | 183 ++++++++++++++++
 wal/reader_test.go | 506 +++++++++++++++++++++++++++++++++++++++++++++
 wal/wal.go         | 387 ----------------------------------
 wal/wal_test.go    | 378 ---------------------------------
 5 files changed, 973 insertions(+), 765 deletions(-)
 create mode 100644 wal/live_reader.go
 create mode 100644 wal/reader.go
 create mode 100644 wal/reader_test.go

diff --git a/wal/live_reader.go b/wal/live_reader.go
new file mode 100644
index 000000000..8394bfd08
--- /dev/null
+++ b/wal/live_reader.go
@@ -0,0 +1,284 @@
+// Copyright 2019 The Prometheus Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package wal
+
+import (
+	"encoding/binary"
+	"fmt"
+	"hash/crc32"
+	"io"
+
+	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
+	"github.com/pkg/errors"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+var (
+	readerCorruptionErrors = promauto.NewCounterVec(prometheus.CounterOpts{
+		Name: "prometheus_tsdb_wal_reader_corruption_errors",
+		Help: "Errors encountered when reading the WAL.",
+	}, []string{"error"})
+)
+
+// NewLiveReader returns a new live reader.
+func NewLiveReader(logger log.Logger, r io.Reader) *LiveReader {
+	return &LiveReader{
+		logger: logger,
+		rdr:    r,
+
+		// Until we understand how they come about, make readers permissive
+		// to records spanning pages.
+		permissive: true,
+	}
+}
+
+// LiveReader reads WAL records from an io.Reader. It allows reading of WALs
+// that are still in the process of being written, and returns records as soon
+// as they can be read.
+type LiveReader struct {
+	logger     log.Logger
+	rdr        io.Reader
+	err        error
+	rec        []byte
+	hdr        [recordHeaderSize]byte
+	buf        [pageSize]byte
+	readIndex  int   // Index in buf to start at for next read.
+	writeIndex int   // Index in buf to start at for next write.
+	total      int64 // Total bytes processed during reading in calls to Next().
+	index      int   // Used to track partial records; should be 0 at the start of every new record.
+
+	// For testing, we can treat EOF as a non-error.
+	eofNonErr bool
+
+	// We sometimes see records span page boundaries. This should never happen,
+	// but it does. Until we track down why, set permissive to true to tolerate
+	// it. NB the non-live Reader implementation allows for this.
+	permissive bool
+}
+
+// Err returns any errors encountered reading the WAL. io.EOFs are not terminal
+// and Next can be tried again. Non-EOFs are terminal, and the reader should
+// not be used again. It is up to the user to decide when to stop trying should
+// io.EOF be returned.
+func (r *LiveReader) Err() error {
+	if r.eofNonErr && r.err == io.EOF {
+		return nil
+	}
+	return r.err
+}
+
+// Offset returns the number of bytes consumed from this segment.
+func (r *LiveReader) Offset() int64 {
+	return r.total
+}
+
+func (r *LiveReader) fillBuffer() (int, error) {
+	n, err := r.rdr.Read(r.buf[r.writeIndex:len(r.buf)])
+	r.writeIndex += n
+	return n, err
+}
+
+// Next returns true if Record() will contain a full record.
+// If Next returns false, you should always check the contents of Err().
+// Returning false guarantees there are no more records if the segment is
+// closed and not corrupt; otherwise, if Err() == io.EOF, you should try again
+// when more data has been written.
+func (r *LiveReader) Next() bool {
+	for {
+		// If buildRecord returns a non-EOF error, it's game over - the segment is
+		// corrupt. If buildRecord returns an EOF, we try and read more in
+		// fillBuffer later on. If that fails to read anything (n=0 && err=EOF),
+		// we return EOF and the user can try again later. If we have a full
+		// page, buildRecord is guaranteed to return a record or a non-EOF error;
+		// it checks that records fit within pages.
+		if ok, err := r.buildRecord(); ok {
+			return true
+		} else if err != nil && err != io.EOF {
+			r.err = err
+			return false
+		}
+
+		// If we've filled the page and not found a record, records have
+		// started to span pages. This shouldn't happen, but it does, and
+		// until we find out why, we need to deal with it.
+		if r.permissive && r.writeIndex == pageSize && r.readIndex > 0 {
+			copy(r.buf[:], r.buf[r.readIndex:])
+			r.writeIndex -= r.readIndex
+			r.readIndex = 0
+			continue
+		}
+
+		if r.readIndex == pageSize {
+			r.writeIndex = 0
+			r.readIndex = 0
+		}
+
+		if r.writeIndex != pageSize {
+			n, err := r.fillBuffer()
+			if n == 0 || (err != nil && err != io.EOF) {
+				r.err = err
+				return false
+			}
+		}
+	}
+}
+
+// Record returns the current record.
+// The returned byte slice is only valid until the next call to Next.
+func (r *LiveReader) Record() []byte {
+	return r.rec
+}
+
+// buildRecord rebuilds a full record from potentially partial records. It
+// returns false if there was an error or if we weren't able to read a record
+// for any reason, and true if we read a full record. Any record data is
+// appended to LiveReader.rec.
+func (r *LiveReader) buildRecord() (bool, error) {
+	for {
+		// Check that we have data in the internal buffer to read.
+		if r.writeIndex <= r.readIndex {
+			return false, nil
+		}
+
+		// Attempt to read a record, partial or otherwise.
+		temp, n, err := r.readRecord()
+		if err != nil {
+			return false, err
+		}
+
+		r.readIndex += n
+		r.total += int64(n)
+		if temp == nil {
+			return false, nil
+		}
+
+		rt := recType(r.hdr[0])
+		if rt == recFirst || rt == recFull {
+			r.rec = r.rec[:0]
+		}
+		r.rec = append(r.rec, temp...)
+
+		if err := validateRecord(rt, r.index); err != nil {
+			r.index = 0
+			return false, err
+		}
+		if rt == recLast || rt == recFull {
+			r.index = 0
+			return true, nil
+		}
+		// Only increment r.index for non-zero records, since we use it
+		// to determine valid content record sequences.
+		r.index++
+	}
+}
+
+// Returns an error if the recType and i indicate an invalid record sequence.
+// As an example, if i is > 0 because we've read some amount of a partial record
+// (recFirst, recMiddle, etc. but not recLast) and then we get another recFirst
+// or recFull instead of a recLast or recMiddle, we would have an invalid record.
+func validateRecord(typ recType, i int) error {
+	switch typ {
+	case recFull:
+		if i != 0 {
+			return errors.New("unexpected full record")
+		}
+		return nil
+	case recFirst:
+		if i != 0 {
+			return errors.New("unexpected first record, dropping buffer")
+		}
+		return nil
+	case recMiddle:
+		if i == 0 {
+			return errors.New("unexpected middle record, dropping buffer")
+		}
+		return nil
+	case recLast:
+		if i == 0 {
+			return errors.New("unexpected last record, dropping buffer")
+		}
+		return nil
+	default:
+		return errors.Errorf("unexpected record type %d", typ)
+	}
+}
+
+// readRecord reads a sub-record (see recType) from the buffer. It could
+// potentially be a full record (recFull) if the record fits within the bounds
+// of a single page. It returns a byte slice of the record data read, the
+// number of bytes read, and an error if there's a non-zero byte in a page
+// term record or the record checksum fails. readRecord does not advance the
+// reader's offsets itself; the caller applies the returned byte count to
+// readIndex and total.
+func (r *LiveReader) readRecord() ([]byte, int, error) {
+	// Special case: for recPageTerm, check that the rest of the page is all
+	// zeros; consume them but don't return them.
+	if r.buf[r.readIndex] == byte(recPageTerm) {
+		// End of page won't necessarily be end of buffer, as we may have
+		// got misaligned by records spanning page boundaries.
+		// r.total % pageSize is the offset into the current page
+		// that r.readIndex points to in buf. Therefore
+		// pageSize - (r.total % pageSize) is the amount left to read of
+		// the current page.
+		remaining := int(pageSize - (r.total % pageSize))
+		if r.readIndex+remaining > r.writeIndex {
+			return nil, 0, io.EOF
+		}
+
+		for i := r.readIndex; i < r.readIndex+remaining; i++ {
+			if r.buf[i] != 0 {
+				return nil, 0, errors.New("unexpected non-zero byte in page term bytes")
+			}
+		}
+
+		return nil, remaining, nil
+	}
+
+	// Not a recPageTerm; read the record and check the checksum.
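+	// The record header is recordHeaderSize bytes: one type byte, a 2-byte
+	// big-endian length, and a 4-byte CRC32 (Castagnoli) of the record data.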
+ if r.writeIndex-r.readIndex < recordHeaderSize { + return nil, 0, io.EOF + } + + copy(r.hdr[:], r.buf[r.readIndex:r.readIndex+recordHeaderSize]) + length := int(binary.BigEndian.Uint16(r.hdr[1:])) + crc := binary.BigEndian.Uint32(r.hdr[3:]) + if r.readIndex+recordHeaderSize+length > pageSize { + if !r.permissive { + return nil, 0, fmt.Errorf("record would overflow current page: %d > %d", r.readIndex+recordHeaderSize+length, pageSize) + } + readerCorruptionErrors.WithLabelValues("record_span_page").Inc() + level.Warn(r.logger).Log("msg", "record spans page boundaries", "start", r.readIndex, "end", recordHeaderSize+length, "pageSize", pageSize) + } + if recordHeaderSize+length > pageSize { + return nil, 0, fmt.Errorf("record length greater than a single page: %d > %d", recordHeaderSize+length, pageSize) + } + if r.readIndex+recordHeaderSize+length > r.writeIndex { + return nil, 0, io.EOF + } + + rec := r.buf[r.readIndex+recordHeaderSize : r.readIndex+recordHeaderSize+length] + if c := crc32.Checksum(rec, castagnoliTable); c != crc { + return nil, 0, errors.Errorf("unexpected checksum %x, expected %x", c, crc) + } + + return rec, length + recordHeaderSize, nil +} + +func min(i, j int) int { + if i < j { + return i + } + return j +} diff --git a/wal/reader.go b/wal/reader.go new file mode 100644 index 000000000..297463b00 --- /dev/null +++ b/wal/reader.go @@ -0,0 +1,183 @@ +// Copyright 2019 The Prometheus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wal + +import ( + "encoding/binary" + "hash/crc32" + "io" + + "github.com/pkg/errors" +) + +// Reader reads WAL records from an io.Reader. +type Reader struct { + rdr io.Reader + err error + rec []byte + buf [pageSize]byte + total int64 // Total bytes processed. + curRecTyp recType // Used for checking that the last record is not torn. +} + +// NewReader returns a new reader. +func NewReader(r io.Reader) *Reader { + return &Reader{rdr: r} +} + +// Next advances the reader to the next records and returns true if it exists. +// It must not be called again after it returned false. +func (r *Reader) Next() bool { + err := r.next() + if errors.Cause(err) == io.EOF { + // The last WAL segment record shouldn't be torn(should be full or last). + // The last record would be torn after a crash just before + // the last record part could be persisted to disk. + if recType(r.curRecTyp) == recFirst || recType(r.curRecTyp) == recMiddle { + r.err = errors.New("last record is torn") + } + return false + } + r.err = err + return r.err == nil +} + +func (r *Reader) next() (err error) { + // We have to use r.buf since allocating byte arrays here fails escape + // analysis and ends up on the heap, even though it seemingly should not. + hdr := r.buf[:recordHeaderSize] + buf := r.buf[recordHeaderSize:] + + r.rec = r.rec[:0] + + i := 0 + for { + if _, err = io.ReadFull(r.rdr, hdr[:1]); err != nil { + return errors.Wrap(err, "read first header byte") + } + r.total++ + r.curRecTyp = recType(hdr[0]) + + // Gobble up zero bytes. 
+ if r.curRecTyp == recPageTerm { + // recPageTerm is a single byte that indicates the rest of the page is padded. + // If it's the first byte in a page, buf is too small and + // needs to be resized to fit pageSize-1 bytes. + buf = r.buf[1:] + + // We are pedantic and check whether the zeros are actually up + // to a page boundary. + // It's not strictly necessary but may catch sketchy state early. + k := pageSize - (r.total % pageSize) + if k == pageSize { + continue // Initial 0 byte was last page byte. + } + n, err := io.ReadFull(r.rdr, buf[:k]) + if err != nil { + return errors.Wrap(err, "read remaining zeros") + } + r.total += int64(n) + + for _, c := range buf[:k] { + if c != 0 { + return errors.New("unexpected non-zero byte in padded page") + } + } + continue + } + n, err := io.ReadFull(r.rdr, hdr[1:]) + if err != nil { + return errors.Wrap(err, "read remaining header") + } + r.total += int64(n) + + var ( + length = binary.BigEndian.Uint16(hdr[1:]) + crc = binary.BigEndian.Uint32(hdr[3:]) + ) + + if length > pageSize-recordHeaderSize { + return errors.Errorf("invalid record size %d", length) + } + n, err = io.ReadFull(r.rdr, buf[:length]) + if err != nil { + return err + } + r.total += int64(n) + + if n != int(length) { + return errors.Errorf("invalid size: expected %d, got %d", length, n) + } + if c := crc32.Checksum(buf[:length], castagnoliTable); c != crc { + return errors.Errorf("unexpected checksum %x, expected %x", c, crc) + } + r.rec = append(r.rec, buf[:length]...) + + if err := validateRecord(r.curRecTyp, i); err != nil { + return err + } + if r.curRecTyp == recLast || r.curRecTyp == recFull { + return nil + } + + // Only increment i for non-zero records since we use it + // to determine valid content record sequences. + i++ + } +} + +// Err returns the last encountered error wrapped in a corruption error. +// If the reader does not allow to infer a segment index and offset, a total +// offset in the reader stream will be provided. +func (r *Reader) Err() error { + if r.err == nil { + return nil + } + if b, ok := r.rdr.(*segmentBufReader); ok { + return &CorruptionErr{ + Err: r.err, + Dir: b.segs[b.cur].Dir(), + Segment: b.segs[b.cur].Index(), + Offset: int64(b.off), + } + } + return &CorruptionErr{ + Err: r.err, + Segment: -1, + Offset: r.total, + } +} + +// Record returns the current record. The returned byte slice is only +// valid until the next call to Next. +func (r *Reader) Record() []byte { + return r.rec +} + +// Segment returns the current segment being read. +func (r *Reader) Segment() int { + if b, ok := r.rdr.(*segmentBufReader); ok { + return b.segs[b.cur].Index() + } + return -1 +} + +// Offset returns the current position of the segment being read. +func (r *Reader) Offset() int64 { + if b, ok := r.rdr.(*segmentBufReader); ok { + return int64(b.off) + } + return r.total +} diff --git a/wal/reader_test.go b/wal/reader_test.go new file mode 100644 index 000000000..d782ae804 --- /dev/null +++ b/wal/reader_test.go @@ -0,0 +1,506 @@ +// Copyright 2019 The Prometheus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package wal + +import ( + "bytes" + "encoding/binary" + "fmt" + "hash/crc32" + "io" + "io/ioutil" + "math/rand" + "os" + "path/filepath" + "runtime" + "strconv" + "testing" + "time" + + "github.com/go-kit/kit/log" + "github.com/prometheus/tsdb/testutil" +) + +type reader interface { + Next() bool + Err() error + Record() []byte + Offset() int64 +} + +type record struct { + t recType + b []byte +} + +var readerConstructors = map[string]func(io.Reader) reader{ + "Reader": func(r io.Reader) reader { + return NewReader(r) + }, + "LiveReader": func(r io.Reader) reader { + lr := NewLiveReader(log.NewNopLogger(), r) + lr.eofNonErr = true + return lr + }, +} + +var data = make([]byte, 100000) +var testReaderCases = []struct { + t []record + exp [][]byte + fail bool +}{ + // Sequence of valid records. + { + t: []record{ + {recFull, data[0:200]}, + {recFirst, data[200:300]}, + {recLast, data[300:400]}, + {recFirst, data[400:800]}, + {recMiddle, data[800:900]}, + {recPageTerm, make([]byte, pageSize-900-recordHeaderSize*5-1)}, // exactly lines up with page boundary. + {recLast, data[900:900]}, + {recFirst, data[900:1000]}, + {recMiddle, data[1000:1200]}, + {recMiddle, data[1200:30000]}, + {recMiddle, data[30000:30001]}, + {recMiddle, data[30001:30001]}, + {recLast, data[30001:32000]}, + }, + exp: [][]byte{ + data[0:200], + data[200:400], + data[400:900], + data[900:32000], + }, + }, + // Exactly at the limit of one page minus the header size + { + t: []record{ + {recFull, data[0 : pageSize-recordHeaderSize]}, + }, + exp: [][]byte{ + data[:pageSize-recordHeaderSize], + }, + }, + // More than a full page, this exceeds our buffer and can never happen + // when written by the WAL. + { + t: []record{ + {recFull, data[0 : pageSize+1]}, + }, + fail: true, + }, + // Two records the together are too big for a page. + // NB currently the non-live reader succeeds on this. I think this is a bug. + // but we've seen it in production. + { + t: []record{ + {recFull, data[:pageSize/2]}, + {recFull, data[:pageSize/2]}, + }, + exp: [][]byte{ + data[:pageSize/2], + data[:pageSize/2], + }, + }, + // Invalid orders of record types. + { + t: []record{{recMiddle, data[:200]}}, + fail: true, + }, + { + t: []record{{recLast, data[:200]}}, + fail: true, + }, + { + t: []record{ + {recFirst, data[:200]}, + {recFull, data[200:400]}, + }, + fail: true, + }, + { + t: []record{ + {recFirst, data[:100]}, + {recMiddle, data[100:200]}, + {recFull, data[200:400]}, + }, + fail: true, + }, + // Non-zero data after page termination. + { + t: []record{ + {recFull, data[:100]}, + {recPageTerm, append(make([]byte, pageSize-recordHeaderSize-102), 1)}, + }, + exp: [][]byte{data[:100]}, + fail: true, + }, +} + +func encodedRecord(t recType, b []byte) []byte { + if t == recPageTerm { + return append([]byte{0}, b...) + } + r := make([]byte, recordHeaderSize) + r[0] = byte(t) + binary.BigEndian.PutUint16(r[1:], uint16(len(b))) + binary.BigEndian.PutUint32(r[3:], crc32.Checksum(b, castagnoliTable)) + return append(r, b...) +} + +// TestReader feeds the reader a stream of encoded records with different types. +func TestReader(t *testing.T) { + for name, fn := range readerConstructors { + for i, c := range testReaderCases { + t.Run(fmt.Sprintf("%s/%d", name, i), func(t *testing.T) { + var buf []byte + for _, r := range c.t { + buf = append(buf, encodedRecord(r.t, r.b)...) 
+ } + r := fn(bytes.NewReader(buf)) + + for j := 0; r.Next(); j++ { + t.Logf("record %d", j) + rec := r.Record() + + if j >= len(c.exp) { + t.Fatal("received more records than expected") + } + testutil.Equals(t, c.exp[j], rec, "Bytes within record did not match expected Bytes") + } + if !c.fail && r.Err() != nil { + t.Fatalf("unexpected error: %s", r.Err()) + } + if c.fail && r.Err() == nil { + t.Fatalf("expected error but got none") + } + }) + } + } +} + +func TestReader_Live(t *testing.T) { + logger := testutil.NewLogger(t) + + for i := range testReaderCases { + t.Run(strconv.Itoa(i), func(t *testing.T) { + writeFd, err := ioutil.TempFile("", "TestReader_Live") + testutil.Ok(t, err) + defer os.Remove(writeFd.Name()) + + go func(i int) { + for _, rec := range testReaderCases[i].t { + rec := encodedRecord(rec.t, rec.b) + _, err := writeFd.Write(rec) + testutil.Ok(t, err) + runtime.Gosched() + } + writeFd.Close() + }(i) + + // Read from a second FD on the same file. + readFd, err := os.Open(writeFd.Name()) + testutil.Ok(t, err) + reader := NewLiveReader(logger, readFd) + for _, exp := range testReaderCases[i].exp { + for !reader.Next() { + testutil.Assert(t, reader.Err() == io.EOF, "expect EOF, got: %v", reader.Err()) + runtime.Gosched() + } + + actual := reader.Record() + testutil.Equals(t, exp, actual, "read wrong record") + } + + testutil.Assert(t, !reader.Next(), "unexpected record") + if testReaderCases[i].fail { + testutil.Assert(t, reader.Err() != nil, "expected error") + } + }) + } +} + +const fuzzLen = 500 + +func generateRandomEntries(w *WAL, records chan []byte) error { + var recs [][]byte + for i := 0; i < fuzzLen; i++ { + var sz int64 + switch i % 5 { + case 0, 1: + sz = 50 + case 2, 3: + sz = pageSize + default: + sz = pageSize * 8 + } + + rec := make([]byte, rand.Int63n(sz)) + if _, err := rand.Read(rec); err != nil { + return err + } + + records <- rec + + // Randomly batch up records. + recs = append(recs, rec) + if rand.Intn(4) < 3 { + if err := w.Log(recs...); err != nil { + return err + } + recs = recs[:0] + } + } + return w.Log(recs...) +} + +func allSegments(dir string) (io.Reader, error) { + seg, err := listSegments(dir) + if err != nil { + return nil, err + } + + var readers []io.Reader + for _, r := range seg { + f, err := os.Open(filepath.Join(dir, r.name)) + if err != nil { + return nil, err + } + readers = append(readers, f) + } + return io.MultiReader(readers...), nil +} + +func TestReaderFuzz(t *testing.T) { + for name, fn := range readerConstructors { + t.Run(name, func(t *testing.T) { + dir, err := ioutil.TempDir("", "wal_fuzz_live") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + w, err := NewSize(nil, nil, dir, 128*pageSize) + testutil.Ok(t, err) + + // Buffering required as we're not reading concurrently. 
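+			// generateRandomEntries logs every record to the WAL and also sends a
+			// copy down the channel, so draining the channel below replays the
+			// expected records in write order.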
+ input := make(chan []byte, fuzzLen) + err = generateRandomEntries(w, input) + testutil.Ok(t, err) + close(input) + + err = w.Close() + testutil.Ok(t, err) + + sr, err := allSegments(w.Dir()) + testutil.Ok(t, err) + + reader := fn(sr) + for expected := range input { + testutil.Assert(t, reader.Next(), "expected record: %v", reader.Err()) + testutil.Equals(t, expected, reader.Record(), "read wrong record") + } + testutil.Assert(t, !reader.Next(), "unexpected record") + }) + } +} + +func TestReaderFuzz_Live(t *testing.T) { + logger := testutil.NewLogger(t) + dir, err := ioutil.TempDir("", "wal_fuzz_live") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + w, err := NewSize(nil, nil, dir, 128*pageSize) + testutil.Ok(t, err) + + // In the background, generate a stream of random records and write them + // to the WAL. + input := make(chan []byte, fuzzLen/10) // buffering required as we sometimes batch WAL writes. + done := make(chan struct{}) + go func() { + err := generateRandomEntries(w, input) + testutil.Ok(t, err) + time.Sleep(100 * time.Millisecond) + close(done) + }() + + // Tail the WAL and compare the results. + m, _, err := w.Segments() + testutil.Ok(t, err) + + seg, err := OpenReadSegment(SegmentName(dir, m)) + testutil.Ok(t, err) + + r := NewLiveReader(logger, seg) + segmentTicker := time.NewTicker(100 * time.Millisecond) + readTicker := time.NewTicker(10 * time.Millisecond) + + readSegment := func(r *LiveReader) bool { + for r.Next() { + rec := r.Record() + expected, ok := <-input + testutil.Assert(t, ok, "unexpected record") + testutil.Equals(t, expected, rec, "record does not match expected") + } + testutil.Assert(t, r.Err() == io.EOF, "expected EOF, got: %v", r.Err()) + return true + } + +outer: + for { + select { + case <-segmentTicker.C: + // check if new segments exist + _, last, err := w.Segments() + testutil.Ok(t, err) + if last <= seg.i { + continue + } + + // read to end of segment. + readSegment(r) + + fi, err := os.Stat(SegmentName(dir, seg.i)) + testutil.Ok(t, err) + testutil.Assert(t, r.Offset() == fi.Size(), "expected to have read whole segment, but read %d of %d", r.Offset(), fi.Size()) + + seg, err = OpenReadSegment(SegmentName(dir, seg.i+1)) + testutil.Ok(t, err) + r = NewLiveReader(logger, seg) + + case <-readTicker.C: + readSegment(r) + + case <-done: + readSegment(r) + break outer + } + } + + testutil.Assert(t, r.Err() == io.EOF, "expected EOF") +} + +func TestLiveReaderCorrupt_ShortFile(t *testing.T) { + // Write a corrupt WAL segment, there is one record of pageSize in length, + // but the segment is only half written. + logger := testutil.NewLogger(t) + dir, err := ioutil.TempDir("", "wal_live_corrupt") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + w, err := NewSize(nil, nil, dir, pageSize) + testutil.Ok(t, err) + + rec := make([]byte, pageSize-recordHeaderSize) + _, err = rand.Read(rec) + testutil.Ok(t, err) + + err = w.Log(rec) + testutil.Ok(t, err) + + err = w.Close() + testutil.Ok(t, err) + + segmentFile, err := os.OpenFile(filepath.Join(dir, "00000000"), os.O_RDWR, 0666) + testutil.Ok(t, err) + + err = segmentFile.Truncate(pageSize / 2) + testutil.Ok(t, err) + + err = segmentFile.Close() + testutil.Ok(t, err) + + // Try and LiveReader it. 
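+	// The segment was truncated mid-record above, so the LiveReader should
+	// surface io.EOF (more data may still arrive) rather than spinning forever.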
+ m, _, err := w.Segments() + testutil.Ok(t, err) + + seg, err := OpenReadSegment(SegmentName(dir, m)) + testutil.Ok(t, err) + + r := NewLiveReader(logger, seg) + testutil.Assert(t, r.Next() == false, "expected no records") + testutil.Assert(t, r.Err() == io.EOF, "expected error, got: %v", r.Err()) +} + +func TestLiveReaderCorrupt_RecordTooLongAndShort(t *testing.T) { + // Write a corrupt WAL segment, when record len > page size. + logger := testutil.NewLogger(t) + dir, err := ioutil.TempDir("", "wal_live_corrupt") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + w, err := NewSize(nil, nil, dir, pageSize*2) + testutil.Ok(t, err) + + rec := make([]byte, pageSize-recordHeaderSize) + _, err = rand.Read(rec) + testutil.Ok(t, err) + + err = w.Log(rec) + testutil.Ok(t, err) + + err = w.Close() + testutil.Ok(t, err) + + segmentFile, err := os.OpenFile(filepath.Join(dir, "00000000"), os.O_RDWR, 0666) + testutil.Ok(t, err) + + // Override the record length + buf := make([]byte, 3) + buf[0] = byte(recFull) + binary.BigEndian.PutUint16(buf[1:], 0xFFFF) + _, err = segmentFile.WriteAt(buf, 0) + testutil.Ok(t, err) + + err = segmentFile.Close() + testutil.Ok(t, err) + + // Try and LiveReader it. + m, _, err := w.Segments() + testutil.Ok(t, err) + + seg, err := OpenReadSegment(SegmentName(dir, m)) + testutil.Ok(t, err) + + r := NewLiveReader(logger, seg) + testutil.Assert(t, r.Next() == false, "expected no records") + testutil.Assert(t, r.Err().Error() == "record length greater than a single page: 65542 > 32768", "expected error, got: %v", r.Err()) +} + +func TestReaderData(t *testing.T) { + dir := os.Getenv("WALDIR") + if dir == "" { + return + } + + for name, fn := range readerConstructors { + t.Run(name, func(t *testing.T) { + w, err := New(nil, nil, dir) + testutil.Ok(t, err) + + sr, err := allSegments(dir) + testutil.Ok(t, err) + + reader := fn(sr) + for reader.Next() { + } + testutil.Ok(t, reader.Err()) + + err = w.Repair(reader.Err()) + testutil.Ok(t, err) + }) + } +} diff --git a/wal/wal.go b/wal/wal.go index c7782564f..dd0be9cd8 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -742,390 +742,3 @@ func (r *segmentBufReader) Read(b []byte) (n int, err error) { r.buf.Reset(r.segs[r.cur]) return n, nil } - -// Reader reads WAL records from an io.Reader. -type Reader struct { - rdr io.Reader - err error - rec []byte - buf [pageSize]byte - total int64 // Total bytes processed. - curRecTyp recType // Used for checking that the last record is not torn. -} - -// NewReader returns a new reader. -func NewReader(r io.Reader) *Reader { - return &Reader{rdr: r} -} - -// Next advances the reader to the next records and returns true if it exists. -// It must not be called again after it returned false. -func (r *Reader) Next() bool { - err := r.next() - if errors.Cause(err) == io.EOF { - // The last WAL segment record shouldn't be torn(should be full or last). - // The last record would be torn after a crash just before - // the last record part could be persisted to disk. - if recType(r.curRecTyp) == recFirst || recType(r.curRecTyp) == recMiddle { - r.err = errors.New("last record is torn") - } - return false - } - r.err = err - return r.err == nil -} - -func (r *Reader) next() (err error) { - // We have to use r.buf since allocating byte arrays here fails escape - // analysis and ends up on the heap, even though it seemingly should not. 
- hdr := r.buf[:recordHeaderSize] - buf := r.buf[recordHeaderSize:] - - r.rec = r.rec[:0] - - i := 0 - for { - if _, err = io.ReadFull(r.rdr, hdr[:1]); err != nil { - return errors.Wrap(err, "read first header byte") - } - r.total++ - r.curRecTyp = recType(hdr[0]) - - // Gobble up zero bytes. - if r.curRecTyp == recPageTerm { - // recPageTerm is a single byte that indicates the rest of the page is padded. - // If it's the first byte in a page, buf is too small and - // needs to be resized to fit pageSize-1 bytes. - buf = r.buf[1:] - - // We are pedantic and check whether the zeros are actually up - // to a page boundary. - // It's not strictly necessary but may catch sketchy state early. - k := pageSize - (r.total % pageSize) - if k == pageSize { - continue // Initial 0 byte was last page byte. - } - n, err := io.ReadFull(r.rdr, buf[:k]) - if err != nil { - return errors.Wrap(err, "read remaining zeros") - } - r.total += int64(n) - - for _, c := range buf[:k] { - if c != 0 { - return errors.New("unexpected non-zero byte in padded page") - } - } - continue - } - n, err := io.ReadFull(r.rdr, hdr[1:]) - if err != nil { - return errors.Wrap(err, "read remaining header") - } - r.total += int64(n) - - var ( - length = binary.BigEndian.Uint16(hdr[1:]) - crc = binary.BigEndian.Uint32(hdr[3:]) - ) - - if length > pageSize-recordHeaderSize { - return errors.Errorf("invalid record size %d", length) - } - n, err = io.ReadFull(r.rdr, buf[:length]) - if err != nil { - return err - } - r.total += int64(n) - - if n != int(length) { - return errors.Errorf("invalid size: expected %d, got %d", length, n) - } - if c := crc32.Checksum(buf[:length], castagnoliTable); c != crc { - return errors.Errorf("unexpected checksum %x, expected %x", c, crc) - } - r.rec = append(r.rec, buf[:length]...) - - if err := validateRecord(r.curRecTyp, i); err != nil { - return err - } - if r.curRecTyp == recLast || r.curRecTyp == recFull { - return nil - } - - // Only increment i for non-zero records since we use it - // to determine valid content record sequences. - i++ - } -} - -// Err returns the last encountered error wrapped in a corruption error. -// If the reader does not allow to infer a segment index and offset, a total -// offset in the reader stream will be provided. -func (r *Reader) Err() error { - if r.err == nil { - return nil - } - if b, ok := r.rdr.(*segmentBufReader); ok { - return &CorruptionErr{ - Err: r.err, - Dir: b.segs[b.cur].Dir(), - Segment: b.segs[b.cur].Index(), - Offset: int64(b.off), - } - } - return &CorruptionErr{ - Err: r.err, - Segment: -1, - Offset: r.total, - } -} - -// Record returns the current record. The returned byte slice is only -// valid until the next call to Next. -func (r *Reader) Record() []byte { - return r.rec -} - -// Segment returns the current segment being read. -func (r *Reader) Segment() int { - if b, ok := r.rdr.(*segmentBufReader); ok { - return b.segs[b.cur].Index() - } - return -1 -} - -// Offset returns the current position of the segment being read. -func (r *Reader) Offset() int64 { - if b, ok := r.rdr.(*segmentBufReader); ok { - return int64(b.off) - } - return r.total -} - -// NewLiveReader returns a new live reader. -func NewLiveReader(r io.Reader) *LiveReader { - return &LiveReader{rdr: r} -} - -// Reader reads WAL records from an io.Reader. It buffers partial record data for -// the next read. 
-type LiveReader struct { - rdr io.Reader - err error - rec []byte - hdr [recordHeaderSize]byte - buf [pageSize]byte - readIndex int // Index in buf to start at for next read. - writeIndex int // Index in buf to start at for next write. - total int64 // Total bytes processed during reading in calls to Next(). - index int // Used to track partial records, should be 0 at the start of every new record. -} - -func (r *LiveReader) Err() error { - return r.err -} - -func (r *LiveReader) TotalRead() int64 { - return r.total -} - -func (r *LiveReader) fillBuffer() error { - n, err := r.rdr.Read(r.buf[r.writeIndex:len(r.buf)]) - r.writeIndex += n - return err -} - -// Shift the buffer up to the read index. -func (r *LiveReader) shiftBuffer() { - copied := copy(r.buf[0:], r.buf[r.readIndex:r.writeIndex]) - r.readIndex = 0 - r.writeIndex = copied -} - -// Next returns true if r.rec will contain a full record. -// False does not indicate that there will never be more data to -// read for the current io.Reader. -func (r *LiveReader) Next() bool { - for { - if r.buildRecord() { - return true - } - if r.err != nil && r.err != io.EOF { - return false - } - if r.readIndex == pageSize { - r.shiftBuffer() - } - if r.writeIndex != pageSize { - if err := r.fillBuffer(); err != nil { - // We expect to get EOF, since we're reading the segment file as it's being written. - if err != io.EOF { - r.err = err - } - return false - } - } - } -} - -// Record returns the current record. -// The returned byte slice is only valid until the next call to Next. -func (r *LiveReader) Record() []byte { - return r.rec -} - -// Rebuild a full record from potentially partial records. Returns false -// if there was an error or if we weren't able to read a record for any reason. -// Returns true if we read a full record. Any record data is appeneded to -// LiveReader.rec -func (r *LiveReader) buildRecord() bool { - for { - // Check that we have data in the internal buffer to read. - if r.writeIndex <= r.readIndex { - return false - } - - // Attempt to read a record, partial or otherwise. - temp, n, err := readRecord(r.buf[r.readIndex:r.writeIndex], r.hdr[:], r.total) - r.readIndex += n - r.total += int64(n) - if err != nil { - r.err = err - return false - } - - if temp == nil { - return false - } - - rt := recType(r.hdr[0]) - - if rt == recFirst || rt == recFull { - r.rec = r.rec[:0] - } - r.rec = append(r.rec, temp...) - - if err := validateRecord(rt, r.index); err != nil { - r.err = err - r.index = 0 - return false - } - if rt == recLast || rt == recFull { - r.index = 0 - return true - } - // Only increment i for non-zero records since we use it - // to determine valid content record sequences. - r.index++ - } -} - -// Returns an error if the recType and i indicate an invalid record sequence. -// As an example, if i is > 0 because we've read some amount of a partial record -// (recFirst, recMiddle, etc. but not recLast) and then we get another recFirst or recFull -// instead of a recLast or recMiddle we would have an invalid record. 
-func validateRecord(typ recType, i int) error { - switch typ { - case recFull: - if i != 0 { - return errors.New("unexpected full record") - } - return nil - case recFirst: - if i != 0 { - return errors.New("unexpected first record, dropping buffer") - } - return nil - case recMiddle: - if i == 0 { - return errors.New("unexpected middle record, dropping buffer") - } - return nil - case recLast: - if i == 0 { - return errors.New("unexpected last record, dropping buffer") - } - return nil - default: - return errors.Errorf("unexpected record type %d", typ) - } -} - -// Read a sub-record (see recType) from the buffer. It could potentially -// be a full record (recFull) if the record fits within the bounds of a single page. -// Returns a byte slice of the record data read, the number of bytes read, and an error -// if there's a non-zero byte in a page term record or the record checksum fails. -// TODO(callum) the EOF errors we're returning from this function should theoretically -// never happen, add a metric for them. -func readRecord(buf []byte, header []byte, total int64) ([]byte, int, error) { - readIndex := 0 - header[0] = buf[0] - readIndex++ - total++ - - // The rest of this function is mostly from Reader.Next(). - typ := recType(header[0]) - // Gobble up zero bytes. - if typ == recPageTerm { - // We are pedantic and check whether the zeros are actually up to a page boundary. - // It's not strictly necessary but may catch sketchy state early. - k := pageSize - (total % pageSize) - if k == pageSize { - return nil, 1, nil // Initial 0 byte was last page byte. - } - - if k <= int64(len(buf)-readIndex) { - for _, v := range buf[readIndex : int64(readIndex)+k] { - readIndex++ - if v != 0 { - return nil, readIndex, errors.New("unexpected non-zero byte in page term bytes") - } - } - return nil, readIndex, nil - } - // Not enough bytes to read the rest of the page term rec. - // This theoretically should never happen, since we're only shifting the - // internal buffer of the live reader when we read to the end of page. - // Treat this the same as an EOF, it's an error we would expect to see. - return nil, 0, io.EOF - } - - if readIndex+recordHeaderSize-1 > len(buf) { - // Treat this the same as an EOF, it's an error we would expect to see. - return nil, 0, io.EOF - } - - copy(header[1:], buf[readIndex:readIndex+len(header[1:])]) - readIndex += recordHeaderSize - 1 - total += int64(recordHeaderSize - 1) - var ( - length = binary.BigEndian.Uint16(header[1:]) - crc = binary.BigEndian.Uint32(header[3:]) - ) - readTo := int(length) + readIndex - if readTo > len(buf) { - if (readTo - readIndex) > pageSize { - return nil, 0, errors.Errorf("invalid record, record size would be larger than max page size: %d", int(length)) - } - // Not enough data to read all of the record data. - // Treat this the same as an EOF, it's an error we would expect to see. - return nil, 0, io.EOF - } - recData := buf[readIndex:readTo] - readIndex += int(length) - total += int64(length) - - // TODO(callum) what should we do here, throw out the record? We should add a metric at least. 
- if c := crc32.Checksum(recData, castagnoliTable); c != crc { - return recData, readIndex, errors.Errorf("unexpected checksum %x, expected %x", c, crc) - } - return recData, readIndex, nil -} - -func min(i, j int) int { - if i < j { - return i - } - return j -} diff --git a/wal/wal_test.go b/wal/wal_test.go index c767bb56c..898030add 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -16,394 +16,16 @@ package wal import ( "bytes" - "encoding/binary" "fmt" - "hash/crc32" - "io" "io/ioutil" "math/rand" "os" - "path" "path/filepath" - "sync" "testing" - "time" "github.com/prometheus/tsdb/testutil" ) -type record struct { - t recType - b []byte -} - -var data = make([]byte, 100000) -var testReaderCases = []struct { - t []record - exp [][]byte - fail bool -}{ - // Sequence of valid records. - { - t: []record{ - {recFull, data[0:200]}, - {recFirst, data[200:300]}, - {recLast, data[300:400]}, - {recFirst, data[400:800]}, - {recMiddle, data[800:900]}, - {recPageTerm, make([]byte, pageSize-900-recordHeaderSize*5-1)}, // exactly lines up with page boundary. - {recLast, data[900:900]}, - {recFirst, data[900:1000]}, - {recMiddle, data[1000:1200]}, - {recMiddle, data[1200:30000]}, - {recMiddle, data[30000:30001]}, - {recMiddle, data[30001:30001]}, - {recLast, data[30001:32000]}, - }, - exp: [][]byte{ - data[0:200], - data[200:400], - data[400:900], - data[900:32000], - }, - }, - // Exactly at the limit of one page minus the header size - { - t: []record{ - {recFull, data[0 : pageSize-recordHeaderSize]}, - }, - exp: [][]byte{ - data[:pageSize-recordHeaderSize], - }, - }, - // More than a full page, this exceeds our buffer and can never happen - // when written by the WAL. - { - t: []record{ - {recFull, data[0 : pageSize+1]}, - }, - fail: true, - }, - // Invalid orders of record types. - { - t: []record{{recMiddle, data[:200]}}, - fail: true, - }, - { - t: []record{{recLast, data[:200]}}, - fail: true, - }, - { - t: []record{ - {recFirst, data[:200]}, - {recFull, data[200:400]}, - }, - fail: true, - }, - { - t: []record{ - {recFirst, data[:100]}, - {recMiddle, data[100:200]}, - {recFull, data[200:400]}, - }, - fail: true, - }, - // Non-zero data after page termination. - { - t: []record{ - {recFull, data[:100]}, - {recPageTerm, append(make([]byte, 1000), 1)}, - }, - exp: [][]byte{data[:100]}, - fail: true, - }, -} - -func encodedRecord(t recType, b []byte) []byte { - if t == recPageTerm { - return append([]byte{0}, b...) - } - r := make([]byte, recordHeaderSize) - r[0] = byte(t) - binary.BigEndian.PutUint16(r[1:], uint16(len(b))) - binary.BigEndian.PutUint32(r[3:], crc32.Checksum(b, castagnoliTable)) - return append(r, b...) -} - -// TestReader feeds the reader a stream of encoded records with different types. -func TestReader(t *testing.T) { - for i, c := range testReaderCases { - t.Logf("test %d", i) - - var buf []byte - for _, r := range c.t { - buf = append(buf, encodedRecord(r.t, r.b)...) 
- } - r := NewReader(bytes.NewReader(buf)) - - for j := 0; r.Next(); j++ { - t.Logf("record %d", j) - rec := r.Record() - - if j >= len(c.exp) { - t.Fatal("received more records than inserted") - } - testutil.Equals(t, c.exp[j], rec, "Bytes within record did not match expected Bytes") - } - if !c.fail && r.Err() != nil { - t.Fatalf("unexpected error: %s", r.Err()) - } - if c.fail && r.Err() == nil { - t.Fatalf("expected error but got none") - } - } -} - -func TestReader_Live(t *testing.T) { - for i, c := range testReaderCases { - t.Logf("test %d", i) - dir, err := ioutil.TempDir("", fmt.Sprintf("live_reader_%d", i)) - t.Logf("created dir %s", dir) - testutil.Ok(t, err) - defer os.RemoveAll(dir) - - // we're never going to have more than a single segment file per test case right now - f, err := os.Create(path.Join(dir, "00000000")) - testutil.Ok(t, err) - - // live reader doesn't work on readers created from bytes buffers, - // since we need to be able to write more data to the thing we're - // reading from after the reader has been created - wg := sync.WaitGroup{} - // make sure the reader doesn't start until at least one record is written - wg.Add(1) - go func() { - for i, rec := range c.t { - rec := encodedRecord(rec.t, rec.b) - n, err := f.Write(rec) - testutil.Ok(t, err) - testutil.Assert(t, n > 0, "no bytes were written to wal") - if i == 0 { - wg.Done() - } - } - }() - sr, err := OpenReadSegment(SegmentName(dir, 0)) - testutil.Ok(t, err) - lr := NewLiveReader(sr) - j := 0 - wg.Wait() - caseLoop: - for { - for ; lr.Next(); j++ { - rec := lr.Record() - t.Log("j: ", j) - testutil.Equals(t, c.exp[j], rec, "Bytes within record did not match expected Bytes") - if j == len(c.exp)-1 { - break caseLoop - } - - } - - // Because reads and writes are happening concurrently, unless we get an error we should - // attempt to read records again. - if j == 0 && lr.Err() == nil { - continue - } - - if !c.fail && lr.Err() != nil { - t.Fatalf("unexpected error: %s", lr.Err()) - } - if c.fail && lr.Err() == nil { - t.Fatalf("expected error but got none:\n\tinput: %+v", c.t) - } - if lr.Err() != nil { - t.Log("err: ", lr.Err()) - break - } - } - } -} - -func TestWAL_FuzzWriteRead_Live(t *testing.T) { - const count = 500 - var input [][]byte - lock := sync.RWMutex{} - var recs [][]byte - var index int - - // Get size of segment. 
- getSegmentSize := func(dir string, index int) (int64, error) { - i := int64(-1) - fi, err := os.Stat(SegmentName(dir, index)) - if err == nil { - i = fi.Size() - } - return i, err - } - - readSegment := func(r *LiveReader) { - for r.Next() { - rec := r.Record() - lock.RLock() - l := len(input) - lock.RUnlock() - if index >= l { - t.Fatalf("read too many records") - } - lock.RLock() - if !bytes.Equal(input[index], rec) { - t.Fatalf("record %d (len %d) does not match (expected len %d)", - index, len(rec), len(input[index])) - } - lock.RUnlock() - index++ - } - if r.Err() != io.EOF { - testutil.Ok(t, r.Err()) - } - } - - dir, err := ioutil.TempDir("", "wal_fuzz_live") - t.Log("created dir: ", dir) - testutil.Ok(t, err) - defer func() { - os.RemoveAll(dir) - }() - - w, err := NewSize(nil, nil, dir, 128*pageSize) - testutil.Ok(t, err) - - go func() { - for i := 0; i < count; i++ { - var sz int64 - switch i % 5 { - case 0, 1: - sz = 50 - case 2, 3: - sz = pageSize - default: - sz = pageSize * 8 - } - - rec := make([]byte, rand.Int63n(sz)) - _, err := rand.Read(rec) - testutil.Ok(t, err) - lock.Lock() - input = append(input, rec) - lock.Unlock() - recs = append(recs, rec) - - // Randomly batch up records. - if rand.Intn(4) < 3 { - testutil.Ok(t, w.Log(recs...)) - recs = recs[:0] - } - } - testutil.Ok(t, w.Log(recs...)) - }() - - m, _, err := w.Segments() - testutil.Ok(t, err) - - seg, err := OpenReadSegment(SegmentName(dir, m)) - testutil.Ok(t, err) - - r := NewLiveReader(seg) - segmentTicker := time.NewTicker(100 * time.Millisecond) - readTicker := time.NewTicker(10 * time.Millisecond) - for { - select { - case <-segmentTicker.C: - // check if new segments exist - _, last, err := w.Segments() - testutil.Ok(t, err) - if last > seg.i { - for { - readSegment(r) - if r.Err() != io.EOF { - testutil.Ok(t, r.Err()) - } - size, err := getSegmentSize(dir, seg.i) - testutil.Ok(t, err) - // make sure we've read all of the current segment before rotating - if r.TotalRead() == size { - break - } - } - seg, err = OpenReadSegment(SegmentName(dir, seg.i+1)) - testutil.Ok(t, err) - r = NewLiveReader(seg) - } - case <-readTicker.C: - readSegment(r) - } - if index == count { - break - } - } - testutil.Ok(t, r.Err()) -} -func TestWAL_FuzzWriteRead(t *testing.T) { - const count = 25000 - - dir, err := ioutil.TempDir("", "walfuzz") - testutil.Ok(t, err) - defer os.RemoveAll(dir) - - w, err := NewSize(nil, nil, dir, 128*pageSize) - testutil.Ok(t, err) - - var input [][]byte - var recs [][]byte - - for i := 0; i < count; i++ { - var sz int - switch i % 5 { - case 0, 1: - sz = 50 - case 2, 3: - sz = pageSize - default: - sz = 8 * pageSize - } - rec := make([]byte, rand.Intn(sz)) - _, err := rand.Read(rec) - testutil.Ok(t, err) - - input = append(input, rec) - recs = append(recs, rec) - - // Randomly batch up records. - if rand.Intn(4) < 3 { - testutil.Ok(t, w.Log(recs...)) - recs = recs[:0] - } - } - testutil.Ok(t, w.Log(recs...)) - - m, n, err := w.Segments() - testutil.Ok(t, err) - - rc, err := NewSegmentsRangeReader(SegmentRange{Dir: dir, First: m, Last: n}) - testutil.Ok(t, err) - defer rc.Close() - - rdr := NewReader(rc) - - for i := 0; rdr.Next(); i++ { - rec := rdr.Record() - if i >= len(input) { - t.Fatal("read too many records") - } - if !bytes.Equal(input[i], rec) { - t.Fatalf("record %d (len %d) does not match (expected len %d)", - i, len(rec), len(input[i])) - } - } - testutil.Ok(t, rdr.Err()) -} - func TestWAL_Repair(t *testing.T) { for name, test := range map[string]struct {
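Usage note (not part of the patch): a minimal sketch of how a consumer might tail a single live segment with the new reader. The segment path, the poll interval, and the surrounding program are illustrative assumptions; only NewLiveReader, Next, Record, Offset, and Err come from this change.

    package main

    import (
    	"io"
    	"log"
    	"os"
    	"time"

    	kitlog "github.com/go-kit/kit/log"
    	"github.com/prometheus/tsdb/wal"
    )

    func main() {
    	// Open the segment with its own file descriptor; the WAL may still be appending to it.
    	// The path below is a placeholder for whichever segment you want to tail.
    	f, err := os.Open("data/wal/00000000")
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer f.Close()

    	r := wal.NewLiveReader(kitlog.NewNopLogger(), f)
    	for {
    		// Consume all records that are currently readable.
    		for r.Next() {
    			log.Printf("read %d-byte record, segment offset now %d", len(r.Record()), r.Offset())
    		}
    		if err := r.Err(); err != nil && err != io.EOF {
    			// Non-EOF errors are terminal: the segment is corrupt and the reader must not be reused.
    			log.Fatalf("corrupt segment: %v", err)
    		}
    		// io.EOF means we have caught up with the writer; poll again shortly.
    		time.Sleep(100 * time.Millisecond)
    	}
    }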