diff --git a/storage/remote/wal_watcher.go b/storage/remote/wal_watcher.go
index 638ee2d32..4cdc37f5e 100644
--- a/storage/remote/wal_watcher.go
+++ b/storage/remote/wal_watcher.go
@@ -41,50 +41,14 @@ const (
 )
 
 var (
-	watcherSamplesRecordsRead = prometheus.NewCounterVec(
+	watcherRecordsRead = prometheus.NewCounterVec(
 		prometheus.CounterOpts{
 			Namespace: "prometheus",
 			Subsystem: "wal_watcher",
-			Name:      "samples_records_read_total",
-			Help:      "Number of samples records read by the WAL watcher from the WAL.",
+			Name:      "records_read_total",
+			Help:      "Number of records read by the WAL watcher from the WAL.",
 		},
-		[]string{queue},
-	)
-	watcherSeriesRecordsRead = prometheus.NewCounterVec(
-		prometheus.CounterOpts{
-			Namespace: "prometheus",
-			Subsystem: "wal_watcher",
-			Name:      "series_records_read_total",
-			Help:      "Number of series records read by the WAL watcher from the WAL.",
-		},
-		[]string{queue},
-	)
-	watcherTombstoneRecordsRead = prometheus.NewCounterVec(
-		prometheus.CounterOpts{
-			Namespace: "prometheus",
-			Subsystem: "wal_watcher",
-			Name:      "tombstone_records_read_total",
-			Help:      "Number of tombstone records read by the WAL watcher from the WAL.",
-		},
-		[]string{queue},
-	)
-	watcherInvalidRecordsRead = prometheus.NewCounterVec(
-		prometheus.CounterOpts{
-			Namespace: "prometheus",
-			Subsystem: "wal_watcher",
-			Name:      "invalid_records_read_total",
-			Help:      "Number of invalid records read by the WAL watcher from the WAL.",
-		},
-		[]string{queue},
-	)
-	watcherUnknownTypeRecordsRead = prometheus.NewCounterVec(
-		prometheus.CounterOpts{
-			Namespace: "prometheus",
-			Subsystem: "wal_watcher",
-			Name:      "unknown_records_read_total",
-			Help:      "Number of records read by the WAL watcher from the WAL of an unknown record type.",
-		},
-		[]string{queue},
+		[]string{queue, "type"},
 	)
 	watcherRecordDecodeFails = prometheus.NewCounterVec(
 		prometheus.CounterOpts{
@@ -116,11 +80,7 @@ var (
 )
 
 func init() {
-	prometheus.MustRegister(watcherSamplesRecordsRead)
-	prometheus.MustRegister(watcherSeriesRecordsRead)
-	prometheus.MustRegister(watcherTombstoneRecordsRead)
-	prometheus.MustRegister(watcherInvalidRecordsRead)
-	prometheus.MustRegister(watcherUnknownTypeRecordsRead)
+	prometheus.MustRegister(watcherRecordsRead)
 	prometheus.MustRegister(watcherRecordDecodeFails)
 	prometheus.MustRegister(watcherSamplesSentPreTailing)
 	prometheus.MustRegister(watcherCurrentSegment)
@@ -141,11 +101,7 @@ type WALWatcher struct {
 
 	startTime int64
 
-	samplesReadMetric       prometheus.Counter
-	seriesReadMetric        prometheus.Counter
-	tombstonesReadMetric    prometheus.Counter
-	invalidReadMetric       prometheus.Counter
-	unknownReadMetric       prometheus.Counter
+	recordsReadMetric       *prometheus.CounterVec
 	recordDecodeFailsMetric prometheus.Counter
 	samplesSentPreTailing   prometheus.Counter
 	currentSegmentMetric    prometheus.Gauge
@@ -159,7 +115,7 @@ func NewWALWatcher(logger log.Logger, name string, writer writeTo, walDir string
 	if logger == nil {
 		logger = log.NewNopLogger()
 	}
-	w := &WALWatcher{
+	return &WALWatcher{
 		logger:  logger,
 		writer:  writer,
 		walDir:  path.Join(walDir, "wal"),
@@ -167,18 +123,12 @@ func NewWALWatcher(logger log.Logger, name string, writer writeTo, walDir string
 		name:    name,
 		quit:    make(chan struct{}),
 		done:    make(chan struct{}),
-	}
-	w.samplesReadMetric = watcherSamplesRecordsRead.WithLabelValues(w.name)
-	w.seriesReadMetric = watcherSeriesRecordsRead.WithLabelValues(w.name)
-	w.tombstonesReadMetric = watcherTombstoneRecordsRead.WithLabelValues(w.name)
-	w.unknownReadMetric = watcherUnknownTypeRecordsRead.WithLabelValues(w.name)
-	w.invalidReadMetric = watcherInvalidRecordsRead.WithLabelValues(w.name)
-	w.recordDecodeFailsMetric = watcherRecordDecodeFails.WithLabelValues(w.name)
-	w.samplesSentPreTailing = watcherSamplesSentPreTailing.WithLabelValues(w.name)
-	w.currentSegmentMetric = watcherCurrentSegment.WithLabelValues(w.name)
-
-	return w
+		recordsReadMetric:       watcherRecordsRead.MustCurryWith(prometheus.Labels{queue: name}),
+		recordDecodeFailsMetric: watcherRecordDecodeFails.WithLabelValues(name),
+		samplesSentPreTailing:   watcherSamplesSentPreTailing.WithLabelValues(name),
+		currentSegmentMetric:    watcherCurrentSegment.WithLabelValues(name),
+	}
 }
 
 func (w *WALWatcher) Start() {
@@ -327,36 +277,13 @@ func (w *WALWatcher) watch(wl *wal.WAL, segmentNum int, tail bool) error {
 			return errors.New("quit channel")
 
 		case <-checkpointTicker.C:
-			// Periodically check if there is a new checkpoint.
-			// As this is considered an optimisation, we ignore errors during
-			// checkpoint processing.
-
-			dir, _, err := tsdb.LastCheckpoint(w.walDir)
-			if err != nil && err != tsdb.ErrNotFound {
-				level.Error(w.logger).Log("msg", "error getting last checkpoint", "err", err)
-				continue
-			}
-
-			index, err := checkpointNum(dir)
-			if err != nil {
-				level.Error(w.logger).Log("msg", "error parsing checkpoint", "err", err)
-				continue
+			// Periodically check if there is a new checkpoint so we can garbage
+			// collect labels. As this is considered an optimisation, we ignore
+			// errors during checkpoint processing.
+			if err := w.garbageCollectSeries(segmentNum); err != nil {
+				level.Warn(w.logger).Log("msg", "error processing checkpoint", "err", err)
 			}
 
-			if index >= segmentNum {
-				level.Info(w.logger).Log("msg", "current segment is behind the checkpoint, skipping reading of checkpoint", "current", fmt.Sprintf("%08d", segmentNum), "checkpoint", dir)
-				continue
-			}
-
-			level.Info(w.logger).Log("msg", "new checkpoint detected", "new", dir, "currentSegment", segmentNum)
-			// This potentially takes a long time, should we run it in another go routine?
-			err = w.readCheckpoint(dir)
-			if err != nil {
-				level.Error(w.logger).Log("err", err)
-			}
-			// Clear series with a checkpoint or segment index # lower than the checkpoint we just read.
-			w.writer.SeriesReset(index)
-
 		case <-segmentTicker.C:
 			_, last, err := wl.Segments()
 			if err != nil {
@@ -390,6 +317,38 @@ func (w *WALWatcher) watch(wl *wal.WAL, segmentNum int, tail bool) error {
 	}
 }
 
+func (w *WALWatcher) garbageCollectSeries(segmentNum int) error {
+	dir, _, err := tsdb.LastCheckpoint(w.walDir)
+	if err != nil && err != tsdb.ErrNotFound {
+		return errors.Wrap(err, "tsdb.LastCheckpoint")
+	}
+
+	if dir == "" {
+		return nil
+	}
+
+	index, err := checkpointNum(dir)
+	if err != nil {
+		return errors.Wrap(err, "error parsing checkpoint filename")
+	}
+
+	if index >= segmentNum {
+		level.Debug(w.logger).Log("msg", "current segment is behind the checkpoint, skipping reading of checkpoint", "current", fmt.Sprintf("%08d", segmentNum), "checkpoint", dir)
+		return nil
+	}
+
+	level.Debug(w.logger).Log("msg", "new checkpoint detected", "new", dir, "currentSegment", segmentNum)
+
+	// This potentially takes a long time; should we run it in another goroutine?
+	if err = w.readCheckpoint(dir); err != nil {
+		return errors.Wrap(err, "readCheckpoint")
+	}
+
+	// Clear series with a checkpoint or segment index # lower than the checkpoint we just read.
+	w.writer.SeriesReset(index)
+	return nil
+}
+
 func (w *WALWatcher) readSegment(r *wal.LiveReader, segmentNum int) error {
 	for r.Next() && !isClosed(w.quit) {
 		err := w.decodeRecord(r.Record(), segmentNum)
@@ -402,12 +361,30 @@ func (w *WALWatcher) readSegment(r *wal.LiveReader, segmentNum int) error {
 	return r.Err()
 }
 
+func recordType(rt tsdb.RecordType) string {
+	switch rt {
+	case tsdb.RecordInvalid:
+		return "invalid"
+	case tsdb.RecordSeries:
+		return "series"
+	case tsdb.RecordSamples:
+		return "samples"
+	case tsdb.RecordTombstones:
+		return "tombstones"
+	default:
+		return "unknown"
+	}
+}
+
 func (w *WALWatcher) decodeRecord(rec []byte, segmentNum int) error {
 	var (
 		dec     tsdb.RecordDecoder
 		series  []tsdb.RefSeries
 		samples []tsdb.RefSample
 	)
+
+	w.recordsReadMetric.WithLabelValues(recordType(dec.Type(rec))).Inc()
+
 	switch dec.Type(rec) {
 	case tsdb.RecordSeries:
 		series, err := dec.Series(rec, series[:0])
@@ -415,8 +392,8 @@ func (w *WALWatcher) decodeRecord(rec []byte, segmentNum int) error {
 			w.recordDecodeFailsMetric.Inc()
 			return err
 		}
-		w.seriesReadMetric.Add(float64(len(series)))
 		w.writer.StoreSeries(series, segmentNum)
+		return nil
 
 	case tsdb.RecordSamples:
 		samples, err := dec.Samples(rec, samples[:0])
@@ -431,25 +408,21 @@ func (w *WALWatcher) decodeRecord(rec []byte, segmentNum int) error {
 			}
 		}
 		if len(send) > 0 {
-			// We don't want to count samples read prior to the starting timestamp
-			// so that we can compare samples in vs samples read and succeeded samples.
-			w.samplesReadMetric.Add(float64(len(samples)))
 			// Blocks until the sample is sent to all remote write endpoints or closed (because enqueue blocks).
 			w.writer.Append(send)
 		}
+		return nil
 
 	case tsdb.RecordTombstones:
-		w.tombstonesReadMetric.Add(float64(len(samples)))
+		return nil
 
 	case tsdb.RecordInvalid:
-		w.invalidReadMetric.Add(float64(len(samples)))
 		return errors.New("invalid record")
 
 	default:
 		w.recordDecodeFailsMetric.Inc()
 		return errors.New("unknown TSDB record type")
 	}
-	return nil
 }
 
 // Read all the series records from a Checkpoint directory.
@@ -479,7 +452,7 @@ func (w *WALWatcher) readCheckpoint(checkpointDir string) error {
 	}
 
 	if r.TotalRead() != size {
-		level.Warn(w.logger).Log("msg", "may not have read all data from checkpoint")
+		level.Warn(w.logger).Log("msg", "may not have read all data from checkpoint", "totalRead", r.TotalRead(), "size", size)
 	}
 
 	level.Debug(w.logger).Log("msg", "read series references from checkpoint", "checkpoint", checkpointDir)
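
For reviewers, here is a minimal, self-contained sketch (not part of the patch) of the consolidation pattern above: the five per-type counters collapse into a single CounterVec carrying a "type" label, and MustCurryWith pre-binds the per-queue label once at construction time so call sites only supply the record type. Names such as demoRecordsRead and the "q1" queue value are illustrative only; the patch uses the package-level queue constant for the label name.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

var demoRecordsRead = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "prometheus",
		Subsystem: "wal_watcher",
		Name:      "records_read_total",
		Help:      "Number of records read by the WAL watcher from the WAL.",
	},
	[]string{"queue", "type"},
)

func main() {
	prometheus.MustRegister(demoRecordsRead)

	// Bind the queue label once, as NewWALWatcher now does; the returned
	// vec expects only the remaining "type" label.
	perQueue := demoRecordsRead.MustCurryWith(prometheus.Labels{"queue": "q1"})

	// At decode time only the record type is supplied.
	perQueue.WithLabelValues("samples").Inc()
	perQueue.WithLabelValues("samples").Inc()
	perQueue.WithLabelValues("series").Inc()

	fmt.Println(testutil.ToFloat64(perQueue.WithLabelValues("samples"))) // 2
}

One upside of this shape is that the series aggregate naturally: summing prometheus_wal_watcher_records_read_total over the type label gives a per-queue total, where the old layout required joining five separate metric names.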
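
A hypothetical table-driven test (not included in the patch) could pin down the record-type-to-label-value mapping that the consolidated metric depends on; exercising the default branch would require an out-of-range value, so the table covers the four named types.

package remote

import (
	"testing"

	"github.com/prometheus/tsdb"
)

func TestRecordTypeLabelValues(t *testing.T) {
	cases := map[tsdb.RecordType]string{
		tsdb.RecordInvalid:    "invalid",
		tsdb.RecordSeries:     "series",
		tsdb.RecordSamples:    "samples",
		tsdb.RecordTombstones: "tombstones",
	}
	for rt, want := range cases {
		if got := recordType(rt); got != want {
			t.Errorf("recordType(%d) = %q, want %q", rt, got, want)
		}
	}
}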