mirror of https://github.com/prometheus/prometheus
Fixed WAL corruption on partial writes within a page (#8125)
* Fixed WAL corruption on partial writes Signed-off-by: Marco Pracucci <marco@pracucci.com> * Renamed variable Signed-off-by: Marco Pracucci <marco@pracucci.com> * Fixed test Signed-off-by: Marco Pracucci <marco@pracucci.com> * Addressed review comments Signed-off-by: Marco Pracucci <marco@pracucci.com>pull/8137/head
parent
318190021d
commit
63be30dcee
|
@ -74,9 +74,18 @@ func (p *page) reset() {
|
||||||
p.flushed = 0
|
p.flushed = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SegmentFile represents the underlying file used to store a segment.
|
||||||
|
type SegmentFile interface {
|
||||||
|
Stat() (os.FileInfo, error)
|
||||||
|
Sync() error
|
||||||
|
io.Writer
|
||||||
|
io.Reader
|
||||||
|
io.Closer
|
||||||
|
}
|
||||||
|
|
||||||
// Segment represents a segment file.
|
// Segment represents a segment file.
|
||||||
type Segment struct {
|
type Segment struct {
|
||||||
*os.File
|
SegmentFile
|
||||||
dir string
|
dir string
|
||||||
i int
|
i int
|
||||||
}
|
}
|
||||||
|
@ -130,7 +139,7 @@ func OpenWriteSegment(logger log.Logger, dir string, k int) (*Segment, error) {
|
||||||
return nil, errors.Wrap(err, "zero-pad torn page")
|
return nil, errors.Wrap(err, "zero-pad torn page")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return &Segment{File: f, i: k, dir: dir}, nil
|
return &Segment{SegmentFile: f, i: k, dir: dir}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateSegment creates a new segment k in dir.
|
// CreateSegment creates a new segment k in dir.
|
||||||
|
@ -139,7 +148,7 @@ func CreateSegment(dir string, k int) (*Segment, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &Segment{File: f, i: k, dir: dir}, nil
|
return &Segment{SegmentFile: f, i: k, dir: dir}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// OpenReadSegment opens the segment with the given filename.
|
// OpenReadSegment opens the segment with the given filename.
|
||||||
|
@ -152,7 +161,7 @@ func OpenReadSegment(fn string) (*Segment, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &Segment{File: f, i: k, dir: filepath.Dir(fn)}, nil
|
return &Segment{SegmentFile: f, i: k, dir: filepath.Dir(fn)}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// WAL is a write ahead log that stores records in segment files.
|
// WAL is a write ahead log that stores records in segment files.
|
||||||
|
@ -517,8 +526,10 @@ func (w *WAL) flushPage(clear bool) error {
|
||||||
if clear {
|
if clear {
|
||||||
p.alloc = pageSize // Write till end of page.
|
p.alloc = pageSize // Write till end of page.
|
||||||
}
|
}
|
||||||
|
|
||||||
n, err := w.segment.Write(p.buf[p.flushed:p.alloc])
|
n, err := w.segment.Write(p.buf[p.flushed:p.alloc])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
p.flushed += n
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
p.flushed += n
|
p.flushed += n
|
||||||
|
@ -664,6 +675,9 @@ func (w *WAL) log(rec []byte, final bool) error {
|
||||||
|
|
||||||
if w.page.full() {
|
if w.page.full() {
|
||||||
if err := w.flushPage(true); err != nil {
|
if err := w.flushPage(true); err != nil {
|
||||||
|
// TODO When the flushing fails at this point and the record has not been
|
||||||
|
// fully written to the buffer, we end up with a corrupted WAL because some part of the
|
||||||
|
// record have been written to the buffer, while the rest of the record will be discarded.
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -705,7 +719,7 @@ func (w *WAL) Truncate(i int) (err error) {
|
||||||
|
|
||||||
func (w *WAL) fsync(f *Segment) error {
|
func (w *WAL) fsync(f *Segment) error {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
err := f.File.Sync()
|
err := f.Sync()
|
||||||
w.metrics.fsyncDuration.Observe(time.Since(start).Seconds())
|
w.metrics.fsyncDuration.Observe(time.Since(start).Seconds())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,7 @@ package wal
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
|
@ -424,6 +425,103 @@ func TestCompression(t *testing.T) {
|
||||||
require.Greater(t, float64(uncompressedSize)*0.75, float64(compressedSize), "Compressing zeroes should save at least 25%% space - uncompressedSize: %d, compressedSize: %d", uncompressedSize, compressedSize)
|
require.Greater(t, float64(uncompressedSize)*0.75, float64(compressedSize), "Compressing zeroes should save at least 25%% space - uncompressedSize: %d, compressedSize: %d", uncompressedSize, compressedSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestLogPartialWrite(t *testing.T) {
|
||||||
|
const segmentSize = pageSize * 2
|
||||||
|
record := []byte{1, 2, 3, 4, 5}
|
||||||
|
|
||||||
|
tests := map[string]struct {
|
||||||
|
numRecords int
|
||||||
|
faultyRecord int
|
||||||
|
}{
|
||||||
|
"partial write when logging first record in a page": {
|
||||||
|
numRecords: 10,
|
||||||
|
faultyRecord: 1,
|
||||||
|
},
|
||||||
|
"partial write when logging record in the middle of a page": {
|
||||||
|
numRecords: 10,
|
||||||
|
faultyRecord: 3,
|
||||||
|
},
|
||||||
|
"partial write when logging last record of a page": {
|
||||||
|
numRecords: (pageSize / (recordHeaderSize + len(record))) + 10,
|
||||||
|
faultyRecord: pageSize / (recordHeaderSize + len(record)),
|
||||||
|
},
|
||||||
|
// TODO the current implementation suffers this:
|
||||||
|
//"partial write when logging a record overlapping two pages": {
|
||||||
|
// numRecords: (pageSize / (recordHeaderSize + len(record))) + 10,
|
||||||
|
// faultyRecord: pageSize/(recordHeaderSize+len(record)) + 1,
|
||||||
|
//},
|
||||||
|
}
|
||||||
|
|
||||||
|
for testName, testData := range tests {
|
||||||
|
t.Run(testName, func(t *testing.T) {
|
||||||
|
dirPath, err := ioutil.TempDir("", "")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
w, err := NewSize(nil, nil, dirPath, segmentSize, false)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Replace the underlying segment file with a mocked one that injects a failure.
|
||||||
|
w.segment.SegmentFile = &faultySegmentFile{
|
||||||
|
SegmentFile: w.segment.SegmentFile,
|
||||||
|
writeFailureAfter: ((recordHeaderSize + len(record)) * (testData.faultyRecord - 1)) + 2,
|
||||||
|
writeFailureErr: io.ErrShortWrite,
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 1; i <= testData.numRecords; i++ {
|
||||||
|
if err := w.Log(record); i == testData.faultyRecord {
|
||||||
|
require.Error(t, io.ErrShortWrite, err)
|
||||||
|
} else {
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(t, w.Close())
|
||||||
|
|
||||||
|
// Read it back. We expect no corruption.
|
||||||
|
s, err := OpenReadSegment(SegmentName(dirPath, 0))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
r := NewReader(NewSegmentBufReader(s))
|
||||||
|
for i := 0; i < testData.numRecords; i++ {
|
||||||
|
require.True(t, r.Next())
|
||||||
|
require.NoError(t, r.Err())
|
||||||
|
require.Equal(t, record, r.Record())
|
||||||
|
}
|
||||||
|
require.False(t, r.Next())
|
||||||
|
require.NoError(t, r.Err())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type faultySegmentFile struct {
|
||||||
|
SegmentFile
|
||||||
|
|
||||||
|
written int
|
||||||
|
writeFailureAfter int
|
||||||
|
writeFailureErr error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *faultySegmentFile) Write(p []byte) (int, error) {
|
||||||
|
if f.writeFailureAfter >= 0 && f.writeFailureAfter < f.written+len(p) {
|
||||||
|
partialLen := f.writeFailureAfter - f.written
|
||||||
|
if partialLen <= 0 || partialLen >= len(p) {
|
||||||
|
partialLen = 1
|
||||||
|
}
|
||||||
|
|
||||||
|
// Inject failure.
|
||||||
|
n, _ := f.SegmentFile.Write(p[:partialLen])
|
||||||
|
f.written += n
|
||||||
|
f.writeFailureAfter = -1
|
||||||
|
|
||||||
|
return n, f.writeFailureErr
|
||||||
|
}
|
||||||
|
|
||||||
|
// Proxy the write to the underlying file.
|
||||||
|
n, err := f.SegmentFile.Write(p)
|
||||||
|
f.written += n
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
|
||||||
func BenchmarkWAL_LogBatched(b *testing.B) {
|
func BenchmarkWAL_LogBatched(b *testing.B) {
|
||||||
for _, compress := range []bool{true, false} {
|
for _, compress := range []bool{true, false} {
|
||||||
b.Run(fmt.Sprintf("compress=%t", compress), func(b *testing.B) {
|
b.Run(fmt.Sprintf("compress=%t", compress), func(b *testing.B) {
|
||||||
|
|
Loading…
Reference in New Issue