2018-09-07 21:26:04 +00:00
// Copyright 2018 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2022-10-10 15:08:46 +00:00
package wlog
2018-09-07 21:26:04 +00:00
import (
2023-12-04 17:08:43 +00:00
"errors"
2018-09-07 21:26:04 +00:00
"fmt"
2019-01-18 20:31:36 +00:00
"io"
"math"
2018-09-07 21:26:04 +00:00
"os"
2023-05-10 16:38:02 +00:00
"path/filepath"
2018-09-07 21:26:04 +00:00
"strconv"
"strings"
"time"
2021-06-11 16:17:59 +00:00
"github.com/go-kit/log"
"github.com/go-kit/log/level"
2018-09-07 21:26:04 +00:00
"github.com/prometheus/client_golang/prometheus"
2022-09-30 14:33:56 +00:00
"golang.org/x/exp/slices"
2020-10-22 09:00:08 +00:00
2021-11-08 14:23:17 +00:00
"github.com/prometheus/prometheus/model/timestamp"
2019-09-19 09:15:41 +00:00
"github.com/prometheus/prometheus/tsdb/record"
2018-09-07 21:26:04 +00:00
)
2019-01-18 12:48:16 +00:00
// Intervals used by watch while tailing a segment, plus the label name shared
// by all watcher metrics.
const (
	// checkpointPeriod is the interval of the ticker that triggers a check
	// for a new checkpoint.
	checkpointPeriod = 5 * time.Second
	// segmentCheckPeriod is the interval of the ticker that triggers a check
	// for a newer WAL segment.
	segmentCheckPeriod = 100 * time.Millisecond

	// consumer is the metric label name that carries the watcher's name.
	consumer = "consumer"
)
2023-05-15 19:31:49 +00:00
var (
	// ErrIgnorable is returned by readAndHandleError after reading a segment
	// that is not being tailed; Run does not treat it as a failure.
	ErrIgnorable = errors.New("ignore me")

	// readTimeout is the interval after which watch reads the segment even
	// though no write notification has arrived.
	readTimeout = 15 * time.Second
)
2019-09-19 09:15:41 +00:00
// WriteTo is an interface used by the Watcher to send the samples it's read
2019-09-13 17:23:58 +00:00
// from the WAL on to somewhere else. Functions will be called concurrently
// and it is left to the implementer to make sure they are safe.
2019-09-19 09:15:41 +00:00
type WriteTo interface {
2022-01-31 06:14:16 +00:00
// Append and AppendExemplar should block until the samples are fully accepted,
// whether enqueued in memory or successfully written to it's final destination.
// Once returned, the WAL Watcher will not attempt to pass that data again.
2019-09-19 09:15:41 +00:00
Append ( [ ] record . RefSample ) bool
2021-05-06 20:53:52 +00:00
AppendExemplars ( [ ] record . RefExemplar ) bool
2022-08-29 12:08:36 +00:00
AppendHistograms ( [ ] record . RefHistogramSample ) bool
2023-01-13 11:09:20 +00:00
AppendFloatHistograms ( [ ] record . RefFloatHistogramSample ) bool
2019-09-19 09:15:41 +00:00
StoreSeries ( [ ] record . RefSeries , int )
2022-01-31 06:14:16 +00:00
2021-07-27 20:21:48 +00:00
// Next two methods are intended for garbage-collection: first we call
// UpdateSeriesSegment on all current series
UpdateSeriesSegment ( [ ] record . RefSeries , int )
// Then SeriesReset is called to allow the deletion
2019-09-13 17:23:58 +00:00
// of all series created in a segment lower than the argument.
2019-09-19 09:15:41 +00:00
SeriesReset ( int )
2018-09-07 21:26:04 +00:00
}
2023-12-01 22:26:38 +00:00
// WriteNotified is implemented by types, such as the Watcher, that want to be
// told that data has been written so that they can read it.
type WriteNotified interface {
	// Notify signals that new data has been written.
	Notify()
}
2019-09-19 09:15:41 +00:00
type WatcherMetrics struct {
recordsRead * prometheus . CounterVec
recordDecodeFails * prometheus . CounterVec
samplesSentPreTailing * prometheus . CounterVec
currentSegment * prometheus . GaugeVec
2023-05-15 19:31:49 +00:00
notificationsSkipped * prometheus . CounterVec
2018-09-07 21:26:04 +00:00
}
2019-09-19 09:15:41 +00:00
// Watcher watches the TSDB WAL for a given WriteTo.
type Watcher struct {
2019-02-19 06:46:52 +00:00
name string
2019-09-19 09:15:41 +00:00
writer WriteTo
2019-02-19 06:46:52 +00:00
logger log . Logger
walDir string
lastCheckpoint string
2021-05-06 20:53:52 +00:00
sendExemplars bool
2022-07-14 13:13:12 +00:00
sendHistograms bool
2019-09-19 09:15:41 +00:00
metrics * WatcherMetrics
2020-03-20 16:34:15 +00:00
readerMetrics * LiveReaderMetrics
2019-01-18 12:48:16 +00:00
2019-11-27 00:53:11 +00:00
startTime time . Time
startTimestamp int64 // the start time as a Prometheus timestamp
sendSamples bool
2018-09-07 21:26:04 +00:00
2019-02-13 19:14:15 +00:00
recordsReadMetric * prometheus . CounterVec
2018-09-07 21:26:04 +00:00
recordDecodeFailsMetric prometheus . Counter
samplesSentPreTailing prometheus . Counter
currentSegmentMetric prometheus . Gauge
2023-05-15 19:31:49 +00:00
notificationsSkipped prometheus . Counter
2018-09-07 21:26:04 +00:00
2023-05-15 19:31:49 +00:00
readNotify chan struct { }
quit chan struct { }
done chan struct { }
2019-02-20 04:03:41 +00:00
// For testing, stop when we hit this segment.
2019-09-19 09:15:41 +00:00
MaxSegment int
2018-09-07 21:26:04 +00:00
}
2019-09-19 09:15:41 +00:00
func NewWatcherMetrics ( reg prometheus . Registerer ) * WatcherMetrics {
m := & WatcherMetrics {
recordsRead : prometheus . NewCounterVec (
prometheus . CounterOpts {
Namespace : "prometheus" ,
Subsystem : "wal_watcher" ,
Name : "records_read_total" ,
Help : "Number of records read by the WAL watcher from the WAL." ,
} ,
[ ] string { consumer , "type" } ,
) ,
recordDecodeFails : prometheus . NewCounterVec (
prometheus . CounterOpts {
Namespace : "prometheus" ,
Subsystem : "wal_watcher" ,
Name : "record_decode_failures_total" ,
Help : "Number of records read by the WAL watcher that resulted in an error when decoding." ,
} ,
[ ] string { consumer } ,
) ,
samplesSentPreTailing : prometheus . NewCounterVec (
prometheus . CounterOpts {
Namespace : "prometheus" ,
Subsystem : "wal_watcher" ,
Name : "samples_sent_pre_tailing_total" ,
Help : "Number of sample records read by the WAL watcher and sent to remote write during replay of existing WAL." ,
} ,
[ ] string { consumer } ,
) ,
currentSegment : prometheus . NewGaugeVec (
prometheus . GaugeOpts {
Namespace : "prometheus" ,
Subsystem : "wal_watcher" ,
Name : "current_segment" ,
Help : "Current segment the WAL watcher is reading records from." ,
} ,
[ ] string { consumer } ,
) ,
2023-05-15 19:31:49 +00:00
notificationsSkipped : prometheus . NewCounterVec (
prometheus . CounterOpts {
Namespace : "prometheus" ,
Subsystem : "wal_watcher" ,
Name : "notifications_skipped_total" ,
Help : "The number of WAL write notifications that the Watcher has skipped due to already being in a WAL read routine." ,
} ,
[ ] string { consumer } ,
) ,
2019-09-19 09:15:41 +00:00
}
if reg != nil {
2020-03-20 16:34:15 +00:00
reg . MustRegister ( m . recordsRead )
reg . MustRegister ( m . recordDecodeFails )
reg . MustRegister ( m . samplesSentPreTailing )
reg . MustRegister ( m . currentSegment )
2023-05-15 19:31:49 +00:00
reg . MustRegister ( m . notificationsSkipped )
2019-09-19 09:15:41 +00:00
}
return m
}
// NewWatcher creates a new WAL watcher for a given WriteTo.
2022-07-14 13:13:12 +00:00
func NewWatcher ( metrics * WatcherMetrics , readerMetrics * LiveReaderMetrics , logger log . Logger , name string , writer WriteTo , dir string , sendExemplars , sendHistograms bool ) * Watcher {
2018-09-07 21:26:04 +00:00
if logger == nil {
logger = log . NewNopLogger ( )
}
2019-09-19 09:15:41 +00:00
return & Watcher {
2022-07-14 13:13:12 +00:00
logger : logger ,
writer : writer ,
metrics : metrics ,
readerMetrics : readerMetrics ,
2023-05-10 16:38:02 +00:00
walDir : filepath . Join ( dir , "wal" ) ,
2022-07-14 13:13:12 +00:00
name : name ,
sendExemplars : sendExemplars ,
sendHistograms : sendHistograms ,
2021-05-06 20:53:52 +00:00
2023-05-15 19:31:49 +00:00
readNotify : make ( chan struct { } ) ,
quit : make ( chan struct { } ) ,
done : make ( chan struct { } ) ,
2019-09-19 09:15:41 +00:00
MaxSegment : - 1 ,
2019-02-13 19:14:15 +00:00
}
2018-09-07 21:26:04 +00:00
}
2023-05-15 19:31:49 +00:00
// Notify tells the watcher that data has been written. It never blocks: if
// nothing is currently receiving on readNotify, the notification is dropped
// and counted in notificationsSkipped.
func (w *Watcher) Notify() {
	select {
	case w.readNotify <- struct{}{}:
		// Delivered to the waiting reader.
	default:
		// No buffering is needed: for each notification it receives, the
		// watcher reads until EOF, so a dropped one loses nothing.
		w.notificationsSkipped.Inc()
	}
}
2019-09-19 09:15:41 +00:00
func ( w * Watcher ) setMetrics ( ) {
2019-04-23 08:49:17 +00:00
// Setup the WAL Watchers metrics. We do this here rather than in the
// constructor because of the ordering of creating Queue Managers's,
// stopping them, and then starting new ones in storage/remote/storage.go ApplyConfig.
2019-09-19 09:15:41 +00:00
if w . metrics != nil {
w . recordsReadMetric = w . metrics . recordsRead . MustCurryWith ( prometheus . Labels { consumer : w . name } )
w . recordDecodeFailsMetric = w . metrics . recordDecodeFails . WithLabelValues ( w . name )
w . samplesSentPreTailing = w . metrics . samplesSentPreTailing . WithLabelValues ( w . name )
w . currentSegmentMetric = w . metrics . currentSegment . WithLabelValues ( w . name )
2023-05-15 19:31:49 +00:00
w . notificationsSkipped = w . metrics . notificationsSkipped . WithLabelValues ( w . name )
2019-09-19 09:15:41 +00:00
}
2019-04-23 08:49:17 +00:00
}
2019-09-19 09:15:41 +00:00
// Start the Watcher.
func ( w * Watcher ) Start ( ) {
2019-04-23 08:49:17 +00:00
w . setMetrics ( )
2020-04-11 08:22:18 +00:00
level . Info ( w . logger ) . Log ( "msg" , "Starting WAL watcher" , "queue" , w . name )
2019-04-23 08:49:17 +00:00
2019-02-13 14:47:35 +00:00
go w . loop ( )
2018-09-07 21:26:04 +00:00
}
2019-09-19 09:15:41 +00:00
// Stop the Watcher.
func ( w * Watcher ) Stop ( ) {
2018-09-07 21:26:04 +00:00
close ( w . quit )
2019-02-13 17:06:03 +00:00
<- w . done
2019-04-23 08:49:17 +00:00
// Records read metric has series and samples.
2020-03-20 16:34:15 +00:00
if w . metrics != nil {
w . metrics . recordsRead . DeleteLabelValues ( w . name , "series" )
w . metrics . recordsRead . DeleteLabelValues ( w . name , "samples" )
w . metrics . recordDecodeFails . DeleteLabelValues ( w . name )
w . metrics . samplesSentPreTailing . DeleteLabelValues ( w . name )
w . metrics . currentSegment . DeleteLabelValues ( w . name )
}
2019-04-23 08:49:17 +00:00
2019-02-13 17:06:03 +00:00
level . Info ( w . logger ) . Log ( "msg" , "WAL watcher stopped" , "queue" , w . name )
2018-09-07 21:26:04 +00:00
}
2019-09-19 09:15:41 +00:00
func ( w * Watcher ) loop ( ) {
2019-02-13 17:06:03 +00:00
defer close ( w . done )
2019-02-13 14:47:35 +00:00
2019-07-29 09:45:27 +00:00
// We may encounter failures processing the WAL; we should wait and retry.
2019-02-14 10:02:54 +00:00
for ! isClosed ( w . quit ) {
2019-11-27 00:53:11 +00:00
w . SetStartTime ( time . Now ( ) )
2019-09-19 09:15:41 +00:00
if err := w . Run ( ) ; err != nil {
2019-02-13 14:47:35 +00:00
level . Error ( w . logger ) . Log ( "msg" , "error tailing WAL" , "err" , err )
2018-09-07 21:26:04 +00:00
}
2019-02-13 14:47:35 +00:00
select {
case <- w . quit :
return
case <- time . After ( 5 * time . Second ) :
}
2018-09-07 21:26:04 +00:00
}
2019-02-13 14:47:35 +00:00
}
2018-09-07 21:26:04 +00:00
2019-09-19 09:15:41 +00:00
// Run the watcher, which will tail the WAL until the quit channel is closed
// or an error case is hit.
func ( w * Watcher ) Run ( ) error {
2019-03-05 12:21:11 +00:00
_ , lastSegment , err := w . firstAndLast ( )
2019-02-13 17:06:03 +00:00
if err != nil {
2023-12-04 17:08:43 +00:00
return fmt . Errorf ( "wal.Segments: %w" , err )
2019-02-13 17:06:03 +00:00
}
2019-11-27 00:53:11 +00:00
// We want to ensure this is false across iterations since
// Run will be called again if there was a failure to read the WAL.
w . sendSamples = false
2019-11-19 05:11:04 +00:00
2020-04-11 08:22:18 +00:00
level . Info ( w . logger ) . Log ( "msg" , "Replaying WAL" , "queue" , w . name )
2019-02-13 17:06:03 +00:00
2019-01-18 20:31:36 +00:00
// Backfill from the checkpoint first if it exists.
2019-09-19 09:15:41 +00:00
lastCheckpoint , checkpointIndex , err := LastCheckpoint ( w . walDir )
2023-12-04 17:08:43 +00:00
if err != nil && ! errors . Is ( err , record . ErrNotFound ) {
return fmt . Errorf ( "tsdb.LastCheckpoint: %w" , err )
2019-01-18 20:31:36 +00:00
}
if err == nil {
2021-07-27 20:21:48 +00:00
if err = w . readCheckpoint ( lastCheckpoint , ( * Watcher ) . readSegment ) ; err != nil {
2023-12-04 17:08:43 +00:00
return fmt . Errorf ( "readCheckpoint: %w" , err )
2019-01-18 20:31:36 +00:00
}
}
2019-02-19 06:46:52 +00:00
w . lastCheckpoint = lastCheckpoint
2019-01-18 20:31:36 +00:00
2019-02-20 07:51:08 +00:00
currentSegment , err := w . findSegmentForIndex ( checkpointIndex )
2019-02-13 14:47:35 +00:00
if err != nil {
return err
}
2019-01-18 12:48:16 +00:00
2020-04-11 08:22:18 +00:00
level . Debug ( w . logger ) . Log ( "msg" , "Tailing WAL" , "lastCheckpoint" , lastCheckpoint , "checkpointIndex" , checkpointIndex , "currentSegment" , currentSegment , "lastSegment" , lastSegment )
2019-02-15 09:54:01 +00:00
for ! isClosed ( w . quit ) {
2019-02-13 17:06:03 +00:00
w . currentSegmentMetric . Set ( float64 ( currentSegment ) )
2020-04-11 08:22:18 +00:00
level . Debug ( w . logger ) . Log ( "msg" , "Processing segment" , "currentSegment" , currentSegment )
2019-01-18 12:48:16 +00:00
// On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment.
// On subsequent calls to this function, currentSegment will have been incremented and we should open that segment.
2023-05-15 19:31:49 +00:00
if err := w . watch ( currentSegment , currentSegment >= lastSegment ) ; err != nil && ! errors . Is ( err , ErrIgnorable ) {
2019-02-13 14:47:35 +00:00
return err
2019-01-18 12:48:16 +00:00
}
2019-02-20 07:51:08 +00:00
// For testing: stop when you hit a specific segment.
2019-09-19 09:15:41 +00:00
if currentSegment == w . MaxSegment {
2019-02-20 04:03:41 +00:00
return nil
}
2019-02-13 17:06:03 +00:00
currentSegment ++
2019-01-18 12:48:16 +00:00
}
2019-02-15 09:54:01 +00:00
return nil
2018-09-07 21:26:04 +00:00
}
2019-02-14 10:02:54 +00:00
// findSegmentForIndex finds the first segment greater than or equal to index.
2019-09-19 09:15:41 +00:00
func ( w * Watcher ) findSegmentForIndex ( index int ) ( int , error ) {
2019-04-09 09:52:44 +00:00
refs , err := w . segments ( w . walDir )
2019-03-05 12:21:11 +00:00
if err != nil {
2019-07-15 16:52:03 +00:00
return - 1 , err
2019-03-05 12:21:11 +00:00
}
for _ , r := range refs {
if r >= index {
return r , nil
}
}
return - 1 , errors . New ( "failed to find segment for index" )
}
2019-09-19 09:15:41 +00:00
func ( w * Watcher ) firstAndLast ( ) ( int , int , error ) {
2019-04-09 09:52:44 +00:00
refs , err := w . segments ( w . walDir )
2019-03-05 12:21:11 +00:00
if err != nil {
2019-07-15 16:52:03 +00:00
return - 1 , - 1 , err
2019-03-05 12:21:11 +00:00
}
if len ( refs ) == 0 {
return - 1 , - 1 , nil
}
return refs [ 0 ] , refs [ len ( refs ) - 1 ] , nil
}
2022-10-10 15:08:46 +00:00
// Copied from tsdb/wlog/wlog.go so we do not have to open a WAL.
2019-03-05 12:21:11 +00:00
// Plan is to move WAL watcher to TSDB and dedupe these implementations.
2019-09-19 09:15:41 +00:00
func ( w * Watcher ) segments ( dir string ) ( [ ] int , error ) {
2022-04-27 09:24:36 +00:00
files , err := os . ReadDir ( dir )
2019-02-13 14:47:35 +00:00
if err != nil {
2019-03-05 12:21:11 +00:00
return nil , err
2019-02-13 14:47:35 +00:00
}
var refs [ ] int
2020-04-06 13:34:20 +00:00
for _ , f := range files {
k , err := strconv . Atoi ( f . Name ( ) )
2019-02-13 14:47:35 +00:00
if err != nil {
continue
}
refs = append ( refs , k )
}
2022-09-30 14:33:56 +00:00
slices . Sort ( refs )
2021-05-25 10:08:35 +00:00
for i := 0 ; i < len ( refs ) - 1 ; i ++ {
if refs [ i ] + 1 != refs [ i + 1 ] {
return nil , errors . New ( "segments are not sequential" )
}
}
2019-03-05 12:21:11 +00:00
return refs , nil
2019-02-13 14:47:35 +00:00
}
2023-05-15 19:31:49 +00:00
func ( w * Watcher ) readAndHandleError ( r * LiveReader , segmentNum int , tail bool , size int64 ) error {
err := w . readSegment ( r , segmentNum , tail )
// Ignore all errors reading to end of segment whilst replaying the WAL.
if ! tail {
2023-12-04 17:08:43 +00:00
if err != nil && ! errors . Is ( err , io . EOF ) {
2023-05-15 19:31:49 +00:00
level . Warn ( w . logger ) . Log ( "msg" , "Ignoring error reading to end of segment, may have dropped data" , "segment" , segmentNum , "err" , err )
} else if r . Offset ( ) != size {
level . Warn ( w . logger ) . Log ( "msg" , "Expected to have read whole segment, may have dropped data" , "segment" , segmentNum , "read" , r . Offset ( ) , "size" , size )
}
return ErrIgnorable
}
// Otherwise, when we are tailing, non-EOFs are fatal.
2023-12-04 17:08:43 +00:00
if err != nil && ! errors . Is ( err , io . EOF ) {
2023-05-15 19:31:49 +00:00
return err
}
return nil
}
2019-02-20 07:51:08 +00:00
// Use tail true to indicate that the reader is currently on a segment that is
2019-01-18 20:31:36 +00:00
// actively being written to. If false, assume it's a full segment and we're
// replaying it on start to cache the series records.
2019-09-19 09:15:41 +00:00
func ( w * Watcher ) watch ( segmentNum int , tail bool ) error {
segment , err := OpenReadSegment ( SegmentName ( w . walDir , segmentNum ) )
2019-02-12 14:12:37 +00:00
if err != nil {
return err
}
defer segment . Close ( )
2019-09-19 09:15:41 +00:00
reader := NewLiveReader ( w . logger , w . readerMetrics , segment )
2019-01-18 12:48:16 +00:00
2019-01-18 20:31:36 +00:00
size := int64 ( math . MaxInt64 )
if ! tail {
var err error
2019-02-13 17:06:03 +00:00
size , err = getSegmentSize ( w . walDir , segmentNum )
2019-01-18 20:31:36 +00:00
if err != nil {
2023-12-04 17:08:43 +00:00
return fmt . Errorf ( "getSegmentSize: %w" , err )
2019-01-18 20:31:36 +00:00
}
2023-12-01 22:26:38 +00:00
return w . readAndHandleError ( reader , segmentNum , tail , size )
2019-01-18 20:31:36 +00:00
}
2018-09-07 21:26:04 +00:00
2023-12-01 22:26:38 +00:00
checkpointTicker := time . NewTicker ( checkpointPeriod )
defer checkpointTicker . Stop ( )
segmentTicker := time . NewTicker ( segmentCheckPeriod )
defer segmentTicker . Stop ( )
readTicker := time . NewTicker ( readTimeout )
defer readTicker . Stop ( )
2019-09-13 17:23:58 +00:00
gcSem := make ( chan struct { } , 1 )
2018-09-07 21:26:04 +00:00
for {
select {
case <- w . quit :
2019-02-15 09:54:01 +00:00
return nil
2019-01-18 12:48:16 +00:00
2018-09-07 21:26:04 +00:00
case <- checkpointTicker . C :
2019-02-13 19:14:15 +00:00
// Periodically check if there is a new checkpoint so we can garbage
// collect labels. As this is considered an optimisation, we ignore
2019-09-13 17:23:58 +00:00
// errors during checkpoint processing. Doing the process asynchronously
// allows the current WAL segment to be processed while reading the
// checkpoint.
select {
case gcSem <- struct { } { } :
go func ( ) {
defer func ( ) {
<- gcSem
} ( )
if err := w . garbageCollectSeries ( segmentNum ) ; err != nil {
2020-04-11 08:22:18 +00:00
level . Warn ( w . logger ) . Log ( "msg" , "Error process checkpoint" , "err" , err )
2019-09-13 17:23:58 +00:00
}
} ( )
default :
// Currently doing a garbage collect, try again later.
2018-09-07 21:26:04 +00:00
}
2019-01-18 12:48:16 +00:00
2018-09-07 21:26:04 +00:00
case <- segmentTicker . C :
2019-03-05 12:21:11 +00:00
_ , last , err := w . firstAndLast ( )
2018-09-07 21:26:04 +00:00
if err != nil {
2023-12-04 17:08:43 +00:00
return fmt . Errorf ( "segments: %w" , err )
2019-01-18 12:48:16 +00:00
}
2019-01-18 20:31:36 +00:00
// Check if new segments exists.
2019-02-13 17:06:03 +00:00
if last <= segmentNum {
2018-09-07 21:26:04 +00:00
continue
}
2019-02-20 04:03:41 +00:00
err = w . readSegment ( reader , segmentNum , tail )
2019-02-15 09:54:01 +00:00
// Ignore errors reading to end of segment whilst replaying the WAL.
if ! tail {
style: Replace `else if` cascades with `switch`
Wiser coders than myself have come to the conclusion that a `switch`
statement is almost always superior to a statement that includes any
`else if`.
The exceptions that I have found in our codebase are just these two:
* The `if else` is followed by an additional statement before the next
condition (separated by a `;`).
* The whole thing is within a `for` loop and `break` statements are
used. In this case, using `switch` would require tagging the `for`
loop, which probably tips the balance.
Why are `switch` statements more readable?
For one, fewer curly braces. But more importantly, the conditions all
have the same alignment, so the whole thing follows the natural flow
of going down a list of conditions. With `else if`, in contrast, all
conditions but the first are "hidden" behind `} else if `, harder to
spot and (for no good reason) presented differently from the first
condition.
I'm sure the aforemention wise coders can list even more reasons.
In any case, I like it so much that I have found myself recommending
it in code reviews. I would like to make it a habit in our code base,
without making it a hard requirement that we would test on the CI. But
for that, there has to be a role model, so this commit eliminates all
`if else` occurrences, unless it is autogenerated code or fits one of
the exceptions above.
Signed-off-by: beorn7 <beorn@grafana.com>
2023-04-12 14:14:31 +00:00
switch {
2023-12-04 17:08:43 +00:00
case err != nil && ! errors . Is ( err , io . EOF ) :
2020-04-11 08:22:18 +00:00
level . Warn ( w . logger ) . Log ( "msg" , "Ignoring error reading to end of segment, may have dropped data" , "err" , err )
style: Replace `else if` cascades with `switch`
Wiser coders than myself have come to the conclusion that a `switch`
statement is almost always superior to a statement that includes any
`else if`.
The exceptions that I have found in our codebase are just these two:
* The `if else` is followed by an additional statement before the next
condition (separated by a `;`).
* The whole thing is within a `for` loop and `break` statements are
used. In this case, using `switch` would require tagging the `for`
loop, which probably tips the balance.
Why are `switch` statements more readable?
For one, fewer curly braces. But more importantly, the conditions all
have the same alignment, so the whole thing follows the natural flow
of going down a list of conditions. With `else if`, in contrast, all
conditions but the first are "hidden" behind `} else if `, harder to
spot and (for no good reason) presented differently from the first
condition.
I'm sure the aforemention wise coders can list even more reasons.
In any case, I like it so much that I have found myself recommending
it in code reviews. I would like to make it a habit in our code base,
without making it a hard requirement that we would test on the CI. But
for that, there has to be a role model, so this commit eliminates all
`if else` occurrences, unless it is autogenerated code or fits one of
the exceptions above.
Signed-off-by: beorn7 <beorn@grafana.com>
2023-04-12 14:14:31 +00:00
case reader . Offset ( ) != size :
2020-04-11 08:22:18 +00:00
level . Warn ( w . logger ) . Log ( "msg" , "Expected to have read whole segment, may have dropped data" , "segment" , segmentNum , "read" , reader . Offset ( ) , "size" , size )
2019-02-15 09:54:01 +00:00
}
return nil
}
// Otherwise, when we are tailing, non-EOFs are fatal.
2023-12-04 17:08:43 +00:00
if err != nil && ! errors . Is ( err , io . EOF ) {
2019-02-15 09:54:01 +00:00
return err
2018-09-07 21:26:04 +00:00
}
2019-01-18 12:48:16 +00:00
return nil
2023-05-15 19:31:49 +00:00
// we haven't read due to a notification in quite some time, try reading anyways
2018-09-07 21:26:04 +00:00
case <- readTicker . C :
2023-05-15 19:31:49 +00:00
level . Debug ( w . logger ) . Log ( "msg" , "Watcher is reading the WAL due to timeout, haven't received any write notifications recently" , "timeout" , readTimeout )
err := w . readAndHandleError ( reader , segmentNum , tail , size )
if err != nil {
return err
2019-01-18 20:31:36 +00:00
}
2023-05-15 19:31:49 +00:00
// still want to reset the ticker so we don't read too often
readTicker . Reset ( readTimeout )
2019-02-14 10:02:54 +00:00
2023-05-15 19:31:49 +00:00
case <- w . readNotify :
err := w . readAndHandleError ( reader , segmentNum , tail , size )
if err != nil {
2019-02-14 10:02:54 +00:00
return err
}
2023-05-15 19:31:49 +00:00
// still want to reset the ticker so we don't read too often
readTicker . Reset ( readTimeout )
2018-09-07 21:26:04 +00:00
}
}
}
2019-09-19 09:15:41 +00:00
func ( w * Watcher ) garbageCollectSeries ( segmentNum int ) error {
dir , _ , err := LastCheckpoint ( w . walDir )
2023-12-04 17:08:43 +00:00
if err != nil && ! errors . Is ( err , record . ErrNotFound ) {
return fmt . Errorf ( "tsdb.LastCheckpoint: %w" , err )
2019-02-13 19:14:15 +00:00
}
2019-02-19 06:46:52 +00:00
if dir == "" || dir == w . lastCheckpoint {
2019-02-13 19:14:15 +00:00
return nil
}
2019-02-19 06:46:52 +00:00
w . lastCheckpoint = dir
2019-02-13 19:14:15 +00:00
index , err := checkpointNum ( dir )
if err != nil {
2023-12-04 17:08:43 +00:00
return fmt . Errorf ( "error parsing checkpoint filename: %w" , err )
2019-02-13 19:14:15 +00:00
}
if index >= segmentNum {
2020-04-11 08:22:18 +00:00
level . Debug ( w . logger ) . Log ( "msg" , "Current segment is behind the checkpoint, skipping reading of checkpoint" , "current" , fmt . Sprintf ( "%08d" , segmentNum ) , "checkpoint" , dir )
2019-02-13 19:14:15 +00:00
return nil
}
2020-04-11 08:22:18 +00:00
level . Debug ( w . logger ) . Log ( "msg" , "New checkpoint detected" , "new" , dir , "currentSegment" , segmentNum )
2019-02-13 19:14:15 +00:00
2021-07-27 20:21:48 +00:00
if err = w . readCheckpoint ( dir , ( * Watcher ) . readSegmentForGC ) ; err != nil {
2023-12-04 17:08:43 +00:00
return fmt . Errorf ( "readCheckpoint: %w" , err )
2019-02-13 19:14:15 +00:00
}
// Clear series with a checkpoint or segment index # lower than the checkpoint we just read.
w . writer . SeriesReset ( index )
return nil
}
2021-07-27 20:21:48 +00:00
// Read from a segment and pass the details to w.writer.
// Also used with readCheckpoint - implements segmentReadFn.
2019-09-19 09:15:41 +00:00
func ( w * Watcher ) readSegment ( r * LiveReader , segmentNum int , tail bool ) error {
2019-02-20 04:03:41 +00:00
var (
2023-01-13 11:09:20 +00:00
dec record . Decoder
series [ ] record . RefSeries
samples [ ] record . RefSample
samplesToSend [ ] record . RefSample
exemplars [ ] record . RefExemplar
histograms [ ] record . RefHistogramSample
histogramsToSend [ ] record . RefHistogramSample
floatHistograms [ ] record . RefFloatHistogramSample
floatHistogramsToSend [ ] record . RefFloatHistogramSample
2019-02-20 04:03:41 +00:00
)
2019-01-18 12:48:16 +00:00
for r . Next ( ) && ! isClosed ( w . quit ) {
2019-02-20 04:03:41 +00:00
rec := r . Record ( )
2022-07-18 10:24:11 +00:00
w . recordsReadMetric . WithLabelValues ( dec . Type ( rec ) . String ( ) ) . Inc ( )
2019-02-20 04:03:41 +00:00
switch dec . Type ( rec ) {
2019-09-19 09:15:41 +00:00
case record . Series :
2019-02-20 04:03:41 +00:00
series , err := dec . Series ( rec , series [ : 0 ] )
if err != nil {
w . recordDecodeFailsMetric . Inc ( )
return err
}
w . writer . StoreSeries ( series , segmentNum )
2019-09-19 09:15:41 +00:00
case record . Samples :
2019-02-20 04:03:41 +00:00
// If we're not tailing a segment we can ignore any samples records we see.
// This speeds up replay of the WAL by > 10x.
if ! tail {
break
}
samples , err := dec . Samples ( rec , samples [ : 0 ] )
if err != nil {
w . recordDecodeFailsMetric . Inc ( )
return err
}
for _ , s := range samples {
2019-11-27 00:53:11 +00:00
if s . T > w . startTimestamp {
if ! w . sendSamples {
w . sendSamples = true
duration := time . Since ( w . startTime )
2020-04-11 08:22:18 +00:00
level . Info ( w . logger ) . Log ( "msg" , "Done replaying WAL" , "duration" , duration )
2019-11-19 05:11:04 +00:00
}
2022-07-14 13:13:12 +00:00
samplesToSend = append ( samplesToSend , s )
2019-02-20 04:03:41 +00:00
}
}
2022-07-14 13:13:12 +00:00
if len ( samplesToSend ) > 0 {
w . writer . Append ( samplesToSend )
samplesToSend = samplesToSend [ : 0 ]
2019-02-20 04:03:41 +00:00
}
2021-05-06 20:53:52 +00:00
case record . Exemplars :
// Skip if experimental "exemplars over remote write" is not enabled.
if ! w . sendExemplars {
break
}
// If we're not tailing a segment we can ignore any exemplars records we see.
// This speeds up replay of the WAL significantly.
if ! tail {
break
}
exemplars , err := dec . Exemplars ( rec , exemplars [ : 0 ] )
if err != nil {
w . recordDecodeFailsMetric . Inc ( )
return err
}
w . writer . AppendExemplars ( exemplars )
2022-08-29 12:08:36 +00:00
case record . HistogramSamples :
2022-07-14 13:13:12 +00:00
// Skip if experimental "histograms over remote write" is not enabled.
if ! w . sendHistograms {
break
}
if ! tail {
break
}
2022-08-29 12:08:36 +00:00
histograms , err := dec . HistogramSamples ( rec , histograms [ : 0 ] )
2022-07-14 13:13:12 +00:00
if err != nil {
w . recordDecodeFailsMetric . Inc ( )
return err
}
for _ , h := range histograms {
if h . T > w . startTimestamp {
if ! w . sendSamples {
w . sendSamples = true
duration := time . Since ( w . startTime )
level . Info ( w . logger ) . Log ( "msg" , "Done replaying WAL" , "duration" , duration )
}
histogramsToSend = append ( histogramsToSend , h )
}
}
if len ( histogramsToSend ) > 0 {
w . writer . AppendHistograms ( histogramsToSend )
histogramsToSend = histogramsToSend [ : 0 ]
}
2023-01-13 11:09:20 +00:00
case record . FloatHistogramSamples :
// Skip if experimental "histograms over remote write" is not enabled.
if ! w . sendHistograms {
break
}
if ! tail {
break
}
floatHistograms , err := dec . FloatHistogramSamples ( rec , floatHistograms [ : 0 ] )
if err != nil {
w . recordDecodeFailsMetric . Inc ( )
return err
}
for _ , fh := range floatHistograms {
if fh . T > w . startTimestamp {
if ! w . sendSamples {
w . sendSamples = true
duration := time . Since ( w . startTime )
level . Info ( w . logger ) . Log ( "msg" , "Done replaying WAL" , "duration" , duration )
}
floatHistogramsToSend = append ( floatHistogramsToSend , fh )
}
}
if len ( floatHistogramsToSend ) > 0 {
w . writer . AppendFloatHistograms ( floatHistogramsToSend )
floatHistogramsToSend = floatHistogramsToSend [ : 0 ]
}
2019-09-19 09:15:41 +00:00
case record . Tombstones :
2019-02-20 04:03:41 +00:00
default :
2020-10-05 09:09:59 +00:00
// Could be corruption, or reading from a WAL from a newer Prometheus.
2019-02-20 04:03:41 +00:00
w . recordDecodeFailsMetric . Inc ( )
2018-09-07 21:26:04 +00:00
}
}
2023-12-04 17:08:43 +00:00
if err := r . Err ( ) ; err != nil {
return fmt . Errorf ( "segment %d: %w" , segmentNum , err )
}
return nil
2019-01-18 12:48:16 +00:00
}
2018-09-07 21:26:04 +00:00
2021-07-27 20:21:48 +00:00
// Go through all series in a segment updating the segmentNum, so we can delete older series.
// Used with readCheckpoint - implements segmentReadFn.
func ( w * Watcher ) readSegmentForGC ( r * LiveReader , segmentNum int , _ bool ) error {
var (
dec record . Decoder
series [ ] record . RefSeries
)
for r . Next ( ) && ! isClosed ( w . quit ) {
rec := r . Record ( )
2022-07-18 10:24:11 +00:00
w . recordsReadMetric . WithLabelValues ( dec . Type ( rec ) . String ( ) ) . Inc ( )
2021-07-27 20:21:48 +00:00
switch dec . Type ( rec ) {
case record . Series :
series , err := dec . Series ( rec , series [ : 0 ] )
if err != nil {
w . recordDecodeFailsMetric . Inc ( )
return err
}
w . writer . UpdateSeriesSegment ( series , segmentNum )
// Ignore these; we're only interested in series.
case record . Samples :
case record . Exemplars :
case record . Tombstones :
default :
// Could be corruption, or reading from a WAL from a newer Prometheus.
w . recordDecodeFailsMetric . Inc ( )
}
}
2023-12-04 17:08:43 +00:00
if err := r . Err ( ) ; err != nil {
return fmt . Errorf ( "segment %d: %w" , segmentNum , err )
}
return nil
2021-07-27 20:21:48 +00:00
}
2019-11-27 00:53:11 +00:00
// SetStartTime records t as the watcher's start time, both as a time.Time and
// as a Prometheus timestamp. readSegment only forwards samples whose
// timestamp is greater than this value.
func (w *Watcher) SetStartTime(t time.Time) {
	w.startTimestamp = timestamp.FromTime(t)
	w.startTime = t
}
2021-07-27 20:21:48 +00:00
type segmentReadFn func ( w * Watcher , r * LiveReader , segmentNum int , tail bool ) error
2019-01-18 12:48:16 +00:00
// Read all the series records from a Checkpoint directory.
2021-07-27 20:21:48 +00:00
func ( w * Watcher ) readCheckpoint ( checkpointDir string , readFn segmentReadFn ) error {
2020-04-11 08:22:18 +00:00
level . Debug ( w . logger ) . Log ( "msg" , "Reading checkpoint" , "dir" , checkpointDir )
2019-02-13 17:06:03 +00:00
index , err := checkpointNum ( checkpointDir )
if err != nil {
2023-12-04 17:08:43 +00:00
return fmt . Errorf ( "checkpointNum: %w" , err )
2019-02-13 17:06:03 +00:00
}
2019-04-09 09:52:44 +00:00
// Ensure we read the whole contents of every segment in the checkpoint dir.
segs , err := w . segments ( checkpointDir )
2018-09-07 21:26:04 +00:00
if err != nil {
2023-12-04 17:08:43 +00:00
return fmt . Errorf ( "Unable to get segments checkpoint dir: %w" , err )
2018-09-07 21:26:04 +00:00
}
2019-04-09 09:52:44 +00:00
for _ , seg := range segs {
size , err := getSegmentSize ( checkpointDir , seg )
if err != nil {
2023-12-04 17:08:43 +00:00
return fmt . Errorf ( "getSegmentSize: %w" , err )
2019-04-09 09:52:44 +00:00
}
2018-09-07 21:26:04 +00:00
2019-09-19 09:15:41 +00:00
sr , err := OpenReadSegment ( SegmentName ( checkpointDir , seg ) )
2019-04-09 09:52:44 +00:00
if err != nil {
2023-12-04 17:08:43 +00:00
return fmt . Errorf ( "unable to open segment: %w" , err )
2019-04-09 09:52:44 +00:00
}
defer sr . Close ( )
2018-09-07 21:26:04 +00:00
2019-09-19 09:15:41 +00:00
r := NewLiveReader ( w . logger , w . readerMetrics , sr )
2023-12-04 17:08:43 +00:00
if err := readFn ( w , r , index , false ) ; err != nil && ! errors . Is ( err , io . EOF ) {
return fmt . Errorf ( "readSegment: %w" , err )
2019-04-09 09:52:44 +00:00
}
2019-02-13 14:47:35 +00:00
2019-04-09 09:52:44 +00:00
if r . Offset ( ) != size {
return fmt . Errorf ( "readCheckpoint wasn't able to read all data from the checkpoint %s/%08d, size: %d, totalRead: %d" , checkpointDir , seg , size , r . Offset ( ) )
}
2019-01-18 20:31:36 +00:00
}
2019-02-13 14:47:35 +00:00
2020-04-11 08:22:18 +00:00
level . Debug ( w . logger ) . Log ( "msg" , "Read series references from checkpoint" , "checkpoint" , checkpointDir )
2019-01-18 12:48:16 +00:00
return nil
2018-09-07 21:26:04 +00:00
}
2019-01-18 12:48:16 +00:00
// checkpointNum extracts the number from a checkpoint directory name.
//
// Checkpoint dir names are in the format checkpoint.000001. dir may contain a
// hidden directory, so only the base name is inspected. The base name must
// contain exactly one "." and the part after it must parse as an integer;
// otherwise 0 and an error are returned.
func checkpointNum(dir string) (int, error) {
	_, suffix, found := strings.Cut(filepath.Base(dir), ".")
	if !found || strings.Contains(suffix, ".") {
		return 0, fmt.Errorf("invalid checkpoint dir string: %s", dir)
	}

	num, err := strconv.Atoi(suffix)
	if err != nil {
		return 0, fmt.Errorf("invalid checkpoint dir string: %s", dir)
	}
	return num, nil
}
// Get size of segment.
func getSegmentSize ( dir string , index int ) ( int64 , error ) {
i := int64 ( - 1 )
2019-09-19 09:15:41 +00:00
fi , err := os . Stat ( SegmentName ( dir , index ) )
2018-09-07 21:26:04 +00:00
if err == nil {
i = fi . Size ( )
}
return i , err
}
// isClosed reports, without blocking, whether a receive on c succeeds right
// now. For a channel that is only ever closed and never sent on, that means
// it reports whether c has been closed.
func isClosed(c chan struct{}) bool {
	select {
	case <-c:
		return true
	default:
	}
	return false
}