@ -41,50 +41,14 @@ const (
)
)
var (
var (
watcher Samples RecordsRead = prometheus . NewCounterVec (
watcher RecordsRead = prometheus . NewCounterVec (
prometheus . CounterOpts {
prometheus . CounterOpts {
Namespace : "prometheus" ,
Namespace : "prometheus" ,
Subsystem : "wal_watcher" ,
Subsystem : "wal_watcher" ,
Name : " samples_ records_read_total",
Name : " records_read_total",
Help : "Number of samples records read by the WAL watcher from the WAL.",
Help : "Number of records read by the WAL watcher from the WAL.",
} ,
} ,
[ ] string { queue } ,
[ ] string { queue , "type" } ,
)
watcherSeriesRecordsRead = prometheus . NewCounterVec (
prometheus . CounterOpts {
Namespace : "prometheus" ,
Subsystem : "wal_watcher" ,
Name : "series_records_read_total" ,
Help : "Number of series records read by the WAL watcher from the WAL." ,
} ,
[ ] string { queue } ,
)
watcherTombstoneRecordsRead = prometheus . NewCounterVec (
prometheus . CounterOpts {
Namespace : "prometheus" ,
Subsystem : "wal_watcher" ,
Name : "tombstone_records_read_total" ,
Help : "Number of tombstone records read by the WAL watcher from the WAL." ,
} ,
[ ] string { queue } ,
)
watcherInvalidRecordsRead = prometheus . NewCounterVec (
prometheus . CounterOpts {
Namespace : "prometheus" ,
Subsystem : "wal_watcher" ,
Name : "invalid_records_read_total" ,
Help : "Number of invalid records read by the WAL watcher from the WAL." ,
} ,
[ ] string { queue } ,
)
watcherUnknownTypeRecordsRead = prometheus . NewCounterVec (
prometheus . CounterOpts {
Namespace : "prometheus" ,
Subsystem : "wal_watcher" ,
Name : "unknown_records_read_total" ,
Help : "Number of records read by the WAL watcher from the WAL of an unknown record type." ,
} ,
[ ] string { queue } ,
)
)
watcherRecordDecodeFails = prometheus . NewCounterVec (
watcherRecordDecodeFails = prometheus . NewCounterVec (
prometheus . CounterOpts {
prometheus . CounterOpts {
@ -116,11 +80,7 @@ var (
)
)
func init ( ) {
func init ( ) {
prometheus . MustRegister ( watcherSamplesRecordsRead )
prometheus . MustRegister ( watcherRecordsRead )
prometheus . MustRegister ( watcherSeriesRecordsRead )
prometheus . MustRegister ( watcherTombstoneRecordsRead )
prometheus . MustRegister ( watcherInvalidRecordsRead )
prometheus . MustRegister ( watcherUnknownTypeRecordsRead )
prometheus . MustRegister ( watcherRecordDecodeFails )
prometheus . MustRegister ( watcherRecordDecodeFails )
prometheus . MustRegister ( watcherSamplesSentPreTailing )
prometheus . MustRegister ( watcherSamplesSentPreTailing )
prometheus . MustRegister ( watcherCurrentSegment )
prometheus . MustRegister ( watcherCurrentSegment )
@ -141,11 +101,7 @@ type WALWatcher struct {
startTime int64
startTime int64
samplesReadMetric prometheus . Counter
recordsReadMetric * prometheus . CounterVec
seriesReadMetric prometheus . Counter
tombstonesReadMetric prometheus . Counter
invalidReadMetric prometheus . Counter
unknownReadMetric prometheus . Counter
recordDecodeFailsMetric prometheus . Counter
recordDecodeFailsMetric prometheus . Counter
samplesSentPreTailing prometheus . Counter
samplesSentPreTailing prometheus . Counter
currentSegmentMetric prometheus . Gauge
currentSegmentMetric prometheus . Gauge
@ -159,7 +115,7 @@ func NewWALWatcher(logger log.Logger, name string, writer writeTo, walDir string
if logger == nil {
if logger == nil {
logger = log . NewNopLogger ( )
logger = log . NewNopLogger ( )
}
}
w := & WALWatcher {
return & WALWatcher {
logger : logger ,
logger : logger ,
writer : writer ,
writer : writer ,
walDir : path . Join ( walDir , "wal" ) ,
walDir : path . Join ( walDir , "wal" ) ,
@ -167,18 +123,12 @@ func NewWALWatcher(logger log.Logger, name string, writer writeTo, walDir string
name : name ,
name : name ,
quit : make ( chan struct { } ) ,
quit : make ( chan struct { } ) ,
done : make ( chan struct { } ) ,
done : make ( chan struct { } ) ,
}
w . samplesReadMetric = watcherSamplesRecordsRead . WithLabelValues ( w . name )
recordsReadMetric : watcherRecordsRead . MustCurryWith ( prometheus . Labels { queue : name } ) ,
w . seriesReadMetric = watcherSeriesRecordsRead . WithLabelValues ( w . name )
recordDecodeFailsMetric : watcherRecordDecodeFails . WithLabelValues ( name ) ,
w . tombstonesReadMetric = watcherTombstoneRecordsRead . WithLabelValues ( w . name )
samplesSentPreTailing : watcherSamplesSentPreTailing . WithLabelValues ( name ) ,
w . unknownReadMetric = watcherUnknownTypeRecordsRead . WithLabelValues ( w . name )
currentSegmentMetric : watcherCurrentSegment . WithLabelValues ( name ) ,
w . invalidReadMetric = watcherInvalidRecordsRead . WithLabelValues ( w . name )
}
w . recordDecodeFailsMetric = watcherRecordDecodeFails . WithLabelValues ( w . name )
w . samplesSentPreTailing = watcherSamplesSentPreTailing . WithLabelValues ( w . name )
w . currentSegmentMetric = watcherCurrentSegment . WithLabelValues ( w . name )
return w
}
}
func ( w * WALWatcher ) Start ( ) {
func ( w * WALWatcher ) Start ( ) {
@ -327,36 +277,13 @@ func (w *WALWatcher) watch(wl *wal.WAL, segmentNum int, tail bool) error {
return errors . New ( "quit channel" )
return errors . New ( "quit channel" )
case <- checkpointTicker . C :
case <- checkpointTicker . C :
// Periodically check if there is a new checkpoint.
// Periodically check if there is a new checkpoint so we can garbage
// As this is considered an optimisation, we ignore errors during
// collect labels. As this is considered an optimisation, we ignore
// checkpoint processing.
// errors during checkpoint processing.
if err := w . garbageCollectSeries ( segmentNum ) ; err != nil {
dir , _ , err := tsdb . LastCheckpoint ( w . walDir )
level . Warn ( w . logger ) . Log ( "msg" , "error process checkpoint" , "err" , err )
if err != nil && err != tsdb . ErrNotFound {
level . Error ( w . logger ) . Log ( "msg" , "error getting last checkpoint" , "err" , err )
continue
}
index , err := checkpointNum ( dir )
if err != nil {
level . Error ( w . logger ) . Log ( "msg" , "error parsing checkpoint" , "err" , err )
continue
}
}
if index >= segmentNum {
level . Info ( w . logger ) . Log ( "msg" , "current segment is behind the checkpoint, skipping reading of checkpoint" , "current" , fmt . Sprintf ( "%08d" , segmentNum ) , "checkpoint" , dir )
continue
}
level . Info ( w . logger ) . Log ( "msg" , "new checkpoint detected" , "new" , dir , "currentSegment" , segmentNum )
// This potentially takes a long time, should we run it in another go routine?
err = w . readCheckpoint ( dir )
if err != nil {
level . Error ( w . logger ) . Log ( "err" , err )
}
// Clear series with a checkpoint or segment index # lower than the checkpoint we just read.
w . writer . SeriesReset ( index )
case <- segmentTicker . C :
case <- segmentTicker . C :
_ , last , err := wl . Segments ( )
_ , last , err := wl . Segments ( )
if err != nil {
if err != nil {
@ -390,6 +317,38 @@ func (w *WALWatcher) watch(wl *wal.WAL, segmentNum int, tail bool) error {
}
}
}
}
func ( w * WALWatcher ) garbageCollectSeries ( segmentNum int ) error {
dir , _ , err := tsdb . LastCheckpoint ( w . walDir )
if err != nil && err != tsdb . ErrNotFound {
return errors . Wrap ( err , "tsdb.LastCheckpoint" )
}
if dir == "" {
return nil
}
index , err := checkpointNum ( dir )
if err != nil {
return errors . Wrap ( err , "error parsing checkpoint filename" )
}
if index >= segmentNum {
level . Debug ( w . logger ) . Log ( "msg" , "current segment is behind the checkpoint, skipping reading of checkpoint" , "current" , fmt . Sprintf ( "%08d" , segmentNum ) , "checkpoint" , dir )
return nil
}
level . Debug ( w . logger ) . Log ( "msg" , "new checkpoint detected" , "new" , dir , "currentSegment" , segmentNum )
// This potentially takes a long time, should we run it in another go routine?
if err = w . readCheckpoint ( dir ) ; err != nil {
return errors . Wrap ( err , "readCheckpoint" )
}
// Clear series with a checkpoint or segment index # lower than the checkpoint we just read.
w . writer . SeriesReset ( index )
return nil
}
func ( w * WALWatcher ) readSegment ( r * wal . LiveReader , segmentNum int ) error {
func ( w * WALWatcher ) readSegment ( r * wal . LiveReader , segmentNum int ) error {
for r . Next ( ) && ! isClosed ( w . quit ) {
for r . Next ( ) && ! isClosed ( w . quit ) {
err := w . decodeRecord ( r . Record ( ) , segmentNum )
err := w . decodeRecord ( r . Record ( ) , segmentNum )
@ -402,12 +361,30 @@ func (w *WALWatcher) readSegment(r *wal.LiveReader, segmentNum int) error {
return r . Err ( )
return r . Err ( )
}
}
func recordType ( rt tsdb . RecordType ) string {
switch rt {
case tsdb . RecordInvalid :
return "invalid"
case tsdb . RecordSeries :
return "series"
case tsdb . RecordSamples :
return "samples"
case tsdb . RecordTombstones :
return "tombstones"
default :
return "unkown"
}
}
func ( w * WALWatcher ) decodeRecord ( rec [ ] byte , segmentNum int ) error {
func ( w * WALWatcher ) decodeRecord ( rec [ ] byte , segmentNum int ) error {
var (
var (
dec tsdb . RecordDecoder
dec tsdb . RecordDecoder
series [ ] tsdb . RefSeries
series [ ] tsdb . RefSeries
samples [ ] tsdb . RefSample
samples [ ] tsdb . RefSample
)
)
w . recordsReadMetric . WithLabelValues ( recordType ( dec . Type ( rec ) ) ) . Inc ( )
switch dec . Type ( rec ) {
switch dec . Type ( rec ) {
case tsdb . RecordSeries :
case tsdb . RecordSeries :
series , err := dec . Series ( rec , series [ : 0 ] )
series , err := dec . Series ( rec , series [ : 0 ] )
@ -415,8 +392,8 @@ func (w *WALWatcher) decodeRecord(rec []byte, segmentNum int) error {
w . recordDecodeFailsMetric . Inc ( )
w . recordDecodeFailsMetric . Inc ( )
return err
return err
}
}
w . seriesReadMetric . Add ( float64 ( len ( series ) ) )
w . writer . StoreSeries ( series , segmentNum )
w . writer . StoreSeries ( series , segmentNum )
return nil
case tsdb . RecordSamples :
case tsdb . RecordSamples :
samples , err := dec . Samples ( rec , samples [ : 0 ] )
samples , err := dec . Samples ( rec , samples [ : 0 ] )
@ -431,25 +408,21 @@ func (w *WALWatcher) decodeRecord(rec []byte, segmentNum int) error {
}
}
}
}
if len ( send ) > 0 {
if len ( send ) > 0 {
// We don't want to count samples read prior to the starting timestamp
// so that we can compare samples in vs samples read and succeeded samples.
w . samplesReadMetric . Add ( float64 ( len ( samples ) ) )
// Blocks until the sample is sent to all remote write endpoints or closed (because enqueue blocks).
// Blocks until the sample is sent to all remote write endpoints or closed (because enqueue blocks).
w . writer . Append ( send )
w . writer . Append ( send )
}
}
return nil
case tsdb . RecordTombstones :
case tsdb . RecordTombstones :
w . tombstonesReadMetric . Add ( float64 ( len ( samples ) ) )
return nil
case tsdb . RecordInvalid :
case tsdb . RecordInvalid :
w . invalidReadMetric . Add ( float64 ( len ( samples ) ) )
return errors . New ( "invalid record" )
return errors . New ( "invalid record" )
default :
default :
w . recordDecodeFailsMetric . Inc ( )
w . recordDecodeFailsMetric . Inc ( )
return errors . New ( "unknown TSDB record type" )
return errors . New ( "unknown TSDB record type" )
}
}
return nil
}
}
// Read all the series records from a Checkpoint directory.
// Read all the series records from a Checkpoint directory.
@ -479,7 +452,7 @@ func (w *WALWatcher) readCheckpoint(checkpointDir string) error {
}
}
if r . TotalRead ( ) != size {
if r . TotalRead ( ) != size {
level . Warn ( w . logger ) . Log ( "msg" , "may not have read all data from checkpoint" )
level . Warn ( w . logger ) . Log ( "msg" , "may not have read all data from checkpoint" , "totalRead" , r . TotalRead ( ) , "size" , size )
}
}
level . Debug ( w . logger ) . Log ( "msg" , "read series references from checkpoint" , "checkpoint" , checkpointDir )
level . Debug ( w . logger ) . Log ( "msg" , "read series references from checkpoint" , "checkpoint" , checkpointDir )