fix bug that would cause us to endlessly fall behind (#13583)

* fix bug that would cause us to only read from the WAL on the 15s
fallback timer if remote write had fallen behind and is no longer
reading from the WAL segment that is currently being written to

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* remove unintended logging, fix lint, plus allow test to take slightly
longer because cloud CI

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* address review feedback

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* fix watcher sleeps in test, flu brain is smooth

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* increase timeout, unfortunately cloud CI can require a longer timeout

Signed-off-by: Callum Styan <callumstyan@gmail.com>

---------

Signed-off-by: Callum Styan <callumstyan@gmail.com>
pull/13628/head
Callum Styan 2024-02-21 17:09:07 -08:00 committed by GitHub
parent aba0071480
commit 0c71230784
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 128 additions and 13 deletions

View File

@ -262,10 +262,8 @@ func (w *Watcher) loop() {
// Run the watcher, which will tail the WAL until the quit channel is closed
// or an error case is hit.
func (w *Watcher) Run() error {
_, lastSegment, err := w.firstAndLast()
if err != nil {
return fmt.Errorf("wal.Segments: %w", err)
}
var lastSegment int
var err error
// We want to ensure this is false across iterations since
// Run will be called again if there was a failure to read the WAL.
@ -296,9 +294,17 @@ func (w *Watcher) Run() error {
w.currentSegmentMetric.Set(float64(currentSegment))
level.Debug(w.logger).Log("msg", "Processing segment", "currentSegment", currentSegment)
// Reset the value of lastSegment each iteration, this is to avoid having to wait too long for
// between reads if we're reading a segment that is not the most recent segment after startup.
_, lastSegment, err = w.firstAndLast()
if err != nil {
return fmt.Errorf("wal.Segments: %w", err)
}
tail := currentSegment >= lastSegment
// On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment.
// On subsequent calls to this function, currentSegment will have been incremented and we should open that segment.
if err := w.watch(currentSegment, currentSegment >= lastSegment); err != nil && !errors.Is(err, ErrIgnorable) {
if err := w.watch(currentSegment, tail); err != nil && !errors.Is(err, ErrIgnorable) {
return err
}

View File

@ -58,29 +58,47 @@ type writeToMock struct {
floatHistogramsAppended int
seriesLock sync.Mutex
seriesSegmentIndexes map[chunks.HeadSeriesRef]int
// delay reads with a short sleep
delay time.Duration
}
func (wtm *writeToMock) Append(s []record.RefSample) bool {
if wtm.delay > 0 {
time.Sleep(wtm.delay)
}
wtm.samplesAppended += len(s)
return true
}
func (wtm *writeToMock) AppendExemplars(e []record.RefExemplar) bool {
if wtm.delay > 0 {
time.Sleep(wtm.delay)
}
wtm.exemplarsAppended += len(e)
return true
}
func (wtm *writeToMock) AppendHistograms(h []record.RefHistogramSample) bool {
if wtm.delay > 0 {
time.Sleep(wtm.delay)
}
wtm.histogramsAppended += len(h)
return true
}
func (wtm *writeToMock) AppendFloatHistograms(fh []record.RefFloatHistogramSample) bool {
if wtm.delay > 0 {
time.Sleep(wtm.delay)
}
wtm.floatHistogramsAppended += len(fh)
return true
}
func (wtm *writeToMock) StoreSeries(series []record.RefSeries, index int) {
if wtm.delay > 0 {
time.Sleep(wtm.delay)
}
wtm.UpdateSeriesSegment(series, index)
}
@ -110,9 +128,10 @@ func (wtm *writeToMock) checkNumSeries() int {
return len(wtm.seriesSegmentIndexes)
}
func newWriteToMock() *writeToMock {
func newWriteToMock(delay time.Duration) *writeToMock {
return &writeToMock{
seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int),
delay: delay,
}
}
@ -209,7 +228,7 @@ func TestTailSamples(t *testing.T) {
first, last, err := Segments(w.Dir())
require.NoError(t, err)
wt := newWriteToMock()
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, true, true)
watcher.SetStartTime(now)
@ -294,7 +313,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) {
_, _, err = Segments(w.Dir())
require.NoError(t, err)
wt := newWriteToMock()
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
go watcher.Start()
@ -383,7 +402,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) {
_, _, err = Segments(w.Dir())
require.NoError(t, err)
readTimeout = time.Second
wt := newWriteToMock()
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
go watcher.Start()
@ -454,7 +473,7 @@ func TestReadCheckpoint(t *testing.T) {
_, _, err = Segments(w.Dir())
require.NoError(t, err)
wt := newWriteToMock()
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
go watcher.Start()
@ -523,7 +542,7 @@ func TestReadCheckpointMultipleSegments(t *testing.T) {
require.NoError(t, err)
}
wt := newWriteToMock()
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
watcher.MaxSegment = -1
@ -596,7 +615,7 @@ func TestCheckpointSeriesReset(t *testing.T) {
require.NoError(t, err)
readTimeout = time.Second
wt := newWriteToMock()
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
watcher.MaxSegment = -1
go watcher.Start()
@ -675,7 +694,7 @@ func TestRun_StartupTime(t *testing.T) {
}
require.NoError(t, w.Close())
wt := newWriteToMock()
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
watcher.MaxSegment = segments
@ -688,3 +707,93 @@ func TestRun_StartupTime(t *testing.T) {
})
}
}
func TestRun_AvoidNotifyWhenBehind(t *testing.T) {
const pageSize = 32 * 1024
const segments = 10
const seriesCount = 20
const samplesCount = 300
// This test can take longer than intended to finish in cloud CI.
readTimeout := 10 * time.Second
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
t.Run(string(compress), func(t *testing.T) {
dir := t.TempDir()
wdir := path.Join(dir, "wal")
err := os.Mkdir(wdir, 0o777)
require.NoError(t, err)
enc := record.Encoder{}
w, err := NewSize(nil, nil, wdir, pageSize, compress)
require.NoError(t, err)
var wg sync.WaitGroup
// add one segment initially to ensure there's a value > 0 for the last segment id
for i := 0; i < 1; i++ {
for j := 0; j < seriesCount; j++ {
ref := j + (i * 100)
series := enc.Series([]record.RefSeries{
{
Ref: chunks.HeadSeriesRef(ref),
Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)),
},
}, nil)
require.NoError(t, w.Log(series))
for k := 0; k < samplesCount; k++ {
inner := rand.Intn(ref + 1)
sample := enc.Samples([]record.RefSample{
{
Ref: chunks.HeadSeriesRef(inner),
T: int64(i),
V: float64(i),
},
}, nil)
require.NoError(t, w.Log(sample))
}
}
}
wg.Add(1)
go func() {
defer wg.Done()
for i := 1; i < segments; i++ {
for j := 0; j < seriesCount; j++ {
ref := j + (i * 100)
series := enc.Series([]record.RefSeries{
{
Ref: chunks.HeadSeriesRef(ref),
Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)),
},
}, nil)
require.NoError(t, w.Log(series))
for k := 0; k < samplesCount; k++ {
inner := rand.Intn(ref + 1)
sample := enc.Samples([]record.RefSample{
{
Ref: chunks.HeadSeriesRef(inner),
T: int64(i),
V: float64(i),
},
}, nil)
require.NoError(t, w.Log(sample))
}
}
}
}()
wt := newWriteToMock(time.Millisecond)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
watcher.MaxSegment = segments
watcher.setMetrics()
startTime := time.Now()
err = watcher.Run()
wg.Wait()
require.Less(t, time.Since(startTime), readTimeout)
require.NoError(t, err)
require.NoError(t, w.Close())
})
}
}