[tsdb] re-implement WAL watcher to read via a "notification" channel (#11949)

* WIP implement WAL watcher reading via notifications over a channel from
the TSDB code

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Notify via head appenders Commit (finished all WAL logging) rather than
on each WAL Log call

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Fix misspelled Notify plus add a metric for dropped Write notifications

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Update tests to handle new notification pattern

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* this test maybe needs more time on windows?

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* does this test need more time on windows as well?

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* read timeout is already a time.Duration

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* remove mistakenly commited benchmark data files

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* address some review feedback

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* fix missed changes from previous commit

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Fix issues from wrapper function

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* try fixing race condition in test by allowing tests to overwrite the
read ticker timeout instead of calling the Notify function

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* fix linting

Signed-off-by: Callum Styan <callumstyan@gmail.com>

---------

Signed-off-by: Callum Styan <callumstyan@gmail.com>
pull/12316/head
Callum Styan 2023-05-15 12:31:49 -07:00 committed by GitHub
parent b727e69b76
commit 0d2108ad79
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 128 additions and 40 deletions

View File

@ -1053,6 +1053,7 @@ func main() {
startTimeMargin := int64(2 * time.Duration(cfg.tsdb.MinBlockDuration).Seconds() * 1000) startTimeMargin := int64(2 * time.Duration(cfg.tsdb.MinBlockDuration).Seconds() * 1000)
localStorage.Set(db, startTimeMargin) localStorage.Set(db, startTimeMargin)
db.SetWriteNotified(remoteStorage)
close(dbOpen) close(dbOpen)
<-cancel <-cancel
return nil return nil

View File

@ -76,6 +76,13 @@ func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCal
return s return s
} }
func (s *Storage) Notify() {
for _, q := range s.rws.queues {
// These should all be non blocking
q.watcher.Notify()
}
}
// ApplyConfig updates the state as the new config requires. // ApplyConfig updates the state as the new config requires.
func (s *Storage) ApplyConfig(conf *config.Config) error { func (s *Storage) ApplyConfig(conf *config.Config) error {
s.mtx.Lock() s.mtx.Lock()

View File

@ -229,6 +229,8 @@ type DB struct {
// out-of-order compaction and vertical queries. // out-of-order compaction and vertical queries.
oooWasEnabled atomic.Bool oooWasEnabled atomic.Bool
writeNotified wlog.WriteNotified
registerer prometheus.Registerer registerer prometheus.Registerer
} }
@ -802,6 +804,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
if err != nil { if err != nil {
return nil, err return nil, err
} }
db.head.writeNotified = db.writeNotified
// Register metrics after assigning the head block. // Register metrics after assigning the head block.
db.metrics = newDBMetrics(db, r) db.metrics = newDBMetrics(db, r)
@ -2016,6 +2019,12 @@ func (db *DB) CleanTombstones() (err error) {
return nil return nil
} }
func (db *DB) SetWriteNotified(wn wlog.WriteNotified) {
db.writeNotified = wn
// It's possible we already created the head struct, so we should also set the WN for that.
db.head.writeNotified = wn
}
func isBlockDir(fi fs.DirEntry) bool { func isBlockDir(fi fs.DirEntry) bool {
if !fi.IsDir() { if !fi.IsDir() {
return false return false

View File

@ -121,6 +121,8 @@ type Head struct {
stats *HeadStats stats *HeadStats
reg prometheus.Registerer reg prometheus.Registerer
writeNotified wlog.WriteNotified
memTruncationInProcess atomic.Bool memTruncationInProcess atomic.Bool
} }

View File

@ -842,6 +842,10 @@ func (a *headAppender) Commit() (err error) {
return errors.Wrap(err, "write to WAL") return errors.Wrap(err, "write to WAL")
} }
if a.head.writeNotified != nil {
a.head.writeNotified.Notify()
}
// No errors logging to WAL, so pass the exemplars along to the in memory storage. // No errors logging to WAL, so pass the exemplars along to the in memory storage.
for _, e := range a.exemplars { for _, e := range a.exemplars {
s := a.head.series.getByID(chunks.HeadSeriesRef(e.ref)) s := a.head.series.getByID(chunks.HeadSeriesRef(e.ref))

View File

@ -34,12 +34,16 @@ import (
) )
const ( const (
readPeriod = 10 * time.Millisecond
checkpointPeriod = 5 * time.Second checkpointPeriod = 5 * time.Second
segmentCheckPeriod = 100 * time.Millisecond segmentCheckPeriod = 100 * time.Millisecond
consumer = "consumer" consumer = "consumer"
) )
var (
ErrIgnorable = errors.New("ignore me")
readTimeout = 15 * time.Second
)
// WriteTo is an interface used by the Watcher to send the samples it's read // WriteTo is an interface used by the Watcher to send the samples it's read
// from the WAL on to somewhere else. Functions will be called concurrently // from the WAL on to somewhere else. Functions will be called concurrently
// and it is left to the implementer to make sure they are safe. // and it is left to the implementer to make sure they are safe.
@ -61,11 +65,17 @@ type WriteTo interface {
SeriesReset(int) SeriesReset(int)
} }
// Used to notifier the watcher that data has been written so that it can read.
type WriteNotified interface {
Notify()
}
type WatcherMetrics struct { type WatcherMetrics struct {
recordsRead *prometheus.CounterVec recordsRead *prometheus.CounterVec
recordDecodeFails *prometheus.CounterVec recordDecodeFails *prometheus.CounterVec
samplesSentPreTailing *prometheus.CounterVec samplesSentPreTailing *prometheus.CounterVec
currentSegment *prometheus.GaugeVec currentSegment *prometheus.GaugeVec
notificationsSkipped *prometheus.CounterVec
} }
// Watcher watches the TSDB WAL for a given WriteTo. // Watcher watches the TSDB WAL for a given WriteTo.
@ -88,9 +98,11 @@ type Watcher struct {
recordDecodeFailsMetric prometheus.Counter recordDecodeFailsMetric prometheus.Counter
samplesSentPreTailing prometheus.Counter samplesSentPreTailing prometheus.Counter
currentSegmentMetric prometheus.Gauge currentSegmentMetric prometheus.Gauge
notificationsSkipped prometheus.Counter
quit chan struct{} readNotify chan struct{}
done chan struct{} quit chan struct{}
done chan struct{}
// For testing, stop when we hit this segment. // For testing, stop when we hit this segment.
MaxSegment int MaxSegment int
@ -134,6 +146,15 @@ func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics {
}, },
[]string{consumer}, []string{consumer},
), ),
notificationsSkipped: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "prometheus",
Subsystem: "wal_watcher",
Name: "notifications_skipped_total",
Help: "The number of WAL write notifications that the Watcher has skipped due to already being in a WAL read routine.",
},
[]string{consumer},
),
} }
if reg != nil { if reg != nil {
@ -141,6 +162,7 @@ func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics {
reg.MustRegister(m.recordDecodeFails) reg.MustRegister(m.recordDecodeFails)
reg.MustRegister(m.samplesSentPreTailing) reg.MustRegister(m.samplesSentPreTailing)
reg.MustRegister(m.currentSegment) reg.MustRegister(m.currentSegment)
reg.MustRegister(m.notificationsSkipped)
} }
return m return m
@ -161,13 +183,25 @@ func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logge
sendExemplars: sendExemplars, sendExemplars: sendExemplars,
sendHistograms: sendHistograms, sendHistograms: sendHistograms,
quit: make(chan struct{}), readNotify: make(chan struct{}),
done: make(chan struct{}), quit: make(chan struct{}),
done: make(chan struct{}),
MaxSegment: -1, MaxSegment: -1,
} }
} }
func (w *Watcher) Notify() {
select {
case w.readNotify <- struct{}{}:
return
default: // default so we can exit
// we don't need a buffered channel or any buffering since
// for each notification it recv's the watcher will read until EOF
w.notificationsSkipped.Inc()
}
}
func (w *Watcher) setMetrics() { func (w *Watcher) setMetrics() {
// Setup the WAL Watchers metrics. We do this here rather than in the // Setup the WAL Watchers metrics. We do this here rather than in the
// constructor because of the ordering of creating Queue Managers's, // constructor because of the ordering of creating Queue Managers's,
@ -177,6 +211,8 @@ func (w *Watcher) setMetrics() {
w.recordDecodeFailsMetric = w.metrics.recordDecodeFails.WithLabelValues(w.name) w.recordDecodeFailsMetric = w.metrics.recordDecodeFails.WithLabelValues(w.name)
w.samplesSentPreTailing = w.metrics.samplesSentPreTailing.WithLabelValues(w.name) w.samplesSentPreTailing = w.metrics.samplesSentPreTailing.WithLabelValues(w.name)
w.currentSegmentMetric = w.metrics.currentSegment.WithLabelValues(w.name) w.currentSegmentMetric = w.metrics.currentSegment.WithLabelValues(w.name)
w.notificationsSkipped = w.metrics.notificationsSkipped.WithLabelValues(w.name)
} }
} }
@ -262,7 +298,7 @@ func (w *Watcher) Run() error {
// On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment. // On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment.
// On subsequent calls to this function, currentSegment will have been incremented and we should open that segment. // On subsequent calls to this function, currentSegment will have been incremented and we should open that segment.
if err := w.watch(currentSegment, currentSegment >= lastSegment); err != nil { if err := w.watch(currentSegment, currentSegment >= lastSegment); err != nil && !errors.Is(err, ErrIgnorable) {
return err return err
} }
@ -330,6 +366,26 @@ func (w *Watcher) segments(dir string) ([]int, error) {
return refs, nil return refs, nil
} }
func (w *Watcher) readAndHandleError(r *LiveReader, segmentNum int, tail bool, size int64) error {
err := w.readSegment(r, segmentNum, tail)
// Ignore all errors reading to end of segment whilst replaying the WAL.
if !tail {
if err != nil && errors.Cause(err) != io.EOF {
level.Warn(w.logger).Log("msg", "Ignoring error reading to end of segment, may have dropped data", "segment", segmentNum, "err", err)
} else if r.Offset() != size {
level.Warn(w.logger).Log("msg", "Expected to have read whole segment, may have dropped data", "segment", segmentNum, "read", r.Offset(), "size", size)
}
return ErrIgnorable
}
// Otherwise, when we are tailing, non-EOFs are fatal.
if errors.Cause(err) != io.EOF {
return err
}
return nil
}
// Use tail true to indicate that the reader is currently on a segment that is // Use tail true to indicate that the reader is currently on a segment that is
// actively being written to. If false, assume it's a full segment and we're // actively being written to. If false, assume it's a full segment and we're
// replaying it on start to cache the series records. // replaying it on start to cache the series records.
@ -342,7 +398,7 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
reader := NewLiveReader(w.logger, w.readerMetrics, segment) reader := NewLiveReader(w.logger, w.readerMetrics, segment)
readTicker := time.NewTicker(readPeriod) readTicker := time.NewTicker(readTimeout)
defer readTicker.Stop() defer readTicker.Stop()
checkpointTicker := time.NewTicker(checkpointPeriod) checkpointTicker := time.NewTicker(checkpointPeriod)
@ -400,7 +456,6 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
if last <= segmentNum { if last <= segmentNum {
continue continue
} }
err = w.readSegment(reader, segmentNum, tail) err = w.readSegment(reader, segmentNum, tail)
// Ignore errors reading to end of segment whilst replaying the WAL. // Ignore errors reading to end of segment whilst replaying the WAL.
@ -421,24 +476,23 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
return nil return nil
// we haven't read due to a notification in quite some time, try reading anyways
case <-readTicker.C: case <-readTicker.C:
err = w.readSegment(reader, segmentNum, tail) level.Debug(w.logger).Log("msg", "Watcher is reading the WAL due to timeout, haven't received any write notifications recently", "timeout", readTimeout)
err := w.readAndHandleError(reader, segmentNum, tail, size)
// Ignore all errors reading to end of segment whilst replaying the WAL. if err != nil {
if !tail {
switch {
case err != nil && errors.Cause(err) != io.EOF:
level.Warn(w.logger).Log("msg", "Ignoring error reading to end of segment, may have dropped data", "segment", segmentNum, "err", err)
case reader.Offset() != size:
level.Warn(w.logger).Log("msg", "Expected to have read whole segment, may have dropped data", "segment", segmentNum, "read", reader.Offset(), "size", size)
}
return nil
}
// Otherwise, when we are tailing, non-EOFs are fatal.
if errors.Cause(err) != io.EOF {
return err return err
} }
// still want to reset the ticker so we don't read too often
readTicker.Reset(readTimeout)
case <-w.readNotify:
err := w.readAndHandleError(reader, segmentNum, tail, size)
if err != nil {
return err
}
// still want to reset the ticker so we don't read too often
readTicker.Reset(readTimeout)
} }
} }
} }

