From b8106dd4593eae6638bc19031d47f94ddd58a124 Mon Sep 17 00:00:00 2001
From: Callum Styan
Date: Tue, 19 Feb 2019 23:51:08 -0800
Subject: [PATCH] Review feedback:

- Add a dropped samples EWMA and use it in calculating desired shards.
- Update metric names and log messages.
- Limit number of entries in the dedupe logging middleware to prevent potential OOM.

Signed-off-by: Callum Styan
Signed-off-by: Tom Wilkie
---
 pkg/logging/dedupe.go                |  5 ++++-
 storage/remote/queue_manager.go      | 23 ++++++++++++++---------
 storage/remote/queue_manager_test.go | 20 +++++++++++++++++---
 storage/remote/storage.go            |  2 +-
 storage/remote/wal_watcher.go        | 18 +++++++++++-------
 5 files changed, 47 insertions(+), 21 deletions(-)

diff --git a/pkg/logging/dedupe.go b/pkg/logging/dedupe.go
index 2b0c95c63..f040b2f23 100644
--- a/pkg/logging/dedupe.go
+++ b/pkg/logging/dedupe.go
@@ -24,6 +24,7 @@ import (
 const (
 	garbageCollectEvery = 10 * time.Second
 	expireEntriesAfter  = 1 * time.Minute
+	maxEntries          = 1024
 )
 
 type logfmtEncoder struct {
@@ -102,7 +103,9 @@ func (d *Deduper) Log(keyvals ...interface{}) error {
 	}
 
 	d.mtx.Lock()
-	d.seen[line] = time.Now()
+	if len(d.seen) < maxEntries {
+		d.seen[line] = time.Now()
+	}
 	d.mtx.Unlock()
 
 	return d.next.Log(keyvals...)
diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go
index a171e9d0a..489731b44 100644
--- a/storage/remote/queue_manager.go
+++ b/storage/remote/queue_manager.go
@@ -110,7 +110,7 @@ var (
 		prometheus.GaugeOpts{
 			Namespace: namespace,
 			Subsystem: subsystem,
-			Name:      "queue_last_send_timestamp",
+			Name:      "queue_last_send_timestamp_seconds",
 			Help:      "Timestamp of the last successful send by this queue.",
 		},
 		[]string{queue},
@@ -119,7 +119,7 @@ var (
 		prometheus.GaugeOpts{
 			Namespace: namespace,
 			Subsystem: subsystem,
-			Name:      "queue_highest_sent_timestamp",
+			Name:      "queue_highest_sent_timestamp_seconds",
 			Help:      "Timestamp from a WAL sample, the highest timestamp successfully sent by this queue, in seconds since epoch.",
 		},
 		[]string{queue},
@@ -211,8 +211,8 @@ type QueueManager struct {
 	quit chan struct{}
 	wg   sync.WaitGroup
 
-	samplesIn, samplesOut, samplesOutDuration *ewmaRate
-	integralAccumulator                       float64
+	samplesIn, samplesDropped, samplesOut, samplesOutDuration *ewmaRate
+	integralAccumulator                                       float64
 }
 
 // NewQueueManager builds a new QueueManager.
@@ -242,6 +243,7 @@ func NewQueueManager(logger log.Logger, walDir string, samplesIn *ewmaRate, high
 		quit: make(chan struct{}),
 
 		samplesIn:          samplesIn,
+		samplesDropped:     newEWMARate(ewmaWeight, shardUpdateDuration),
 		samplesOut:         newEWMARate(ewmaWeight, shardUpdateDuration),
 		samplesOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration),
 	}
@@ -282,6 +283,7 @@ func (t *QueueManager) Append(s []tsdb.RefSample) bool {
 		// If we have no labels for the series, due to relabelling or otherwise, don't send the sample.
 		if _, ok := t.seriesLabels[sample.Ref]; !ok {
 			droppedSamplesTotal.WithLabelValues(t.queueName).Inc()
+			t.samplesDropped.incr(1)
 			if _, ok := t.droppedSeries[sample.Ref]; !ok {
 				level.Info(t.logger).Log("msg", "dropped sample for series that was not explicitly dropped via relabelling", "ref", sample.Ref)
 			}
@@ -425,6 +427,7 @@ func (t *QueueManager) updateShardsLoop() {
 func (t *QueueManager) calculateDesiredShards() {
 	t.samplesIn.tick()
 	t.samplesOut.tick()
+	t.samplesDropped.tick()
 	t.samplesOutDuration.tick()
 
 	// We use the number of incoming samples as a prediction of how much work we
@@ -434,7 +437,8 @@ func (t *QueueManager) calculateDesiredShards() {
 	var (
 		samplesIn          = t.samplesIn.rate()
 		samplesOut         = t.samplesOut.rate()
-		samplesPending     = samplesIn - samplesOut
+		samplesDropped     = t.samplesDropped.rate()
+		samplesPending     = samplesIn - samplesDropped - samplesOut
 		samplesOutDuration = t.samplesOutDuration.rate()
 	)
 
@@ -447,11 +451,12 @@ func (t *QueueManager) calculateDesiredShards() {
 
 	var (
 		timePerSample = samplesOutDuration / samplesOut
-		desiredShards = (timePerSample * (samplesIn + samplesPending + t.integralAccumulator)) / float64(time.Second)
+		desiredShards = (timePerSample * (samplesIn - samplesDropped + samplesPending + t.integralAccumulator)) / float64(time.Second)
 	)
-	level.Debug(t.logger).Log("msg", "QueueManager.calculateDesiredShards",
-		"samplesIn", samplesIn, "samplesOut", samplesOut,
-		"samplesPending", samplesPending, "desiredShards", desiredShards)
+	level.Debug(t.logger).Log("msg", "QueueManager.calculateDesiredShards",
+		"samplesIn", samplesIn, "samplesDropped", samplesDropped,
+		"samplesOut", samplesOut, "samplesPending", samplesPending,
+		"desiredShards", desiredShards)
 
 	// Changes in the number of shards must be greater than shardToleranceFraction.
 	var (
diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go
index 03b71a19e..1811c605c 100644
--- a/storage/remote/queue_manager_test.go
+++ b/storage/remote/queue_manager_test.go
@@ -20,6 +20,8 @@ import (
 	"math"
 	"os"
 	"reflect"
+	"sort"
+	"strconv"
 	"sync"
 	"sync/atomic"
 	"testing"
@@ -387,7 +389,18 @@ func BenchmarkStartup(b *testing.B) {
 		return
 	}
-	fmt.Println(dir)
+	// Find the second largest segment; we will replay up to this.
+	// (Second largest as WALWatcher will start tailing the largest).
+	dirents, err := ioutil.ReadDir(dir)
+	testutil.Ok(b, err)
+
+	var segments []int
+	for _, dirent := range dirents {
+		if i, err := strconv.Atoi(dirent.Name()); err == nil {
+			segments = append(segments, i)
+		}
+	}
+	sort.Ints(segments)
 
 	logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stdout))
 	logger = log.With(logger, "caller", log.DefaultCaller)
 
@@ -399,7 +412,8 @@ func BenchmarkStartup(b *testing.B) {
 			newEWMARate(ewmaWeight, shardUpdateDuration), &temp,
 			config.DefaultQueueConfig, nil, nil, c, 1*time.Minute)
 		m.watcher.startTime = math.MaxInt64
-		m.watcher.maxSegment = 6158 // n-1
-		m.watcher.run()
+		m.watcher.maxSegment = segments[len(segments)-2]
+		err := m.watcher.run()
+		testutil.Ok(b, err)
 	}
 }
diff --git a/storage/remote/storage.go b/storage/remote/storage.go
index 71b5c5465..96163bf29 100644
--- a/storage/remote/storage.go
+++ b/storage/remote/storage.go
@@ -70,7 +70,7 @@ func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCal
 			Help: "Samples in to remote storage, compare to samples out for queue managers.",
 		}),
 		highestTimestampMetric: prometheus.NewGauge(prometheus.GaugeOpts{
-			Name: "prometheus_remote_storage_highest_timestamp_in",
+			Name: "prometheus_remote_storage_highest_timestamp_in_seconds",
 			Help: "Highest timestamp that has come into the remote storage via the Appender interface, in seconds since epoch.",
 		}),
 	}
diff --git a/storage/remote/wal_watcher.go b/storage/remote/wal_watcher.go
index f5cf7fc69..185786c67 100644
--- a/storage/remote/wal_watcher.go
+++ b/storage/remote/wal_watcher.go
@@ -132,14 +132,18 @@ func NewWALWatcher(logger log.Logger, name string, writer writeTo, walDir string
 		recordDecodeFailsMetric: watcherRecordDecodeFails.WithLabelValues(name),
 		samplesSentPreTailing:   watcherSamplesSentPreTailing.WithLabelValues(name),
 		currentSegmentMetric:    watcherCurrentSegment.WithLabelValues(name),
+
+		maxSegment: -1,
 	}
 }
 
+// Start the WALWatcher.
 func (w *WALWatcher) Start() {
 	level.Info(w.logger).Log("msg", "starting WAL watcher", "queue", w.name)
 	go w.loop()
 }
 
+// Stop the WALWatcher.
 func (w *WALWatcher) Stop() {
 	close(w.quit)
 	<-w.done
@@ -170,13 +174,13 @@ func (w *WALWatcher) run() error {
 		return errors.Wrap(err, "wal.New")
 	}
 
-	_, last, err := nw.Segments()
+	_, lastSegment, err := nw.Segments()
 	if err != nil {
 		return errors.Wrap(err, "wal.Segments")
 	}
 
 	// Backfill from the checkpoint first if it exists.
-	lastCheckpoint, nextIndex, err := tsdb.LastCheckpoint(w.walDir)
+	lastCheckpoint, checkpointIndex, err := tsdb.LastCheckpoint(w.walDir)
 	if err != nil && err != tsdb.ErrNotFound {
 		return errors.Wrap(err, "tsdb.LastCheckpoint")
 	}
@@ -188,21 +192,22 @@ func (w *WALWatcher) run() error {
 	}
 	w.lastCheckpoint = lastCheckpoint
 
-	currentSegment, err := w.findSegmentForIndex(nextIndex)
+	currentSegment, err := w.findSegmentForIndex(checkpointIndex)
 	if err != nil {
 		return err
 	}
 
-	level.Info(w.logger).Log("msg", "tailing WAL", "lastCheckpoint", lastCheckpoint, "startFrom", nextIndex, "currentSegment", currentSegment, "last", last)
+	level.Debug(w.logger).Log("msg", "tailing WAL", "lastCheckpoint", lastCheckpoint, "checkpointIndex", checkpointIndex, "currentSegment", currentSegment, "lastSegment", lastSegment)
 	for !isClosed(w.quit) {
 		w.currentSegmentMetric.Set(float64(currentSegment))
 
 		// On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment.
 		// On subsequent calls to this function, currentSegment will have been incremented and we should open that segment.
-		if err := w.watch(nw, currentSegment, currentSegment >= last); err != nil {
+		if err := w.watch(nw, currentSegment, currentSegment >= lastSegment); err != nil {
 			return err
 		}
 
+		// For testing: stop when you hit a specific segment.
 		if currentSegment == w.maxSegment {
 			return nil
 		}
@@ -244,7 +249,7 @@ func (w *WALWatcher) findSegmentForIndex(index int) (int, error) {
 	return -1, errors.New("failed to find segment for index")
 }
 
-// Use tail true to indicate thatreader is currently on a segment that is
+// Use tail true to indicate that the reader is currently on a segment that is
 // actively being written to. If false, assume it's a full segment and we're
 // replaying it on start to cache the series records.
 func (w *WALWatcher) watch(wl *wal.WAL, segmentNum int, tail bool) error {
@@ -365,7 +370,6 @@ func (w *WALWatcher) garbageCollectSeries(segmentNum int) error {
 
 	level.Debug(w.logger).Log("msg", "new checkpoint detected", "new", dir, "currentSegment", segmentNum)
 
-	// This potentially takes a long time, should we run it in another go routine?
 	if err = w.readCheckpoint(dir); err != nil {
 		return errors.Wrap(err, "readCheckpoint")
 	}
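
For reviewers who want to sanity-check the shard arithmetic outside of the QueueManager, below is a minimal, self-contained sketch of the calculation after this change: samples dropped by relabelling are excluded from both the pending backlog and the predicted send rate, so shards are sized only for samples that will actually be sent. The function name desiredShards, the literal rates in main, and the omission of the ewmaRate ticking and the integral accumulator are simplifications for illustration, not code from this patch.

package main

import (
	"fmt"
	"time"
)

// desiredShards mirrors the arithmetic in calculateDesiredShards after this
// change. All rates are per second; samplesOutDurationRate is the wall-clock
// time (in nanoseconds, i.e. time.Duration units) spent sending per second.
func desiredShards(samplesInRate, samplesDroppedRate, samplesOutRate, samplesOutDurationRate float64) float64 {
	// Backlog that is actually waiting to be sent, excluding dropped samples.
	samplesPending := samplesInRate - samplesDroppedRate - samplesOutRate
	// Average send cost per sample, in nanoseconds.
	timePerSample := samplesOutDurationRate / samplesOutRate
	// Shards needed to keep up with the sendable rate plus the backlog.
	return (timePerSample * (samplesInRate - samplesDroppedRate + samplesPending)) / float64(time.Second)
}

func main() {
	// Example: 10k samples/s scraped, 2k/s dropped by relabelling, 7k/s sent,
	// and sending currently consumes 3.5s of wall time per second.
	fmt.Printf("%.2f shards\n", desiredShards(10000, 2000, 7000, 3.5*float64(time.Second)))
}

With these made-up numbers, 8000 samples/s remain after drops plus a 1000 samples/s backlog, at 0.5ms of send time each, which works out to 4.50 shards; without subtracting the dropped rate the same inputs would oversize the pool.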
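
The dedupe middleware change is easiest to see in isolation: the seen map is only allowed to grow to maxEntries, so a flood of unique log lines can no longer grow it (and memory) without bound. The sketch below uses an illustrative seenCache type rather than the real Deduper (no go-kit logger plumbing and no garbage-collection loop), on the assumption that expiry (expireEntriesAfter in the patch) still frees slots in the real middleware.

package main

import (
	"fmt"
	"sync"
	"time"
)

const maxEntries = 1024

// seenCache is a trimmed-down stand-in for the Deduper's seen map: new lines
// are only recorded while the map is below maxEntries.
type seenCache struct {
	mtx  sync.Mutex
	seen map[string]time.Time
}

func (c *seenCache) record(line string) {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	if len(c.seen) < maxEntries {
		c.seen[line] = time.Now()
	}
}

func main() {
	c := &seenCache{seen: map[string]time.Time{}}
	// 10,000 distinct lines still leave the map capped at maxEntries.
	for i := 0; i < 10000; i++ {
		c.record(fmt.Sprintf("unique line %d", i))
	}
	fmt.Println("entries:", len(c.seen)) // entries: 1024
}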
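
The benchmark's segment selection also reads more clearly as a standalone helper: WAL segment files have purely numeric names, so only directory entries whose names parse as integers are collected, and the second-largest number is the last segment that can be fully replayed before the watcher starts tailing the newest one. The helper name secondLargestSegment and the example file names are illustrative, not part of the patch.

package main

import (
	"fmt"
	"sort"
	"strconv"
)

// secondLargestSegment keeps only names that parse as segment numbers
// (checkpoint directories and other entries are skipped) and returns the
// second-largest one, or false if there are fewer than two segments.
func secondLargestSegment(names []string) (int, bool) {
	var segments []int
	for _, name := range names {
		if i, err := strconv.Atoi(name); err == nil {
			segments = append(segments, i)
		}
	}
	if len(segments) < 2 {
		return 0, false
	}
	sort.Ints(segments)
	return segments[len(segments)-2], true
}

func main() {
	names := []string{"checkpoint.000003", "00000004", "00000005", "00000006"}
	if seg, ok := secondLargestSegment(names); ok {
		fmt.Println("replay up to segment", seg) // replay up to segment 5
	}
}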