@ -29,6 +29,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/tsdb"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/wal"
@ -111,18 +112,17 @@ type WALWatcher struct {
}
// NewWALWatcher creates a new WAL watcher for a given WriteTo.
func NewWALWatcher ( logger log . Logger , name string , writer writeTo , walDir string , startTime int64 ) * WALWatcher {
func NewWALWatcher ( logger log . Logger , name string , writer writeTo , walDir string ) * WALWatcher {
if logger == nil {
logger = log . NewNopLogger ( )
}
return & WALWatcher {
logger : logger ,
writer : writer ,
walDir : path . Join ( walDir , "wal" ) ,
startTime : startTime ,
name : name ,
quit : make ( chan struct { } ) ,
done : make ( chan struct { } ) ,
logger : logger ,
writer : writer ,
walDir : path . Join ( walDir , "wal" ) ,
name : name ,
quit : make ( chan struct { } ) ,
done : make ( chan struct { } ) ,
recordsReadMetric : watcherRecordsRead . MustCurryWith ( prometheus . Labels { queue : name } ) ,
recordDecodeFailsMetric : watcherRecordDecodeFails . WithLabelValues ( name ) ,
@ -148,6 +148,7 @@ func (w *WALWatcher) loop() {
// We may encounter failures processing the WAL; we should wait and retry.
for ! isClosed ( w . quit ) {
w . startTime = timestamp . FromTime ( time . Now ( ) )
if err := w . run ( ) ; err != nil {
level . Error ( w . logger ) . Log ( "msg" , "error tailing WAL" , "err" , err )
}
@ -177,10 +178,9 @@ func (w *WALWatcher) run() error {
return err
}
level . Info ( w . logger ) . Log ( "msg" , "reading checkpoint" , "dir" , lastCheckpoint , "startFrom" , nextIndex )
if err == nil {
if err = w . readCheckpoint ( lastCheckpoint ) ; err != nil {
return err
return errors . Wrap ( err , "readCheckpoint" )
}
}
@ -189,20 +189,21 @@ func (w *WALWatcher) run() error {
return err
}
level . Info ( w . logger ) . Log ( "msg" , "starting from" , "currentSegment" , currentSegment , "last" , last )
for {
level . Info ( w . logger ) . Log ( "msg" , "tailing WAL" , "lastCheckpoint" , lastCheckpoint , "startFrom" , nextIndex , "currentSegment" , currentSegment , "last" , last )
for ! isClosed ( w . quit ) {
w . currentSegmentMetric . Set ( float64 ( currentSegment ) )
level . Info ( w . logger ) . Log ( "msg" , "process segment" , "segment" , currentSegment )
// On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment.
// On subsequent calls to this function, currentSegment will have been incremented and we should open that segment.
if err := w . watch ( nw , currentSegment , currentSegment >= last ) ; err != nil {
level . Error ( w . logger ) . Log ( "msg" , "runWatcher is ending" , "err" , err )
return err
}
currentSegment ++
}
return nil
}
// findSegmentForIndex finds the first segment greater than or equal to index.
@ -246,7 +247,7 @@ func (w *WALWatcher) watch(wl *wal.WAL, segmentNum int, tail bool) error {
}
defer segment . Close ( )
reader := wal . NewLiveReader ( segment )
reader := wal . NewLiveReader ( w . logger , segment )
readTicker := time . NewTicker ( readPeriod )
defer readTicker . Stop ( )
@ -274,8 +275,7 @@ func (w *WALWatcher) watch(wl *wal.WAL, segmentNum int, tail bool) error {
for {
select {
case <- w . quit :
level . Info ( w . logger ) . Log ( "msg" , "quitting WAL watcher watch loop" )
return errors . New ( "quit channel" )
return nil
case <- checkpointTicker . C :
// Periodically check if there is a new checkpoint so we can garbage
@ -296,25 +296,40 @@ func (w *WALWatcher) watch(wl *wal.WAL, segmentNum int, tail bool) error {
continue
}
if err := w . readSegment ( reader , segmentNum ) ; err != nil {
// Ignore errors reading to end of segment, as we're going to move to
// next segment now.
level . Error ( w . logger ) . Log ( "msg" , "error reading to end of segment" , "err" , err )
err = w . readSegment ( reader , segmentNum )
// Ignore errors reading to end of segment whilst replaying the WAL.
if ! tail {
if err != nil && err != io . EOF {
level . Warn ( w . logger ) . Log ( "msg" , "ignoring error reading to end of segment, may have dropped data" , "err" , err )
} else if reader . Offset ( ) != size {
level . Warn ( w . logger ) . Log ( "msg" , "expected to have read whole segment, may have dropped data" , "segment" , segmentNum , "read" , reader . Offset ( ) , "size" , size )
}
return nil
}
// Otherwise, when we are tailing, non-EOFs are fatal.
if err != io . EOF {
return err
}
level . Info ( w . logger ) . Log ( "msg" , "a new segment exists, we should start reading it" , "current" , fmt . Sprintf ( "%08d" , segmentNum ) , "new" , fmt . Sprintf ( "%08d" , last ) )
return nil
case <- readTicker . C :
err := w . readSegment ( reader , segmentNum )
// If we're reading to completion, stop when we hit an EOF.
if err == io . EOF && ! tail {
level . Info ( w . logger ) . Log ( "msg" , "done replaying segment" , "segment" , segmentNum , "size" , size , "read" , reader . TotalRead ( ) )
err = w . readSegment ( reader , segmentNum )
// Ignore all errors reading to end of segment whilst replaying the WAL.
if ! tail {
if err != nil && err != io . EOF {
level . Warn ( w . logger ) . Log ( "msg" , "ignoring error reading to end of segment, may have dropped data" , "segment" , segmentNum , "err" , err )
} else if reader . Offset ( ) != size {
level . Warn ( w . logger ) . Log ( "msg" , "expected to have read whole segment, may have dropped data" , "segment" , segmentNum , "read" , reader . Offset ( ) , "size" , size )
}
return nil
}
if err != nil && err != io . EOF {
// Otherwise, when we are tailing, non-EOFs are fatal.
if err != io . EOF {
return err
}
}
@ -355,11 +370,8 @@ func (w *WALWatcher) garbageCollectSeries(segmentNum int) error {
func ( w * WALWatcher ) readSegment ( r * wal . LiveReader , segmentNum int ) error {
for r . Next ( ) && ! isClosed ( w . quit ) {
err := w . decodeRecord ( r . Record ( ) , segmentNum )
// Intentionally skip over record decode errors.
if err != nil {
level . Error ( w . logger ) . Log ( "err" , err )
if err := w . decodeRecord ( r . Record ( ) , segmentNum ) ; err != nil {
return err
}
}
return r . Err ( )
@ -450,13 +462,13 @@ func (w *WALWatcher) readCheckpoint(checkpointDir string) error {
}
// w.readSeriesRecords(wal.NewLiveReader(sr), i, size)
r := wal . NewLiveReader ( sr )
if err := w . readSegment ( r , index ) ; err != nil {
r := wal . NewLiveReader ( w . logger , sr )
if err := w . readSegment ( r , index ) ; err != io . EOF {
return errors . Wrap ( err , "readSegment" )
}
if r . TotalRead ( ) != size {
level . Warn ( w . logger ) . Log ( "msg" , "may not have read all data from checkpoint" , "totalRead" , r . TotalRead ( ) , "size" , size )
if r . Offset ( ) != size {
level . Warn ( w . logger ) . Log ( "msg" , "may not have read all data from checkpoint" , "totalRead" , r . Offset ( ) , "size" , size )
}
level . Debug ( w . logger ) . Log ( "msg" , "read series references from checkpoint" , "checkpoint" , checkpointDir )