prometheus/tsdb/wlog/watcher.go

784 lines
23 KiB
Go

// Copyright 2018 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package wlog
import (
"errors"
"fmt"
"io"
"math"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/exp/slices"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/tsdb/record"
)
const (
checkpointPeriod = 5 * time.Second
segmentCheckPeriod = 100 * time.Millisecond
consumer = "consumer"
)
var (
ErrIgnorable = errors.New("ignore me")
readTimeout = 15 * time.Second
)
// WriteTo is an interface used by the Watcher to send the samples it's read
// from the WAL on to somewhere else. Functions will be called concurrently
// and it is left to the implementer to make sure they are safe.
type WriteTo interface {
// Append and AppendExemplar should block until the samples are fully accepted,
// whether enqueued in memory or successfully written to it's final destination.
// Once returned, the WAL Watcher will not attempt to pass that data again.
Append([]record.RefSample) bool
AppendExemplars([]record.RefExemplar) bool
AppendHistograms([]record.RefHistogramSample) bool
AppendFloatHistograms([]record.RefFloatHistogramSample) bool
StoreSeries([]record.RefSeries, int)
// Next two methods are intended for garbage-collection: first we call
// UpdateSeriesSegment on all current series
UpdateSeriesSegment([]record.RefSeries, int)
// Then SeriesReset is called to allow the deletion
// of all series created in a segment lower than the argument.
SeriesReset(int)
}
// Used to notify the watcher that data has been written so that it can read.
type WriteNotified interface {
Notify()
}
type WatcherMetrics struct {
recordsRead *prometheus.CounterVec
recordDecodeFails *prometheus.CounterVec
samplesSentPreTailing *prometheus.CounterVec
currentSegment *prometheus.GaugeVec
notificationsSkipped *prometheus.CounterVec
}
// Watcher watches the TSDB WAL for a given WriteTo.
type Watcher struct {
name string
writer WriteTo
logger log.Logger
walDir string
lastCheckpoint string
sendExemplars bool
sendHistograms bool
metrics *WatcherMetrics
readerMetrics *LiveReaderMetrics
startTime time.Time
startTimestamp int64 // the start time as a Prometheus timestamp
sendSamples bool
recordsReadMetric *prometheus.CounterVec
recordDecodeFailsMetric prometheus.Counter
samplesSentPreTailing prometheus.Counter
currentSegmentMetric prometheus.Gauge
notificationsSkipped prometheus.Counter
readNotify chan struct{}
quit chan struct{}
done chan struct{}
// For testing, stop when we hit this segment.
MaxSegment int
}
func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics {
m := &WatcherMetrics{
recordsRead: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "prometheus",
Subsystem: "wal_watcher",
Name: "records_read_total",
Help: "Number of records read by the WAL watcher from the WAL.",
},
[]string{consumer, "type"},
),
recordDecodeFails: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "prometheus",
Subsystem: "wal_watcher",
Name: "record_decode_failures_total",
Help: "Number of records read by the WAL watcher that resulted in an error when decoding.",
},
[]string{consumer},
),
samplesSentPreTailing: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "prometheus",
Subsystem: "wal_watcher",
Name: "samples_sent_pre_tailing_total",
Help: "Number of sample records read by the WAL watcher and sent to remote write during replay of existing WAL.",
},
[]string{consumer},
),
currentSegment: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "prometheus",
Subsystem: "wal_watcher",
Name: "current_segment",
Help: "Current segment the WAL watcher is reading records from.",
},
[]string{consumer},
),
notificationsSkipped: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "prometheus",
Subsystem: "wal_watcher",
Name: "notifications_skipped_total",
Help: "The number of WAL write notifications that the Watcher has skipped due to already being in a WAL read routine.",
},
[]string{consumer},
),
}
if reg != nil {
reg.MustRegister(m.recordsRead)
reg.MustRegister(m.recordDecodeFails)
reg.MustRegister(m.samplesSentPreTailing)
reg.MustRegister(m.currentSegment)
reg.MustRegister(m.notificationsSkipped)
}
return m
}
// NewWatcher creates a new WAL watcher for a given WriteTo.
func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logger log.Logger, name string, writer WriteTo, dir string, sendExemplars, sendHistograms bool) *Watcher {
if logger == nil {
logger = log.NewNopLogger()
}
return &Watcher{
logger: logger,
writer: writer,
metrics: metrics,
readerMetrics: readerMetrics,
walDir: filepath.Join(dir, "wal"),
name: name,
sendExemplars: sendExemplars,
sendHistograms: sendHistograms,
readNotify: make(chan struct{}),
quit: make(chan struct{}),
done: make(chan struct{}),
MaxSegment: -1,
}
}
func (w *Watcher) Notify() {
select {
case w.readNotify <- struct{}{}:
return
default: // default so we can exit
// we don't need a buffered channel or any buffering since
// for each notification it recv's the watcher will read until EOF
w.notificationsSkipped.Inc()
}
}
func (w *Watcher) setMetrics() {
// Setup the WAL Watchers metrics. We do this here rather than in the
// constructor because of the ordering of creating Queue Managers's,
// stopping them, and then starting new ones in storage/remote/storage.go ApplyConfig.
if w.metrics != nil {
w.recordsReadMetric = w.metrics.recordsRead.MustCurryWith(prometheus.Labels{consumer: w.name})
w.recordDecodeFailsMetric = w.metrics.recordDecodeFails.WithLabelValues(w.name)
w.samplesSentPreTailing = w.metrics.samplesSentPreTailing.WithLabelValues(w.name)
w.currentSegmentMetric = w.metrics.currentSegment.WithLabelValues(w.name)
w.notificationsSkipped = w.metrics.notificationsSkipped.WithLabelValues(w.name)
}
}
// Start the Watcher.
func (w *Watcher) Start() {
w.setMetrics()
level.Info(w.logger).Log("msg", "Starting WAL watcher", "queue", w.name)
go w.loop()
}
// Stop the Watcher.
func (w *Watcher) Stop() {
close(w.quit)
<-w.done
// Records read metric has series and samples.
if w.metrics != nil {
w.metrics.recordsRead.DeleteLabelValues(w.name, "series")
w.metrics.recordsRead.DeleteLabelValues(w.name, "samples")
w.metrics.recordDecodeFails.DeleteLabelValues(w.name)
w.metrics.samplesSentPreTailing.DeleteLabelValues(w.name)
w.metrics.currentSegment.DeleteLabelValues(w.name)
}
level.Info(w.logger).Log("msg", "WAL watcher stopped", "queue", w.name)
}
func (w *Watcher) loop() {
defer close(w.done)
// We may encounter failures processing the WAL; we should wait and retry.
for !isClosed(w.quit) {
w.SetStartTime(time.Now())
if err := w.Run(); err != nil {
level.Error(w.logger).Log("msg", "error tailing WAL", "err", err)
}
select {
case <-w.quit:
return
case <-time.After(5 * time.Second):
}
}
}
// Run the watcher, which will tail the WAL until the quit channel is closed
// or an error case is hit.
func (w *Watcher) Run() error {
// We want to ensure this is false across iterations since
// Run will be called again if there was a failure to read the WAL.
w.sendSamples = false
level.Info(w.logger).Log("msg", "Replaying WAL", "queue", w.name)
// Backfill from the checkpoint first if it exists.
lastCheckpoint, checkpointIndex, err := LastCheckpoint(w.walDir)
if err != nil && !errors.Is(err, record.ErrNotFound) {
return fmt.Errorf("tsdb.LastCheckpoint: %w", err)
}
if err == nil {
if err = w.readCheckpoint(lastCheckpoint, (*Watcher).readSegment); err != nil {
return fmt.Errorf("readCheckpoint: %w", err)
}
}
w.lastCheckpoint = lastCheckpoint
currentSegment, err := w.findSegmentForIndex(checkpointIndex)
if err != nil {
return err
}
level.Debug(w.logger).Log("msg", "Tailing WAL", "lastCheckpoint", lastCheckpoint, "checkpointIndex", checkpointIndex, "currentSegment", currentSegment)
for !isClosed(w.quit) {
w.currentSegmentMetric.Set(float64(currentSegment))
// Re-check on each iteration in case a new segment was added,
// because watch() will wait for notifications on the last segment.
_, lastSegment, err := w.firstAndLast()
if err != nil {
return fmt.Errorf("wal.Segments: %w", err)
}
tail := currentSegment >= lastSegment
level.Debug(w.logger).Log("msg", "Processing segment", "currentSegment", currentSegment, "lastSegment", lastSegment)
if err := w.watch(currentSegment, tail); err != nil && !errors.Is(err, ErrIgnorable) {
return err
}
// For testing: stop when you hit a specific segment.
if currentSegment == w.MaxSegment {
return nil
}
currentSegment++
}
return nil
}
// findSegmentForIndex finds the first segment greater than or equal to index.
func (w *Watcher) findSegmentForIndex(index int) (int, error) {
refs, err := w.segments(w.walDir)
if err != nil {
return -1, err
}
for _, r := range refs {
if r >= index {
return r, nil
}
}
return -1, errors.New("failed to find segment for index")
}
func (w *Watcher) firstAndLast() (int, int, error) {
refs, err := w.segments(w.walDir)
if err != nil {
return -1, -1, err
}
if len(refs) == 0 {
return -1, -1, nil
}
return refs[0], refs[len(refs)-1], nil
}
// Copied from tsdb/wlog/wlog.go so we do not have to open a WAL.
// Plan is to move WAL watcher to TSDB and dedupe these implementations.
func (w *Watcher) segments(dir string) ([]int, error) {
files, err := os.ReadDir(dir)
if err != nil {
return nil, err
}
var refs []int
for _, f := range files {
k, err := strconv.Atoi(f.Name())
if err != nil {
continue
}
refs = append(refs, k)
}
slices.Sort(refs)
for i := 0; i < len(refs)-1; i++ {
if refs[i]+1 != refs[i+1] {
return nil, errors.New("segments are not sequential")
}
}
return refs, nil
}
func (w *Watcher) readAndHandleError(r *LiveReader, segmentNum int, tail bool, size int64) error {
err := w.readSegment(r, segmentNum, tail)
// Ignore all errors reading to end of segment whilst replaying the WAL.
if !tail {
if err != nil && !errors.Is(err, io.EOF) {
level.Warn(w.logger).Log("msg", "Ignoring error reading to end of segment, may have dropped data", "segment", segmentNum, "err", err)
} else if r.Offset() != size {
level.Warn(w.logger).Log("msg", "Expected to have read whole segment, may have dropped data", "segment", segmentNum, "read", r.Offset(), "size", size)
}
return ErrIgnorable
}
// Otherwise, when we are tailing, non-EOFs are fatal.
if err != nil && !errors.Is(err, io.EOF) {
return err
}
return nil
}
// Use tail true to indicate that the reader is currently on a segment that is
// actively being written to. If false, assume it's a full segment and we're
// replaying it on start to cache the series records.
func (w *Watcher) watch(segmentNum int, tail bool) error {
segment, err := OpenReadSegment(SegmentName(w.walDir, segmentNum))
if err != nil {
return err
}
defer segment.Close()
reader := NewLiveReader(w.logger, w.readerMetrics, segment)
size := int64(math.MaxInt64)
if !tail {
var err error
size, err = getSegmentSize(w.walDir, segmentNum)
if err != nil {
return fmt.Errorf("getSegmentSize: %w", err)
}
return w.readAndHandleError(reader, segmentNum, tail, size)
}
checkpointTicker := time.NewTicker(checkpointPeriod)
defer checkpointTicker.Stop()
segmentTicker := time.NewTicker(segmentCheckPeriod)
defer segmentTicker.Stop()
readTicker := time.NewTicker(readTimeout)
defer readTicker.Stop()
gcSem := make(chan struct{}, 1)
for {
select {
case <-w.quit:
return nil
case <-checkpointTicker.C:
// Periodically check if there is a new checkpoint so we can garbage
// collect labels. As this is considered an optimisation, we ignore
// errors during checkpoint processing. Doing the process asynchronously
// allows the current WAL segment to be processed while reading the
// checkpoint.
select {
case gcSem <- struct{}{}:
go func() {
defer func() {
<-gcSem
}()
if err := w.garbageCollectSeries(segmentNum); err != nil {
level.Warn(w.logger).Log("msg", "Error process checkpoint", "err", err)
}
}()
default:
// Currently doing a garbage collect, try again later.
}
case <-segmentTicker.C:
_, last, err := w.firstAndLast()
if err != nil {
return fmt.Errorf("segments: %w", err)
}
// Check if new segments exists.
if last <= segmentNum {
continue
}
err = w.readSegment(reader, segmentNum, tail)
// Ignore errors reading to end of segment whilst replaying the WAL.
if !tail {
switch {
case err != nil && !errors.Is(err, io.EOF):
level.Warn(w.logger).Log("msg", "Ignoring error reading to end of segment, may have dropped data", "err", err)
case reader.Offset() != size:
level.Warn(w.logger).Log("msg", "Expected to have read whole segment, may have dropped data", "segment", segmentNum, "read", reader.Offset(), "size", size)
}
return nil
}
// Otherwise, when we are tailing, non-EOFs are fatal.
if err != nil && !errors.Is(err, io.EOF) {
return err
}
return nil
// we haven't read due to a notification in quite some time, try reading anyways
case <-readTicker.C:
level.Debug(w.logger).Log("msg", "Watcher is reading the WAL due to timeout, haven't received any write notifications recently", "timeout", readTimeout)
err := w.readAndHandleError(reader, segmentNum, tail, size)
if err != nil {
return err
}
// still want to reset the ticker so we don't read too often
readTicker.Reset(readTimeout)
case <-w.readNotify:
err := w.readAndHandleError(reader, segmentNum, tail, size)
if err != nil {
return err
}
// still want to reset the ticker so we don't read too often
readTicker.Reset(readTimeout)
}
}
}
func (w *Watcher) garbageCollectSeries(segmentNum int) error {
dir, _, err := LastCheckpoint(w.walDir)
if err != nil && !errors.Is(err, record.ErrNotFound) {
return fmt.Errorf("tsdb.LastCheckpoint: %w", err)
}
if dir == "" || dir == w.lastCheckpoint {
return nil
}
w.lastCheckpoint = dir
index, err := checkpointNum(dir)
if err != nil {
return fmt.Errorf("error parsing checkpoint filename: %w", err)
}
if index >= segmentNum {
level.Debug(w.logger).Log("msg", "Current segment is behind the checkpoint, skipping reading of checkpoint", "current", fmt.Sprintf("%08d", segmentNum), "checkpoint", dir)
return nil
}
level.Debug(w.logger).Log("msg", "New checkpoint detected", "new", dir, "currentSegment", segmentNum)
if err = w.readCheckpoint(dir, (*Watcher).readSegmentForGC); err != nil {
return fmt.Errorf("readCheckpoint: %w", err)
}
// Clear series with a checkpoint or segment index # lower than the checkpoint we just read.
w.writer.SeriesReset(index)
return nil
}
// Read from a segment and pass the details to w.writer.
// Also used with readCheckpoint - implements segmentReadFn.
func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
var (
dec record.Decoder
series []record.RefSeries
samples []record.RefSample
samplesToSend []record.RefSample
exemplars []record.RefExemplar
histograms []record.RefHistogramSample
histogramsToSend []record.RefHistogramSample
floatHistograms []record.RefFloatHistogramSample
floatHistogramsToSend []record.RefFloatHistogramSample
)
for r.Next() && !isClosed(w.quit) {
rec := r.Record()
w.recordsReadMetric.WithLabelValues(dec.Type(rec).String()).Inc()
switch dec.Type(rec) {
case record.Series:
series, err := dec.Series(rec, series[:0])
if err != nil {
w.recordDecodeFailsMetric.Inc()
return err
}
w.writer.StoreSeries(series, segmentNum)
case record.Samples:
// If we're not tailing a segment we can ignore any samples records we see.
// This speeds up replay of the WAL by > 10x.
if !tail {
break
}
samples, err := dec.Samples(rec, samples[:0])
if err != nil {
w.recordDecodeFailsMetric.Inc()
return err
}
for _, s := range samples {
if s.T > w.startTimestamp {
if !w.sendSamples {
w.sendSamples = true
duration := time.Since(w.startTime)
level.Info(w.logger).Log("msg", "Done replaying WAL", "duration", duration)
}
samplesToSend = append(samplesToSend, s)
}
}
if len(samplesToSend) > 0 {
w.writer.Append(samplesToSend)
samplesToSend = samplesToSend[:0]
}
case record.Exemplars:
// Skip if experimental "exemplars over remote write" is not enabled.
if !w.sendExemplars {
break
}
// If we're not tailing a segment we can ignore any exemplars records we see.
// This speeds up replay of the WAL significantly.
if !tail {
break
}
exemplars, err := dec.Exemplars(rec, exemplars[:0])
if err != nil {
w.recordDecodeFailsMetric.Inc()
return err
}
w.writer.AppendExemplars(exemplars)
case record.HistogramSamples:
// Skip if experimental "histograms over remote write" is not enabled.
if !w.sendHistograms {
break
}
if !tail {
break
}
histograms, err := dec.HistogramSamples(rec, histograms[:0])
if err != nil {
w.recordDecodeFailsMetric.Inc()
return err
}
for _, h := range histograms {
if h.T > w.startTimestamp {
if !w.sendSamples {
w.sendSamples = true
duration := time.Since(w.startTime)
level.Info(w.logger).Log("msg", "Done replaying WAL", "duration", duration)
}
histogramsToSend = append(histogramsToSend, h)
}
}
if len(histogramsToSend) > 0 {
w.writer.AppendHistograms(histogramsToSend)
histogramsToSend = histogramsToSend[:0]
}
case record.FloatHistogramSamples:
// Skip if experimental "histograms over remote write" is not enabled.
if !w.sendHistograms {
break
}
if !tail {
break
}
floatHistograms, err := dec.FloatHistogramSamples(rec, floatHistograms[:0])
if err != nil {
w.recordDecodeFailsMetric.Inc()
return err
}
for _, fh := range floatHistograms {
if fh.T > w.startTimestamp {
if !w.sendSamples {
w.sendSamples = true
duration := time.Since(w.startTime)
level.Info(w.logger).Log("msg", "Done replaying WAL", "duration", duration)
}
floatHistogramsToSend = append(floatHistogramsToSend, fh)
}
}
if len(floatHistogramsToSend) > 0 {
w.writer.AppendFloatHistograms(floatHistogramsToSend)
floatHistogramsToSend = floatHistogramsToSend[:0]
}
case record.Tombstones:
default:
// Could be corruption, or reading from a WAL from a newer Prometheus.
w.recordDecodeFailsMetric.Inc()
}
}
if err := r.Err(); err != nil {
return fmt.Errorf("segment %d: %w", segmentNum, err)
}
return nil
}
// Go through all series in a segment updating the segmentNum, so we can delete older series.
// Used with readCheckpoint - implements segmentReadFn.
func (w *Watcher) readSegmentForGC(r *LiveReader, segmentNum int, _ bool) error {
var (
dec record.Decoder
series []record.RefSeries
)
for r.Next() && !isClosed(w.quit) {
rec := r.Record()
w.recordsReadMetric.WithLabelValues(dec.Type(rec).String()).Inc()
switch dec.Type(rec) {
case record.Series:
series, err := dec.Series(rec, series[:0])
if err != nil {
w.recordDecodeFailsMetric.Inc()
return err
}
w.writer.UpdateSeriesSegment(series, segmentNum)
// Ignore these; we're only interested in series.
case record.Samples:
case record.Exemplars:
case record.Tombstones:
default:
// Could be corruption, or reading from a WAL from a newer Prometheus.
w.recordDecodeFailsMetric.Inc()
}
}
if err := r.Err(); err != nil {
return fmt.Errorf("segment %d: %w", segmentNum, err)
}
return nil
}
func (w *Watcher) SetStartTime(t time.Time) {
w.startTime = t
w.startTimestamp = timestamp.FromTime(t)
}
type segmentReadFn func(w *Watcher, r *LiveReader, segmentNum int, tail bool) error
// Read all the series records from a Checkpoint directory.
func (w *Watcher) readCheckpoint(checkpointDir string, readFn segmentReadFn) error {
level.Debug(w.logger).Log("msg", "Reading checkpoint", "dir", checkpointDir)
index, err := checkpointNum(checkpointDir)
if err != nil {
return fmt.Errorf("checkpointNum: %w", err)
}
// Ensure we read the whole contents of every segment in the checkpoint dir.
segs, err := w.segments(checkpointDir)
if err != nil {
return fmt.Errorf("Unable to get segments checkpoint dir: %w", err)
}
for _, seg := range segs {
size, err := getSegmentSize(checkpointDir, seg)
if err != nil {
return fmt.Errorf("getSegmentSize: %w", err)
}
sr, err := OpenReadSegment(SegmentName(checkpointDir, seg))
if err != nil {
return fmt.Errorf("unable to open segment: %w", err)
}
defer sr.Close()
r := NewLiveReader(w.logger, w.readerMetrics, sr)
if err := readFn(w, r, index, false); err != nil && !errors.Is(err, io.EOF) {
return fmt.Errorf("readSegment: %w", err)
}
if r.Offset() != size {
return fmt.Errorf("readCheckpoint wasn't able to read all data from the checkpoint %s/%08d, size: %d, totalRead: %d", checkpointDir, seg, size, r.Offset())
}
}
level.Debug(w.logger).Log("msg", "Read series references from checkpoint", "checkpoint", checkpointDir)
return nil
}
func checkpointNum(dir string) (int, error) {
// Checkpoint dir names are in the format checkpoint.000001
// dir may contain a hidden directory, so only check the base directory
chunks := strings.Split(filepath.Base(dir), ".")
if len(chunks) != 2 {
return 0, fmt.Errorf("invalid checkpoint dir string: %s", dir)
}
result, err := strconv.Atoi(chunks[1])
if err != nil {
return 0, fmt.Errorf("invalid checkpoint dir string: %s", dir)
}
return result, nil
}
// Get size of segment.
func getSegmentSize(dir string, index int) (int64, error) {
i := int64(-1)
fi, err := os.Stat(SegmentName(dir, index))
if err == nil {
i = fi.Size()
}
return i, err
}
func isClosed(c chan struct{}) bool {
select {
case <-c:
return true
default:
return false
}
}