storage: Replace fpIter by sortedFPs

The fpIter was kind of cumbersome to use and required a lock for each
iteration (which wasn't even needed for the iteration at startup after
loading the checkpoint).

The new implementation here has an obvious penalty in memory, but it's
only 8 byte per series, so 80MiB for a beefy server with 10M memory
time series (which would probably need ~100GiB RAM, so the memory
penalty is only 0.1% of the total memory need).

The big advantage is that now series maintenance happens in order,
which leads to the time between two maintenances of the same series
being less random. Ideally, after each maintenance, the next
maintenance would tackle the series with the largest number of
non-persisted chunks. That would be quite an effort to find out or
track, but with the approach here, the next maintenance will tackle
the series whose previous maintenance is longest ago, which is a good
approximation.

While this commit won't change the _average_ number of chunks
persisted per maintenance, it will reduce the mean time a given chunk
has to wait for its persistence and thus reduce the steady-state
number of chunks waiting for persistence.

Also, the map iteration in Go is non-deterministic but not truly
random. In practice, the iteration appears to be somewhat "bucketed".
You can often observe a bunch of series with similar duration since
their last maintenance, i.e. you see batches of series with similar
number of chunks persisted per maintenance. If that batch is
relatively young, a whole lot of series are maintained with very few
chunks to persist. (See screenshot in PR for a better explanation.)
pull/2559/head
beorn7 2017-03-27 19:52:53 +02:00
parent 29f05680a2
commit d284ffab03
2 changed files with 33 additions and 39 deletions

View File

@ -106,26 +106,18 @@ func (sm *seriesMap) iter() <-chan fingerprintSeriesPair {
return ch
}
// fpIter returns a channel that produces all fingerprints in the seriesMap. The
// channel will be closed once all fingerprints have been received. Not
// consuming all fingerprints from the channel will leak a goroutine. The
// semantics of concurrent modification of seriesMap is the similar as the one
// for iterating over a map with a 'range' clause. However, if the next element
// in iteration order is removed after the current element has been received
// from the channel, it will still be produced by the channel.
func (sm *seriesMap) fpIter() <-chan model.Fingerprint {
ch := make(chan model.Fingerprint)
go func() {
sm.mtx.RLock()
for fp := range sm.m {
sm.mtx.RUnlock()
ch <- fp
sm.mtx.RLock()
}
sm.mtx.RUnlock()
close(ch)
}()
return ch
// sortedFPs returns a sorted slice of all the fingerprints in the seriesMap.
func (sm *seriesMap) sortedFPs() model.Fingerprints {
sm.mtx.RLock()
fps := make(model.Fingerprints, 0, len(sm.m))
for fp := range sm.m {
fps = append(fps, fp)
}
sm.mtx.RUnlock()
// Sorting could take some time, so do it outside of the lock.
sort.Sort(fps)
return fps
}
type memorySeries struct {

View File

@ -18,6 +18,7 @@ import (
"container/list"
"errors"
"fmt"
"math/rand"
"runtime"
"sort"
"sync"
@ -419,11 +420,9 @@ func (s *MemorySeriesStorage) Start() (err error) {
log.Info("Loading series map and head chunks...")
s.fpToSeries, s.numChunksToPersist, err = p.loadSeriesMapAndHeads()
for fp := range s.fpToSeries.fpIter() {
if series, ok := s.fpToSeries.get(fp); ok {
if !series.headChunkClosed {
s.headChunks.Inc()
}
for _, series := range s.fpToSeries.m {
if !series.headChunkClosed {
s.headChunks.Inc()
}
}
@ -1330,16 +1329,8 @@ func (s *MemorySeriesStorage) waitForNextFP(numberOfFPs int, maxWaitDurationFact
func (s *MemorySeriesStorage) cycleThroughMemoryFingerprints() chan model.Fingerprint {
memoryFingerprints := make(chan model.Fingerprint)
go func() {
var fpIter <-chan model.Fingerprint
defer func() {
if fpIter != nil {
for range fpIter {
// Consume the iterator.
}
}
close(memoryFingerprints)
}()
defer close(memoryFingerprints)
firstPass := true
for {
// Initial wait, also important if there are no FPs yet.
@ -1347,9 +1338,15 @@ func (s *MemorySeriesStorage) cycleThroughMemoryFingerprints() chan model.Finger
return
}
begin := time.Now()
fpIter = s.fpToSeries.fpIter()
fps := s.fpToSeries.sortedFPs()
if firstPass {
// Start first pass at a random location in the
// key space to cover the whole key space even
// in the case of frequent restarts.
fps = fps[rand.Intn(len(fps)):]
}
count := 0
for fp := range fpIter {
for _, fp := range fps {
select {
case memoryFingerprints <- fp:
case <-s.loopStopping:
@ -1364,11 +1361,16 @@ func (s *MemorySeriesStorage) cycleThroughMemoryFingerprints() chan model.Finger
count++
}
if count > 0 {
msg := "full"
if firstPass {
msg = "initial partial"
}
log.Infof(
"Completed maintenance sweep through %d in-memory fingerprints in %v.",
count, time.Since(begin),
"Completed %s maintenance sweep through %d in-memory fingerprints in %v.",
msg, count, time.Since(begin),
)
}
firstPass = false
}
}()