diff --git a/db.go b/db.go index 1b508e066..e6a0a74b4 100644 --- a/db.go +++ b/db.go @@ -192,6 +192,10 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db if err := repairBadIndexVersion(l, dir); err != nil { return nil, err } + // Migrate old WAL. + if err := MigrateWAL(l, filepath.Join(dir, "wal")); err != nil { + return nil, errors.Wrap(err, "migrate WAL") + } db = &DB{ dir: dir, diff --git a/head.go b/head.go index 61457911f..f52fea726 100644 --- a/head.go +++ b/head.go @@ -392,6 +392,8 @@ func (h *Head) Init() error { if err == nil { return nil } + level.Warn(h.logger).Log("msg", "encountered WAL error, attempting repair", "err", err) + if err := h.wal.Repair(err); err != nil { return errors.Wrap(err, "repair corrupted WAL") } diff --git a/wal.go b/wal.go index c1b9f6b06..773b64282 100644 --- a/wal.go +++ b/wal.go @@ -33,6 +33,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/labels" + "github.com/prometheus/tsdb/wal" ) // WALEntryType indicates what data a WAL entry contains. @@ -82,6 +83,8 @@ func newWalMetrics(wal *SegmentWAL, r prometheus.Registerer) *walMetrics { // WAL is a write ahead log that can log new series labels and samples. // It must be completely read before new entries are logged. +// +// DEPRECATED: use wal pkg combined with the record coders instead. type WAL interface { Reader() WALReader LogSeries([]RefSeries) error @@ -173,6 +176,8 @@ func newCRC32() hash.Hash32 { } // SegmentWAL is a write ahead log for series data. +// +// DEPRECATED: use wal pkg combined with the record coders instead. type SegmentWAL struct { mtx sync.Mutex metrics *walMetrics @@ -1206,3 +1211,86 @@ func (r *walReader) decodeDeletes(flag byte, b []byte, res *[]Stone) error { } return nil } + +// MigrateWAL rewrites the deprecated write ahead log into the new format. +func MigrateWAL(logger log.Logger, dir string) error { + // Detect whether we still have the old WAL. + fns, err := sequenceFiles(dir) + if err != nil && !os.IsNotExist(err) { + return errors.Wrap(err, "list sequence files") + } + if len(fns) == 0 { + return nil // No WAL at all yet. + } + // Check header of first segment. + f, err := os.Open(fns[0]) + if err != nil { + return errors.Wrap(err, "check first existing segment") + } + var hdr [4]byte + if n, err := f.Read(hdr[:]); err != nil { + return errors.Wrap(err, "read header from first segment") + } else if n != 4 { + return errors.New("could not read full header from segment") + } + if binary.BigEndian.Uint32(hdr[:]) != WALMagic { + return nil // Not the old WAL anymore. + } + + level.Info(logger).Log("msg", "migrating WAL format") + + tmpdir := dir + ".tmp" + if err := os.RemoveAll(tmpdir); err != nil { + return errors.Wrap(err, "cleanup replacement dir") + } + repl, err := wal.New(logger, nil, tmpdir) + if err != nil { + return errors.Wrap(err, "open new WAL") + } + w, err := OpenSegmentWAL(dir, logger, time.Minute, nil) + if err != nil { + return errors.Wrap(err, "open old WAL") + } + rdr := w.Reader() + + var ( + enc RecordEncoder + b []byte + ) + decErr := rdr.Read( + func(s []RefSeries) { + if err != nil { + return + } + err = repl.Log(enc.Series(s, b[:0])) + }, + func(s []RefSample) { + if err != nil { + return + } + err = repl.Log(enc.Samples(s, b[:0])) + }, + func(s []Stone) { + if err != nil { + return + } + err = repl.Log(enc.Tombstones(s, b[:0])) + }, + ) + if decErr != nil { + return errors.Wrap(err, "decode old entries") + } + if err != nil { + return errors.Wrap(err, "write new entries") + } + if err := w.Close(); err != nil { + return errors.Wrap(err, "close old WAL") + } + if err := repl.Close(); err != nil { + return errors.Wrap(err, "close new WAL") + } + if err := fileutil.Rename(tmpdir, dir); err != nil { + return errors.Wrap(err, "replace old WAL") + } + return nil +} diff --git a/wal_test.go b/wal_test.go index 6d559f5d9..680ebe06a 100644 --- a/wal_test.go +++ b/wal_test.go @@ -19,6 +19,7 @@ import ( "io/ioutil" "math/rand" "os" + "path" "testing" "time" @@ -26,6 +27,7 @@ import ( "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/testutil" + "github.com/prometheus/tsdb/wal" ) func TestSegmentWAL_cut(t *testing.T) { @@ -431,3 +433,100 @@ func TestWALRestoreCorrupted(t *testing.T) { }) } } + +func TestMigrateWAL_Fuzz(t *testing.T) { + dir, err := ioutil.TempDir("", "walmigrate") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + wdir := path.Join(dir, "wal") + + // Should pass if no WAL exists yet. + testutil.Ok(t, MigrateWAL(nil, wdir)) + + oldWAL, err := OpenSegmentWAL(wdir, nil, time.Minute, nil) + testutil.Ok(t, err) + + // Write some data. + testutil.Ok(t, oldWAL.LogSeries([]RefSeries{ + {Ref: 100, Labels: labels.FromStrings("abc", "def", "123", "456")}, + {Ref: 1, Labels: labels.FromStrings("abc", "def2", "1234", "4567")}, + })) + testutil.Ok(t, oldWAL.LogSamples([]RefSample{ + {Ref: 1, T: 100, V: 200}, + {Ref: 2, T: 300, V: 400}, + })) + testutil.Ok(t, oldWAL.LogSeries([]RefSeries{ + {Ref: 200, Labels: labels.FromStrings("xyz", "def", "foo", "bar")}, + })) + testutil.Ok(t, oldWAL.LogSamples([]RefSample{ + {Ref: 3, T: 100, V: 200}, + {Ref: 4, T: 300, V: 400}, + })) + testutil.Ok(t, oldWAL.LogDeletes([]Stone{ + {ref: 1, intervals: []Interval{{100, 200}}}, + })) + + testutil.Ok(t, oldWAL.Close()) + + // Perform migration. + testutil.Ok(t, MigrateWAL(nil, wdir)) + + w, err := wal.New(nil, nil, wdir) + testutil.Ok(t, err) + + // We can properly write some new data after migration. + var enc RecordEncoder + testutil.Ok(t, w.Log(enc.Samples([]RefSample{ + {Ref: 500, T: 1, V: 1}, + }, nil))) + + testutil.Ok(t, w.Close()) + + // Read back all data. + sr, err := wal.NewSegmentsReader(wdir) + testutil.Ok(t, err) + + r := wal.NewReader(sr) + var res []interface{} + var dec RecordDecoder + + for r.Next() { + rec := r.Record() + + switch dec.Type(rec) { + case RecordSeries: + s, err := dec.Series(rec, nil) + testutil.Ok(t, err) + res = append(res, s) + case RecordSamples: + s, err := dec.Samples(rec, nil) + testutil.Ok(t, err) + res = append(res, s) + case RecordTombstones: + s, err := dec.Tombstones(rec, nil) + testutil.Ok(t, err) + res = append(res, s) + default: + t.Fatalf("unknown record type %d", dec.Type(rec)) + } + } + testutil.Ok(t, r.Err()) + + testutil.Equals(t, []interface{}{ + []RefSeries{ + {Ref: 100, Labels: labels.FromStrings("abc", "def", "123", "456")}, + {Ref: 1, Labels: labels.FromStrings("abc", "def2", "1234", "4567")}, + }, + []RefSample{{Ref: 1, T: 100, V: 200}, {Ref: 2, T: 300, V: 400}}, + []RefSeries{ + {Ref: 200, Labels: labels.FromStrings("xyz", "def", "foo", "bar")}, + }, + []RefSample{{Ref: 3, T: 100, V: 200}, {Ref: 4, T: 300, V: 400}}, + []Stone{{ref: 1, intervals: []Interval{{100, 200}}}}, + []RefSample{{Ref: 500, T: 1, V: 1}}, + }, res) + + // Migrating an already migrated WAL shouldn't do anything. + testutil.Ok(t, MigrateWAL(nil, wdir)) +}