diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index f9ea046a4..03b71a19e 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "io/ioutil" + "math" "os" "reflect" "sync" @@ -24,6 +25,7 @@ import ( "testing" "time" + "github.com/go-kit/kit/log" "github.com/gogo/protobuf/proto" "github.com/golang/snappy" "github.com/stretchr/testify/require" @@ -378,3 +380,26 @@ func (c *TestBlockingStorageClient) NumCalls() uint64 { func (c *TestBlockingStorageClient) Name() string { return "testblockingstorageclient" } + +func BenchmarkStartup(b *testing.B) { + dir := os.Getenv("WALDIR") + if dir == "" { + return + } + + fmt.Println(dir) + + logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stdout)) + logger = log.With(logger, "caller", log.DefaultCaller) + + for n := 0; n < b.N; n++ { + var temp int64 + c := NewTestBlockedStorageClient() + m := NewQueueManager(logger, dir, + newEWMARate(ewmaWeight, shardUpdateDuration), + &temp, config.DefaultQueueConfig, nil, nil, c, 1*time.Minute) + m.watcher.startTime = math.MaxInt64 + m.watcher.maxSegment = 6158 // n-1 + m.watcher.run() + } +} diff --git a/storage/remote/wal_watcher.go b/storage/remote/wal_watcher.go index 5e4068afc..f5cf7fc69 100644 --- a/storage/remote/wal_watcher.go +++ b/storage/remote/wal_watcher.go @@ -110,6 +110,9 @@ type WALWatcher struct { quit chan struct{} done chan struct{} + + // For testing, stop when we hit this segment. + maxSegment int } // NewWALWatcher creates a new WAL watcher for a given WriteTo. @@ -200,6 +203,10 @@ func (w *WALWatcher) run() error { return err } + if currentSegment == w.maxSegment { + return nil + } + currentSegment++ } @@ -295,7 +302,7 @@ func (w *WALWatcher) watch(wl *wal.WAL, segmentNum int, tail bool) error { continue } - err = w.readSegment(reader, segmentNum) + err = w.readSegment(reader, segmentNum, tail) // Ignore errors reading to end of segment whilst replaying the WAL. if !tail { @@ -315,7 +322,7 @@ func (w *WALWatcher) watch(wl *wal.WAL, segmentNum int, tail bool) error { return nil case <-readTicker.C: - err = w.readSegment(reader, segmentNum) + err = w.readSegment(reader, segmentNum, tail) // Ignore all errors reading to end of segment whilst replaying the WAL. if !tail { @@ -368,10 +375,56 @@ func (w *WALWatcher) garbageCollectSeries(segmentNum int) error { return nil } -func (w *WALWatcher) readSegment(r *wal.LiveReader, segmentNum int) error { +func (w *WALWatcher) readSegment(r *wal.LiveReader, segmentNum int, tail bool) error { + var ( + dec tsdb.RecordDecoder + series []tsdb.RefSeries + samples []tsdb.RefSample + ) + for r.Next() && !isClosed(w.quit) { - if err := w.decodeRecord(r.Record(), segmentNum); err != nil { - return err + rec := r.Record() + w.recordsReadMetric.WithLabelValues(recordType(dec.Type(rec))).Inc() + + switch dec.Type(rec) { + case tsdb.RecordSeries: + series, err := dec.Series(rec, series[:0]) + if err != nil { + w.recordDecodeFailsMetric.Inc() + return err + } + w.writer.StoreSeries(series, segmentNum) + + case tsdb.RecordSamples: + // If we're not tailing a segment we can ignore any samples records we see. + // This speeds up replay of the WAL by > 10x. + if !tail { + break + } + samples, err := dec.Samples(rec, samples[:0]) + if err != nil { + w.recordDecodeFailsMetric.Inc() + return err + } + var send []tsdb.RefSample + for _, s := range samples { + if s.T > w.startTime { + send = append(send, s) + } + } + if len(send) > 0 { + // Blocks until the sample is sent to all remote write endpoints or closed (because enqueue blocks). + w.writer.Append(send) + } + + case tsdb.RecordTombstones: + // noop + case tsdb.RecordInvalid: + return errors.New("invalid record") + + default: + w.recordDecodeFailsMetric.Inc() + return errors.New("unknown TSDB record type") } } return r.Err() @@ -392,55 +445,6 @@ func recordType(rt tsdb.RecordType) string { } } -func (w *WALWatcher) decodeRecord(rec []byte, segmentNum int) error { - var ( - dec tsdb.RecordDecoder - series []tsdb.RefSeries - samples []tsdb.RefSample - ) - - w.recordsReadMetric.WithLabelValues(recordType(dec.Type(rec))).Inc() - - switch dec.Type(rec) { - case tsdb.RecordSeries: - series, err := dec.Series(rec, series[:0]) - if err != nil { - w.recordDecodeFailsMetric.Inc() - return err - } - w.writer.StoreSeries(series, segmentNum) - return nil - - case tsdb.RecordSamples: - samples, err := dec.Samples(rec, samples[:0]) - if err != nil { - w.recordDecodeFailsMetric.Inc() - return err - } - var send []tsdb.RefSample - for _, s := range samples { - if s.T > w.startTime { - send = append(send, s) - } - } - if len(send) > 0 { - // Blocks until the sample is sent to all remote write endpoints or closed (because enqueue blocks). - w.writer.Append(send) - } - return nil - - case tsdb.RecordTombstones: - return nil - - case tsdb.RecordInvalid: - return errors.New("invalid record") - - default: - w.recordDecodeFailsMetric.Inc() - return errors.New("unknown TSDB record type") - } -} - // Read all the series records from a Checkpoint directory. func (w *WALWatcher) readCheckpoint(checkpointDir string) error { level.Debug(w.logger).Log("msg", "reading checkpoint", "dir", checkpointDir) @@ -461,7 +465,7 @@ func (w *WALWatcher) readCheckpoint(checkpointDir string) error { } r := wal.NewLiveReader(w.logger, sr) - if err := w.readSegment(r, index); err != io.EOF { + if err := w.readSegment(r, index, false); err != io.EOF { return errors.Wrap(err, "readSegment") } diff --git a/storage/remote/wal_watcher_test.go b/storage/remote/wal_watcher_test.go index ff0a1cd23..27994a595 100644 --- a/storage/remote/wal_watcher_test.go +++ b/storage/remote/wal_watcher_test.go @@ -39,7 +39,6 @@ func retry(t *testing.T, interval time.Duration, n int, f func() bool) { if f() { return } - t.Logf("retry %d/%d", i, n) <-ticker.C } ticker.Stop() @@ -89,6 +88,78 @@ func newWriteToMock() *writeToMock { } } +func Test_tail_samples(t *testing.T) { + pageSize := 32 * 1024 + const seriesCount = 10 + const samplesCount = 250 + now := time.Now() + + dir, err := ioutil.TempDir("", "readCheckpoint") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + wdir := path.Join(dir, "wal") + err = os.Mkdir(wdir, 0777) + testutil.Ok(t, err) + + // os.Create(wal.SegmentName(wdir, 30)) + + enc := tsdb.RecordEncoder{} + w, err := wal.NewSize(nil, nil, wdir, 128*pageSize) + testutil.Ok(t, err) + + // Write to the initial segment then checkpoint. + for i := 0; i < seriesCount; i++ { + ref := i + 100 + series := enc.Series([]tsdb.RefSeries{ + tsdb.RefSeries{ + Ref: uint64(ref), + Labels: labels.Labels{labels.Label{"__name__", fmt.Sprintf("metric_%d", i)}}, + }, + }, nil) + testutil.Ok(t, w.Log(series)) + + for j := 0; j < samplesCount; j++ { + inner := rand.Intn(ref + 1) + sample := enc.Samples([]tsdb.RefSample{ + tsdb.RefSample{ + Ref: uint64(inner), + T: int64(now.UnixNano()) + 1, + V: float64(i), + }, + }, nil) + testutil.Ok(t, w.Log(sample)) + } + } + + // Start read after checkpoint, no more data written. + first, last, err := w.Segments() + testutil.Ok(t, err) + + wt := newWriteToMock() + watcher := NewWALWatcher(nil, "", wt, dir) + watcher.startTime = now.UnixNano() + for i := first; i <= last; i++ { + segment, err := wal.OpenReadSegment(wal.SegmentName(watcher.walDir, i)) + testutil.Ok(t, err) + defer segment.Close() + + reader := wal.NewLiveReader(nil, segment) + // Use tail true so we can ensure we got the right number of samples. + watcher.readSegment(reader, i, true) + } + go watcher.Start() + + expectedSeries := seriesCount + expectedSamples := seriesCount * samplesCount + retry(t, defaultRetryInterval, defaultRetries, func() bool { + return wt.checkNumLabels() >= expectedSeries + }) + watcher.Stop() + testutil.Equals(t, expectedSeries, wt.checkNumLabels()) + testutil.Equals(t, expectedSamples, wt.samplesAppended) +} + func Test_readToEnd_noCheckpoint(t *testing.T) { pageSize := 32 * 1024 const seriesCount = 10 @@ -152,7 +223,9 @@ func Test_readToEnd_noCheckpoint(t *testing.T) { } func Test_readToEnd_withCheckpoint(t *testing.T) { - pageSize := 32 * 1024 + segmentSize := 32 * 1024 + // We need something similar to this # of series and samples + // in order to get enough segments for us to checkpoint. const seriesCount = 10 const samplesCount = 250 @@ -164,14 +237,12 @@ func Test_readToEnd_withCheckpoint(t *testing.T) { err = os.Mkdir(wdir, 0777) testutil.Ok(t, err) - os.Create(wal.SegmentName(wdir, 30)) - enc := tsdb.RecordEncoder{} - w, err := wal.NewSize(nil, nil, wdir, 128*pageSize) + w, err := wal.NewSize(nil, nil, wdir, segmentSize) testutil.Ok(t, err) // Write to the initial segment then checkpoint. - for i := 0; i < seriesCount*10; i++ { + for i := 0; i < seriesCount; i++ { ref := i + 100 series := enc.Series([]tsdb.RefSeries{ tsdb.RefSeries{ @@ -181,7 +252,7 @@ func Test_readToEnd_withCheckpoint(t *testing.T) { }, nil) testutil.Ok(t, w.Log(series)) - for j := 0; j < samplesCount*10; j++ { + for j := 0; j < samplesCount; j++ { inner := rand.Intn(ref + 1) sample := enc.Samples([]tsdb.RefSample{ tsdb.RefSample{ @@ -193,11 +264,12 @@ func Test_readToEnd_withCheckpoint(t *testing.T) { testutil.Ok(t, w.Log(sample)) } } - tsdb.Checkpoint(w, 30, 31, func(x uint64) bool { return true }, 0) - w.Truncate(32) + + tsdb.Checkpoint(w, 0, 1, func(x uint64) bool { return true }, 0) + w.Truncate(1) // Write more records after checkpointing. - for i := 0; i < seriesCount*10; i++ { + for i := 0; i < seriesCount; i++ { series := enc.Series([]tsdb.RefSeries{ tsdb.RefSeries{ Ref: uint64(i), @@ -206,7 +278,7 @@ func Test_readToEnd_withCheckpoint(t *testing.T) { }, nil) testutil.Ok(t, w.Log(series)) - for j := 0; j < samplesCount*10; j++ { + for j := 0; j < samplesCount; j++ { sample := enc.Samples([]tsdb.RefSample{ tsdb.RefSample{ Ref: uint64(j), @@ -224,7 +296,7 @@ func Test_readToEnd_withCheckpoint(t *testing.T) { watcher := NewWALWatcher(nil, "", wt, dir) go watcher.Start() - expected := seriesCount * 10 * 2 + expected := seriesCount * 2 retry(t, defaultRetryInterval, defaultRetries, func() bool { return wt.checkNumLabels() >= expected }) @@ -252,7 +324,7 @@ func Test_readCheckpoint(t *testing.T) { testutil.Ok(t, err) // Write to the initial segment then checkpoint. - for i := 0; i < seriesCount*10; i++ { + for i := 0; i < seriesCount; i++ { ref := i + 100 series := enc.Series([]tsdb.RefSeries{ tsdb.RefSeries{ @@ -262,7 +334,7 @@ func Test_readCheckpoint(t *testing.T) { }, nil) testutil.Ok(t, w.Log(series)) - for j := 0; j < samplesCount*10; j++ { + for j := 0; j < samplesCount; j++ { inner := rand.Intn(ref + 1) sample := enc.Samples([]tsdb.RefSample{ tsdb.RefSample{ @@ -283,18 +355,21 @@ func Test_readCheckpoint(t *testing.T) { wt := newWriteToMock() watcher := NewWALWatcher(nil, "", wt, dir) + // watcher. go watcher.Start() - expected := seriesCount * 10 + expectedSeries := seriesCount retry(t, defaultRetryInterval, defaultRetries, func() bool { - return wt.checkNumLabels() >= expected + return wt.checkNumLabels() >= expectedSeries }) watcher.Stop() - testutil.Equals(t, expected, wt.checkNumLabels()) + testutil.Equals(t, expectedSeries, wt.checkNumLabels()) } func Test_checkpoint_seriesReset(t *testing.T) { - pageSize := 32 * 1024 + segmentSize := 32 * 1024 + // We need something similar to this # of series and samples + // in order to get enough segments for us to checkpoint. const seriesCount = 10 const samplesCount = 250 @@ -307,11 +382,11 @@ func Test_checkpoint_seriesReset(t *testing.T) { testutil.Ok(t, err) enc := tsdb.RecordEncoder{} - w, err := wal.NewSize(nil, nil, wdir, pageSize) + w, err := wal.NewSize(nil, nil, wdir, segmentSize) testutil.Ok(t, err) // Write to the initial segment, then checkpoint later. - for i := 0; i < seriesCount*10; i++ { + for i := 0; i < seriesCount; i++ { ref := i + 100 series := enc.Series([]tsdb.RefSeries{ tsdb.RefSeries{ @@ -321,7 +396,7 @@ func Test_checkpoint_seriesReset(t *testing.T) { }, nil) testutil.Ok(t, w.Log(series)) - for j := 0; j < samplesCount*10; j++ { + for j := 0; j < samplesCount; j++ { inner := rand.Intn(ref + 1) sample := enc.Samples([]tsdb.RefSample{ tsdb.RefSample{ @@ -339,64 +414,27 @@ func Test_checkpoint_seriesReset(t *testing.T) { wt := newWriteToMock() watcher := NewWALWatcher(nil, "", wt, dir) + watcher.maxSegment = -1 go watcher.Start() - expected := seriesCount * 10 + expected := seriesCount retry(t, defaultRetryInterval, defaultRetries, func() bool { return wt.checkNumLabels() >= expected }) watcher.Stop() - testutil.Equals(t, seriesCount*10, wt.checkNumLabels()) + testutil.Equals(t, seriesCount, wt.checkNumLabels()) + _, err = tsdb.Checkpoint(w, 0, 1, func(x uint64) bool { return true }, 0) + testutil.Ok(t, err) + + w.Truncate(1) + + _, cpi, err := tsdb.LastCheckpoint(path.Join(dir, "wal")) + testutil.Ok(t, err) + err = watcher.garbageCollectSeries(cpi + 1) + testutil.Ok(t, err) // If you modify the checkpoint and truncate segment #'s run the test to see how // many series records you end up with and change the last Equals check accordingly // or modify the Equals to Assert(len(wt.seriesLabels) < seriesCount*10) - _, err = tsdb.Checkpoint(w, 50, 200, func(x uint64) bool { return true }, 0) - testutil.Ok(t, err) - w.Truncate(200) - - cp, _, err := tsdb.LastCheckpoint(path.Join(dir, "wal")) - testutil.Ok(t, err) - err = watcher.readCheckpoint(cp) - testutil.Ok(t, err) -} - -func Test_decodeRecord(t *testing.T) { - dir, err := ioutil.TempDir("", "decodeRecord") - testutil.Ok(t, err) - defer os.RemoveAll(dir) - - wt := newWriteToMock() - watcher := NewWALWatcher(nil, "", wt, dir) - - enc := tsdb.RecordEncoder{} - buf := enc.Series([]tsdb.RefSeries{tsdb.RefSeries{Ref: 1234, Labels: labels.Labels{}}}, nil) - watcher.decodeRecord(buf, 0) - testutil.Ok(t, err) - testutil.Equals(t, 1, wt.checkNumLabels()) - - buf = enc.Samples([]tsdb.RefSample{tsdb.RefSample{Ref: 100, T: 1, V: 1.0}, tsdb.RefSample{Ref: 100, T: 2, V: 2.0}}, nil) - watcher.decodeRecord(buf, 0) - testutil.Ok(t, err) - testutil.Equals(t, 2, wt.samplesAppended) -} - -func Test_decodeRecord_afterStart(t *testing.T) { - dir, err := ioutil.TempDir("", "decodeRecord") - testutil.Ok(t, err) - defer os.RemoveAll(dir) - - wt := newWriteToMock() - watcher := NewWALWatcher(nil, "", wt, dir) - - enc := tsdb.RecordEncoder{} - buf := enc.Series([]tsdb.RefSeries{tsdb.RefSeries{Ref: 1234, Labels: labels.Labels{}}}, nil) - watcher.decodeRecord(buf, 0) - testutil.Ok(t, err) - testutil.Equals(t, 1, wt.checkNumLabels()) - - buf = enc.Samples([]tsdb.RefSample{tsdb.RefSample{Ref: 100, T: 1, V: 1.0}, tsdb.RefSample{Ref: 100, T: 2, V: 2.0}}, nil) - watcher.decodeRecord(buf, 0) - testutil.Ok(t, err) - testutil.Equals(t, 2, wt.samplesAppended) + testutil.Equals(t, 6, wt.checkNumLabels()) }