Review feedback:

- Add a dropped samples EWMA and use it in calculating desired shards (see the sketch below).
- Update metric names and log messages.
- Limit number of entries in the dedupe logging middleware to prevent potential OOM.

Signed-off-by: Callum Styan <callumstyan@gmail.com>
Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>
pull/5289/head
Callum Styan 2019-02-19 23:51:08 -08:00 committed by Tom Wilkie
parent 512f549064
commit b8106dd459
5 changed files with 47 additions and 21 deletions
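
For the dropped-samples bullet above, here is a minimal, self-contained Go sketch of the shard arithmetic after this change: dropped samples are subtracted both from the pending backlog and from the incoming rate, so series removed by relabelling no longer inflate the desired shard count. The plain float64 rates and the free function stand in for the package's internal ewmaRate values and the QueueManager method; the numbers are illustrative only.

package main

import (
	"fmt"
	"time"
)

// calculateDesiredShards mirrors the arithmetic in QueueManager.calculateDesiredShards
// after this change, with plain rates instead of ewmaRate values and the
// integral accumulator passed in directly (a simplification for the sketch).
func calculateDesiredShards(samplesIn, samplesDropped, samplesOut, samplesOutDuration, integralAccumulator float64) float64 {
	// Samples that came in but were neither dropped by relabelling nor sent yet.
	samplesPending := samplesIn - samplesDropped - samplesOut
	// Average wall-clock time spent per successfully sent sample, in nanoseconds.
	timePerSample := samplesOutDuration / samplesOut
	return (timePerSample * (samplesIn - samplesDropped + samplesPending + integralAccumulator)) / float64(time.Second)
}

func main() {
	// Illustrative rates: 10k samples/s in, 2k/s dropped by relabelling,
	// 7k/s sent, each sent sample costing roughly 1ms of send time.
	in, dropped, out := 10000.0, 2000.0, 7000.0
	outDuration := out * float64(time.Millisecond)

	fmt.Printf("desired shards: %.2f\n", calculateDesiredShards(in, dropped, out, outDuration, 0))
}

With those example rates this works out to 9 desired shards; without subtracting the dropped rate it would have asked for more shards than the actual send workload needs.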

View File

@@ -24,6 +24,7 @@ import (
const (
garbageCollectEvery = 10 * time.Second
expireEntriesAfter = 1 * time.Minute
maxEntries = 1024
)
type logfmtEncoder struct {
@@ -102,7 +103,9 @@ func (d *Deduper) Log(keyvals ...interface{}) error {
}
d.mtx.Lock()
d.seen[line] = time.Now()
if len(d.seen) < maxEntries {
d.seen[line] = time.Now()
}
d.mtx.Unlock()
return d.next.Log(keyvals...)
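
To make the maxEntries change above concrete, here is a simplified, standalone sketch of a capped dedupe map (the Logger plumbing and the periodic garbage collection of old entries are omitted; shouldLog is a hypothetical name, not the middleware's API): once maxEntries distinct lines are being tracked, new lines are still logged but not remembered, so the map cannot grow without bound.

package main

import (
	"fmt"
	"sync"
	"time"
)

const maxEntries = 1024

// deduper is a stripped-down stand-in for the logging middleware: it
// remembers recently seen log lines so duplicates can be suppressed.
type deduper struct {
	mtx  sync.Mutex
	seen map[string]time.Time
}

// shouldLog reports whether line should be emitted. It only records the
// line while the map is below its cap, which bounds memory use even if
// an unbounded number of distinct lines is logged.
func (d *deduper) shouldLog(line string) bool {
	d.mtx.Lock()
	defer d.mtx.Unlock()
	if _, ok := d.seen[line]; ok {
		return false // seen recently, suppress the duplicate
	}
	if len(d.seen) < maxEntries {
		d.seen[line] = time.Now()
	}
	return true
}

func main() {
	d := &deduper{seen: map[string]time.Time{}}
	fmt.Println(d.shouldLog("remote write failed")) // true
	fmt.Println(d.shouldLog("remote write failed")) // false, deduped
}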

View File

@@ -110,7 +110,7 @@ var (
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "queue_last_send_timestamp",
Name: "queue_last_send_timestamp_seconds",
Help: "Timestamp of the last successful send by this queue.",
},
[]string{queue},
@@ -119,7 +119,7 @@ var (
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "queue_highest_sent_timestamp",
Name: "queue_highest_sent_timestamp_seconds",
Help: "Timestamp from a WAL sample, the highest timestamp successfully sent by this queue, in seconds since epoch.",
},
[]string{queue},
@@ -211,8 +211,8 @@ type QueueManager struct {
quit chan struct{}
wg sync.WaitGroup
samplesIn, samplesOut, samplesOutDuration *ewmaRate
integralAccumulator float64
samplesIn, samplesDropped, samplesOut, samplesOutDuration *ewmaRate
integralAccumulator float64
}
// NewQueueManager builds a new QueueManager.
@@ -242,6 +242,7 @@ func NewQueueManager(logger log.Logger, walDir string, samplesIn *ewmaRate, high
quit: make(chan struct{}),
samplesIn: samplesIn,
samplesDropped: newEWMARate(ewmaWeight, shardUpdateDuration),
samplesOut: newEWMARate(ewmaWeight, shardUpdateDuration),
samplesOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration),
}
@@ -282,6 +283,7 @@ func (t *QueueManager) Append(s []tsdb.RefSample) bool {
// If we have no labels for the series, due to relabelling or otherwise, don't send the sample.
if _, ok := t.seriesLabels[sample.Ref]; !ok {
droppedSamplesTotal.WithLabelValues(t.queueName).Inc()
t.samplesDropped.incr(1)
if _, ok := t.droppedSeries[sample.Ref]; !ok {
level.Info(t.logger).Log("msg", "dropped sample for series that was not explicitly dropped via relabelling", "ref", sample.Ref)
}
@@ -425,6 +427,7 @@ func (t *QueueManager) updateShardsLoop() {
func (t *QueueManager) calculateDesiredShards() {
t.samplesIn.tick()
t.samplesOut.tick()
t.samplesDropped.tick()
t.samplesOutDuration.tick()
// We use the number of incoming samples as a prediction of how much work we
@@ -434,7 +437,8 @@
var (
samplesIn = t.samplesIn.rate()
samplesOut = t.samplesOut.rate()
samplesPending = samplesIn - samplesOut
samplesDropped = t.samplesDropped.rate()
samplesPending = samplesIn - samplesDropped - samplesOut
samplesOutDuration = t.samplesOutDuration.rate()
)
@@ -447,11 +451,12 @@
var (
timePerSample = samplesOutDuration / samplesOut
desiredShards = (timePerSample * (samplesIn + samplesPending + t.integralAccumulator)) / float64(time.Second)
desiredShards = (timePerSample * (samplesIn - samplesDropped + samplesPending + t.integralAccumulator)) / float64(time.Second)
)
level.Debug(t.logger).Log("msg", "QueueManager.calculateDesiredShards",
"samplesIn", samplesIn, "samplesOut", samplesOut,
"samplesPending", samplesPending, "desiredShards", desiredShards)
level.Debug(t.logger).Log("msg", "QueueManager.calculateDesiredShards",
"samplesIn", samplesIn, "samplesDropped", samplesDropped,
"samplesOut", samplesOut, "samplesPending", samplesPending,
"desiredShards", desiredShards)
// Changes in the number of shards must be greater than shardToleranceFraction.
var (

View File

@@ -20,6 +20,8 @@ import (
"math"
"os"
"reflect"
"sort"
"strconv"
"sync"
"sync/atomic"
"testing"
@@ -387,7 +389,18 @@ func BenchmarkStartup(b *testing.B) {
return
}
fmt.Println(dir)
// Find the second largest segment; we will replay up to this.
// (Second largest as WALWatcher will start tailing the largest).
dirents, err := ioutil.ReadDir(dir)
testutil.Ok(b, err)
var segments []int
for _, dirent := range dirents {
if i, err := strconv.Atoi(dirent.Name()); err == nil {
segments = append(segments, i)
}
}
sort.Ints(segments)
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stdout))
logger = log.With(logger, "caller", log.DefaultCaller)
@@ -399,7 +412,8 @@ func BenchmarkStartup(b *testing.B) {
newEWMARate(ewmaWeight, shardUpdateDuration),
&temp, config.DefaultQueueConfig, nil, nil, c, 1*time.Minute)
m.watcher.startTime = math.MaxInt64
m.watcher.maxSegment = 6158 // n-1
m.watcher.run()
m.watcher.maxSegment = segments[len(segments)-2]
err := m.watcher.run()
testutil.Ok(b, err)
}
}
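
As a side note on the benchmark change above, this is a self-contained sketch of the segment selection it performs: list the WAL directory, keep only entries whose names parse as integers (WAL segments are numerically named), sort them, and replay up to the second largest, since the WALWatcher starts tailing the largest. secondLargestSegment and the data/wal path are illustrative, not part of the repository.

package main

import (
	"fmt"
	"io/ioutil"
	"sort"
	"strconv"
)

// secondLargestSegment returns the second highest numbered WAL segment
// in walDir, the last segment a benchmark can fully replay.
func secondLargestSegment(walDir string) (int, error) {
	dirents, err := ioutil.ReadDir(walDir)
	if err != nil {
		return -1, err
	}
	var segments []int
	for _, dirent := range dirents {
		// Non-numeric entries (e.g. checkpoint directories) are skipped.
		if i, err := strconv.Atoi(dirent.Name()); err == nil {
			segments = append(segments, i)
		}
	}
	if len(segments) < 2 {
		return -1, fmt.Errorf("need at least two segments in %s, found %d", walDir, len(segments))
	}
	sort.Ints(segments)
	return segments[len(segments)-2], nil
}

func main() {
	seg, err := secondLargestSegment("data/wal") // hypothetical path
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("replay up to segment", seg)
}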

View File

@@ -70,7 +70,7 @@ func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCal
Help: "Samples in to remote storage, compare to samples out for queue managers.",
}),
highestTimestampMetric: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "prometheus_remote_storage_highest_timestamp_in",
Name: "prometheus_remote_storage_highest_timestamp_in_seconds",
Help: "Highest timestamp that has come into the remote storage via the Appender interface, in seconds since epoch.",
}),
}

View File

@@ -132,14 +132,18 @@ func NewWALWatcher(logger log.Logger, name string, writer writeTo, walDir string
recordDecodeFailsMetric: watcherRecordDecodeFails.WithLabelValues(name),
samplesSentPreTailing: watcherSamplesSentPreTailing.WithLabelValues(name),
currentSegmentMetric: watcherCurrentSegment.WithLabelValues(name),
maxSegment: -1,
}
}
// Start the WALWatcher.
func (w *WALWatcher) Start() {
level.Info(w.logger).Log("msg", "starting WAL watcher", "queue", w.name)
go w.loop()
}
// Stop the WALWatcher.
func (w *WALWatcher) Stop() {
close(w.quit)
<-w.done
@@ -170,13 +174,13 @@ func (w *WALWatcher) run() error {
return errors.Wrap(err, "wal.New")
}
_, last, err := nw.Segments()
_, lastSegment, err := nw.Segments()
if err != nil {
return errors.Wrap(err, "wal.Segments")
}
// Backfill from the checkpoint first if it exists.
lastCheckpoint, nextIndex, err := tsdb.LastCheckpoint(w.walDir)
lastCheckpoint, checkpointIndex, err := tsdb.LastCheckpoint(w.walDir)
if err != nil && err != tsdb.ErrNotFound {
return errors.Wrap(err, "tsdb.LastCheckpoint")
}
@@ -188,21 +192,22 @@ func (w *WALWatcher) run() error {
}
w.lastCheckpoint = lastCheckpoint
currentSegment, err := w.findSegmentForIndex(nextIndex)
currentSegment, err := w.findSegmentForIndex(checkpointIndex)
if err != nil {
return err
}
level.Info(w.logger).Log("msg", "tailing WAL", "lastCheckpoint", lastCheckpoint, "startFrom", nextIndex, "currentSegment", currentSegment, "last", last)
level.Debug(w.logger).Log("msg", "tailing WAL", "lastCheckpoint", lastCheckpoint, "checkpointIndex", checkpointIndex, "currentSegment", currentSegment, "lastSegment", lastSegment)
for !isClosed(w.quit) {
w.currentSegmentMetric.Set(float64(currentSegment))
// On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment.
// On subsequent calls to this function, currentSegment will have been incremented and we should open that segment.
if err := w.watch(nw, currentSegment, currentSegment >= last); err != nil {
if err := w.watch(nw, currentSegment, currentSegment >= lastSegment); err != nil {
return err
}
// For testing: stop when you hit a specific segment.
if currentSegment == w.maxSegment {
return nil
}
@@ -244,7 +249,7 @@ func (w *WALWatcher) findSegmentForIndex(index int) (int, error) {
return -1, errors.New("failed to find segment for index")
}
// Use tail true to indicate thatreader is currently on a segment that is
// Use tail true to indicate that the reader is currently on a segment that is
// actively being written to. If false, assume it's a full segment and we're
// replaying it on start to cache the series records.
func (w *WALWatcher) watch(wl *wal.WAL, segmentNum int, tail bool) error {
@@ -365,7 +370,6 @@ func (w *WALWatcher) garbageCollectSeries(segmentNum int) error {
level.Debug(w.logger).Log("msg", "new checkpoint detected", "new", dir, "currentSegment", segmentNum)
// This potentially takes a long time, should we run it in another go routine?
if err = w.readCheckpoint(dir); err != nil {
return errors.Wrap(err, "readCheckpoint")
}