chore(remote_write): clean up as watcher.go is part of wlog now

Signed-off-by: machine424 <ayoubmrini424@gmail.com>
pull/14596/head
machine424 2024-07-08 12:15:37 +02:00 committed by Ayoub Mrini
parent e92a18b6ce
commit 9e43ad2e37
1 changed files with 18 additions and 74 deletions

View File

@ -20,7 +20,6 @@ import (
"math" "math"
"os" "os"
"path/filepath" "path/filepath"
"slices"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@ -265,9 +264,9 @@ func (w *Watcher) loop() {
// Run the watcher, which will tail the WAL until the quit channel is closed // Run the watcher, which will tail the WAL until the quit channel is closed
// or an error case is hit. // or an error case is hit.
func (w *Watcher) Run() error { func (w *Watcher) Run() error {
_, lastSegment, err := w.firstAndLast() _, lastSegment, err := Segments(w.walDir)
if err != nil { if err != nil {
return fmt.Errorf("wal.Segments: %w", err) return fmt.Errorf("Segments: %w", err)
} }
// We want to ensure this is false across iterations since // We want to ensure this is false across iterations since
@ -318,57 +317,20 @@ func (w *Watcher) Run() error {
// findSegmentForIndex finds the first segment greater than or equal to index. // findSegmentForIndex finds the first segment greater than or equal to index.
func (w *Watcher) findSegmentForIndex(index int) (int, error) { func (w *Watcher) findSegmentForIndex(index int) (int, error) {
refs, err := w.segments(w.walDir) refs, err := listSegments(w.walDir)
if err != nil { if err != nil {
return -1, err return -1, err
} }
for _, r := range refs { for _, r := range refs {
if r >= index { if r.index >= index {
return r, nil return r.index, nil
} }
} }
return -1, errors.New("failed to find segment for index") return -1, errors.New("failed to find segment for index")
} }
func (w *Watcher) firstAndLast() (int, int, error) {
refs, err := w.segments(w.walDir)
if err != nil {
return -1, -1, err
}
if len(refs) == 0 {
return -1, -1, nil
}
return refs[0], refs[len(refs)-1], nil
}
// Copied from tsdb/wlog/wlog.go so we do not have to open a WAL.
// Plan is to move WAL watcher to TSDB and dedupe these implementations.
func (w *Watcher) segments(dir string) ([]int, error) {
files, err := os.ReadDir(dir)
if err != nil {
return nil, err
}
var refs []int
for _, f := range files {
k, err := strconv.Atoi(f.Name())
if err != nil {
continue
}
refs = append(refs, k)
}
slices.Sort(refs)
for i := 0; i < len(refs)-1; i++ {
if refs[i]+1 != refs[i+1] {
return nil, errors.New("segments are not sequential")
}
}
return refs, nil
}
func (w *Watcher) readAndHandleError(r *LiveReader, segmentNum int, tail bool, size int64) error { func (w *Watcher) readAndHandleError(r *LiveReader, segmentNum int, tail bool, size int64) error {
err := w.readSegment(r, segmentNum, tail) err := w.readSegment(r, segmentNum, tail)
@ -447,35 +409,17 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
// Currently doing a garbage collect, try again later. // Currently doing a garbage collect, try again later.
} }
// if a newer segment is produced, read the current one until the end and move on.
case <-segmentTicker.C: case <-segmentTicker.C:
_, last, err := w.firstAndLast() _, last, err := Segments(w.walDir)
if err != nil { if err != nil {
return fmt.Errorf("segments: %w", err) return fmt.Errorf("Segments: %w", err)
} }
// Check if new segments exists. if last > segmentNum {
if last <= segmentNum { return w.readAndHandleError(reader, segmentNum, tail, size)
continue
} }
err = w.readSegment(reader, segmentNum, tail) continue
// Ignore errors reading to end of segment whilst replaying the WAL.
if !tail {
switch {
case err != nil && !errors.Is(err, io.EOF):
level.Warn(w.logger).Log("msg", "Ignoring error reading to end of segment, may have dropped data", "err", err)
case reader.Offset() != size:
level.Warn(w.logger).Log("msg", "Expected to have read whole segment, may have dropped data", "segment", segmentNum, "read", reader.Offset(), "size", size)
}
return nil
}
// Otherwise, when we are tailing, non-EOFs are fatal.
if err != nil && !errors.Is(err, io.EOF) {
return err
}
return nil
// we haven't read due to a notification in quite some time, try reading anyways // we haven't read due to a notification in quite some time, try reading anyways
case <-readTicker.C: case <-readTicker.C:
@ -484,7 +428,7 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
if err != nil { if err != nil {
return err return err
} }
// still want to reset the ticker so we don't read too often // reset the ticker so we don't read too often
readTicker.Reset(readTimeout) readTicker.Reset(readTimeout)
case <-w.readNotify: case <-w.readNotify:
@ -492,7 +436,7 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
if err != nil { if err != nil {
return err return err
} }
// still want to reset the ticker so we don't read too often // reset the ticker so we don't read too often
readTicker.Reset(readTimeout) readTicker.Reset(readTimeout)
} }
} }
@ -731,17 +675,17 @@ func (w *Watcher) readCheckpoint(checkpointDir string, readFn segmentReadFn) err
} }
// Ensure we read the whole contents of every segment in the checkpoint dir. // Ensure we read the whole contents of every segment in the checkpoint dir.
segs, err := w.segments(checkpointDir) segs, err := listSegments(checkpointDir)
if err != nil { if err != nil {
return fmt.Errorf("Unable to get segments checkpoint dir: %w", err) return fmt.Errorf("Unable to get segments checkpoint dir: %w", err)
} }
for _, seg := range segs { for _, segRef := range segs {
size, err := getSegmentSize(checkpointDir, seg) size, err := getSegmentSize(checkpointDir, segRef.index)
if err != nil { if err != nil {
return fmt.Errorf("getSegmentSize: %w", err) return fmt.Errorf("getSegmentSize: %w", err)
} }
sr, err := OpenReadSegment(SegmentName(checkpointDir, seg)) sr, err := OpenReadSegment(SegmentName(checkpointDir, segRef.index))
if err != nil { if err != nil {
return fmt.Errorf("unable to open segment: %w", err) return fmt.Errorf("unable to open segment: %w", err)
} }
@ -753,7 +697,7 @@ func (w *Watcher) readCheckpoint(checkpointDir string, readFn segmentReadFn) err
} }
if r.Offset() != size { if r.Offset() != size {
return fmt.Errorf("readCheckpoint wasn't able to read all data from the checkpoint %s/%08d, size: %d, totalRead: %d", checkpointDir, seg, size, r.Offset()) return fmt.Errorf("readCheckpoint wasn't able to read all data from the checkpoint %s/%08d, size: %d, totalRead: %d", checkpointDir, segRef.index, size, r.Offset())
} }
} }