@ -290,8 +290,8 @@ func (w *Watcher) Run() error {
for ! isClosed ( w . quit ) {
for ! isClosed ( w . quit ) {
w . currentSegmentMetric . Set ( float64 ( currentSegment ) )
w . currentSegmentMetric . Set ( float64 ( currentSegment ) )
// Re set the value of lastSegment each iteration, this is to avoid having to wait too long for
// Re -check on each iteration in case a new segment was added,
// be tween reads if we're reading a segment that is not the most recent segment after startup .
// be cause watch() will wait for notifications on the last segment .
_ , lastSegment , err := w . firstAndLast ( )
_ , lastSegment , err := w . firstAndLast ( )
if err != nil {
if err != nil {
return fmt . Errorf ( "wal.Segments: %w" , err )
return fmt . Errorf ( "wal.Segments: %w" , err )
@ -299,8 +299,6 @@ func (w *Watcher) Run() error {
tail := currentSegment >= lastSegment
tail := currentSegment >= lastSegment
level . Debug ( w . logger ) . Log ( "msg" , "Processing segment" , "currentSegment" , currentSegment , "lastSegment" , lastSegment )
level . Debug ( w . logger ) . Log ( "msg" , "Processing segment" , "currentSegment" , currentSegment , "lastSegment" , lastSegment )
// On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment.
// On subsequent calls to this function, currentSegment will have been incremented and we should open that segment.
if err := w . watch ( currentSegment , tail ) ; err != nil && ! errors . Is ( err , ErrIgnorable ) {
if err := w . watch ( currentSegment , tail ) ; err != nil && ! errors . Is ( err , ErrIgnorable ) {
return err
return err
}
}