From ee7efa93fe15079273031ca467c4bf3e182c4211 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 19 Feb 2019 16:43:58 +0000 Subject: [PATCH] Fix some tests. Signed-off-by: Tom Wilkie --- storage/remote/queue_manager_test.go | 4 ++-- storage/remote/wal_watcher.go | 19 +++++++------------ storage/remote/wal_watcher_test.go | 6 +----- 3 files changed, 10 insertions(+), 19 deletions(-) diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index ea624d5ba..f9ea046a4 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -137,7 +137,7 @@ func TestSampleDeliveryOrder(t *testing.T) { } func TestShutdown(t *testing.T) { - deadline := 5 * time.Second + deadline := 1 * time.Second c := NewTestBlockedStorageClient() var temp int64 @@ -155,7 +155,7 @@ func TestShutdown(t *testing.T) { go func() { m.Append(samples) }() - time.Sleep(1 * time.Second) + time.Sleep(100 * time.Millisecond) // Test to ensure that Stop doesn't block. start := time.Now() diff --git a/storage/remote/wal_watcher.go b/storage/remote/wal_watcher.go index 4dbcaddef..5e4068afc 100644 --- a/storage/remote/wal_watcher.go +++ b/storage/remote/wal_watcher.go @@ -138,7 +138,6 @@ func (w *WALWatcher) Start() { } func (w *WALWatcher) Stop() { - level.Info(w.logger).Log("msg", "stopping WAL watcher", "queue", w.name) close(w.quit) <-w.done level.Info(w.logger).Log("msg", "WAL watcher stopped", "queue", w.name) @@ -170,13 +169,13 @@ func (w *WALWatcher) run() error { _, last, err := nw.Segments() if err != nil { - return err + return errors.Wrap(err, "wal.Segments") } // Backfill from the checkpoint first if it exists. lastCheckpoint, nextIndex, err := tsdb.LastCheckpoint(w.walDir) if err != nil && err != tsdb.ErrNotFound { - return err + return errors.Wrap(err, "tsdb.LastCheckpoint") } if err == nil { @@ -194,7 +193,6 @@ func (w *WALWatcher) run() error { level.Info(w.logger).Log("msg", "tailing WAL", "lastCheckpoint", lastCheckpoint, "startFrom", nextIndex, "currentSegment", currentSegment, "last", last) for !isClosed(w.quit) { w.currentSegmentMetric.Set(float64(currentSegment)) - level.Info(w.logger).Log("msg", "process segment", "segment", currentSegment) // On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment. // On subsequent calls to this function, currentSegment will have been incremented and we should open that segment. @@ -269,8 +267,7 @@ func (w *WALWatcher) watch(wl *wal.WAL, segmentNum int, tail bool) error { var err error size, err = getSegmentSize(w.walDir, segmentNum) if err != nil { - level.Error(w.logger).Log("msg", "error getting segment size", "segment", segmentNum) - return errors.Wrap(err, "get segment size") + return errors.Wrap(err, "getSegmentSize") } } @@ -446,25 +443,23 @@ func (w *WALWatcher) decodeRecord(rec []byte, segmentNum int) error { // Read all the series records from a Checkpoint directory. func (w *WALWatcher) readCheckpoint(checkpointDir string) error { - level.Info(w.logger).Log("msg", "reading checkpoint", "dir", checkpointDir) + level.Debug(w.logger).Log("msg", "reading checkpoint", "dir", checkpointDir) index, err := checkpointNum(checkpointDir) if err != nil { - return err + return errors.Wrap(err, "checkpointNum") } sr, err := wal.NewSegmentsReader(checkpointDir) if err != nil { - return errors.Wrap(err, "open checkpoint") + return errors.Wrap(err, "NewSegmentsReader") } defer sr.Close() size, err := getCheckpointSize(checkpointDir) if err != nil { - level.Error(w.logger).Log("msg", "error getting checkpoint size", "checkpoint", checkpointDir) - return errors.Wrap(err, "get checkpoint size") + return errors.Wrap(err, "getCheckpointSize") } - // w.readSeriesRecords(wal.NewLiveReader(sr), i, size) r := wal.NewLiveReader(w.logger, sr) if err := w.readSegment(r, index); err != io.EOF { return errors.Wrap(err, "readSegment") diff --git a/storage/remote/wal_watcher_test.go b/storage/remote/wal_watcher_test.go index 9e114f390..ff0a1cd23 100644 --- a/storage/remote/wal_watcher_test.go +++ b/storage/remote/wal_watcher_test.go @@ -373,13 +373,11 @@ func Test_decodeRecord(t *testing.T) { buf := enc.Series([]tsdb.RefSeries{tsdb.RefSeries{Ref: 1234, Labels: labels.Labels{}}}, nil) watcher.decodeRecord(buf, 0) testutil.Ok(t, err) - testutil.Equals(t, 1, wt.checkNumLabels()) buf = enc.Samples([]tsdb.RefSample{tsdb.RefSample{Ref: 100, T: 1, V: 1.0}, tsdb.RefSample{Ref: 100, T: 2, V: 2.0}}, nil) watcher.decodeRecord(buf, 0) testutil.Ok(t, err) - testutil.Equals(t, 2, wt.samplesAppended) } @@ -395,12 +393,10 @@ func Test_decodeRecord_afterStart(t *testing.T) { buf := enc.Series([]tsdb.RefSeries{tsdb.RefSeries{Ref: 1234, Labels: labels.Labels{}}}, nil) watcher.decodeRecord(buf, 0) testutil.Ok(t, err) - testutil.Equals(t, 1, wt.checkNumLabels()) buf = enc.Samples([]tsdb.RefSample{tsdb.RefSample{Ref: 100, T: 1, V: 1.0}, tsdb.RefSample{Ref: 100, T: 2, V: 2.0}}, nil) watcher.decodeRecord(buf, 0) testutil.Ok(t, err) - - testutil.Equals(t, 1, wt.samplesAppended) + testutil.Equals(t, 2, wt.samplesAppended) }