View File

@ -104,7 +104,7 @@ func (wtm *writeToMock) SeriesReset(index int) {
} }
} }
func (wtm *writeToMock) checkNumLabels() int { func (wtm *writeToMock) checkNumSeries() int {
wtm.seriesLock.Lock() wtm.seriesLock.Lock()
defer wtm.seriesLock.Unlock() defer wtm.seriesLock.Unlock()
return len(wtm.seriesSegmentIndexes) return len(wtm.seriesSegmentIndexes)
@ -230,9 +230,9 @@ func TestTailSamples(t *testing.T) {
expectedExemplars := seriesCount * exemplarsCount expectedExemplars := seriesCount * exemplarsCount
expectedHistograms := seriesCount * histogramsCount expectedHistograms := seriesCount * histogramsCount
retry(t, defaultRetryInterval, defaultRetries, func() bool { retry(t, defaultRetryInterval, defaultRetries, func() bool {
return wt.checkNumLabels() >= expectedSeries return wt.checkNumSeries() >= expectedSeries
}) })
require.Equal(t, expectedSeries, wt.checkNumLabels(), "did not receive the expected number of series") require.Equal(t, expectedSeries, wt.checkNumSeries(), "did not receive the expected number of series")
require.Equal(t, expectedSamples, wt.samplesAppended, "did not receive the expected number of samples") require.Equal(t, expectedSamples, wt.samplesAppended, "did not receive the expected number of samples")
require.Equal(t, expectedExemplars, wt.exemplarsAppended, "did not receive the expected number of exemplars") require.Equal(t, expectedExemplars, wt.exemplarsAppended, "did not receive the expected number of exemplars")
require.Equal(t, expectedHistograms, wt.histogramsAppended, "did not receive the expected number of histograms") require.Equal(t, expectedHistograms, wt.histogramsAppended, "did not receive the expected number of histograms")
@ -290,7 +290,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) {
} }
} }
require.NoError(t, w.Log(recs...)) require.NoError(t, w.Log(recs...))
readTimeout = time.Second
_, _, err = Segments(w.Dir()) _, _, err = Segments(w.Dir())
require.NoError(t, err) require.NoError(t, err)
@ -299,11 +299,10 @@ func TestReadToEndNoCheckpoint(t *testing.T) {
go watcher.Start() go watcher.Start()
expected := seriesCount expected := seriesCount
retry(t, defaultRetryInterval, defaultRetries, func() bool { require.Eventually(t, func() bool {
return wt.checkNumLabels() >= expected return wt.checkNumSeries() == expected
}) }, 20*time.Second, 1*time.Second)
watcher.Stop() watcher.Stop()
require.Equal(t, expected, wt.checkNumLabels())
}) })
} }
} }
@ -383,16 +382,17 @@ func TestReadToEndWithCheckpoint(t *testing.T) {
_, _, err = Segments(w.Dir()) _, _, err = Segments(w.Dir())
require.NoError(t, err) require.NoError(t, err)
readTimeout = time.Second
wt := newWriteToMock() wt := newWriteToMock()
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false) watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
go watcher.Start() go watcher.Start()
expected := seriesCount * 2 expected := seriesCount * 2
retry(t, defaultRetryInterval, defaultRetries, func() bool {
return wt.checkNumLabels() >= expected require.Eventually(t, func() bool {
}) return wt.checkNumSeries() == expected
}, 10*time.Second, 1*time.Second)
watcher.Stop() watcher.Stop()
require.Equal(t, expected, wt.checkNumLabels())
}) })
} }
} }
@ -460,10 +460,10 @@ func TestReadCheckpoint(t *testing.T) {
expectedSeries := seriesCount expectedSeries := seriesCount
retry(t, defaultRetryInterval, defaultRetries, func() bool { retry(t, defaultRetryInterval, defaultRetries, func() bool {
return wt.checkNumLabels() >= expectedSeries return wt.checkNumSeries() >= expectedSeries
}) })
watcher.Stop() watcher.Stop()
require.Equal(t, expectedSeries, wt.checkNumLabels()) require.Equal(t, expectedSeries, wt.checkNumSeries())
}) })
} }
} }
@ -595,6 +595,7 @@ func TestCheckpointSeriesReset(t *testing.T) {
_, _, err = Segments(w.Dir()) _, _, err = Segments(w.Dir())
require.NoError(t, err) require.NoError(t, err)
readTimeout = time.Second
wt := newWriteToMock() wt := newWriteToMock()
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false) watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
watcher.MaxSegment = -1 watcher.MaxSegment = -1
@ -602,9 +603,11 @@ func TestCheckpointSeriesReset(t *testing.T) {
expected := seriesCount expected := seriesCount
retry(t, defaultRetryInterval, defaultRetries, func() bool { retry(t, defaultRetryInterval, defaultRetries, func() bool {
return wt.checkNumLabels() >= expected return wt.checkNumSeries() >= expected
}) })
require.Equal(t, seriesCount, wt.checkNumLabels()) require.Eventually(t, func() bool {
return wt.checkNumSeries() == seriesCount
}, 10*time.Second, 1*time.Second)
_, err = Checkpoint(log.NewNopLogger(), w, 2, 4, func(x chunks.HeadSeriesRef) bool { return true }, 0) _, err = Checkpoint(log.NewNopLogger(), w, 2, 4, func(x chunks.HeadSeriesRef) bool { return true }, 0)
require.NoError(t, err) require.NoError(t, err)
@ -621,7 +624,9 @@ func TestCheckpointSeriesReset(t *testing.T) {
// If you modify the checkpoint and truncate segment #'s run the test to see how // If you modify the checkpoint and truncate segment #'s run the test to see how
// many series records you end up with and change the last Equals check accordingly // many series records you end up with and change the last Equals check accordingly
// or modify the Equals to Assert(len(wt.seriesLabels) < seriesCount*10) // or modify the Equals to Assert(len(wt.seriesLabels) < seriesCount*10)
require.Equal(t, tc.segments, wt.checkNumLabels()) require.Eventually(t, func() bool {
return wt.checkNumSeries() == tc.segments
}, 20*time.Second, 1*time.Second)
}) })
} }
} }

View File

@ -188,6 +188,8 @@ type WL struct {
compress bool compress bool
snappyBuf []byte snappyBuf []byte
WriteNotified WriteNotified
metrics *wlMetrics metrics *wlMetrics
} }
@ -343,6 +345,10 @@ func (w *WL) Dir() string {
return w.dir return w.dir
} }
func (w *WL) SetWriteNotified(wn WriteNotified) {
w.WriteNotified = wn
}
func (w *WL) run() { func (w *WL) run() {
Loop: Loop:
for { for {