Merge pull request #2527 from prometheus/beorn7/storage

storage: Evict chunks and calculate persistence pressure...
Björn Rabenstein 8 years ago committed by GitHub
commit e63d079b59

@ -54,6 +54,10 @@ var cfg = struct {
alertmanagerURLs stringset
prometheusURL string
// Deprecated storage flags, kept for backwards compatibility.
deprecatedMemoryChunks uint64
deprecatedMaxChunksToPersist uint64
}{
alertmanagerURLs: stringset{},
}
@ -145,17 +149,21 @@ func init() {
&cfg.storage.PersistenceStoragePath, "storage.local.path", "data",
"Base path for metrics storage.",
)
cfg.fs.IntVar(
&cfg.storage.MemoryChunks, "storage.local.memory-chunks", 1024*1024,
"How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily. Sample ingestion will be throttled if the configured value is exceeded by more than 10%.",
cfg.fs.Uint64Var(
&cfg.storage.TargetHeapSize, "storage.local.target-heap-size", 2*1024*1024*1024,
"The metrics storage attempts to limit its own memory usage such that the total heap size approaches this value. Note that this is not a hard limit. Actual heap size might be temporarily or permanently higher for a variety of reasons. The default value is a relatively safe setting to not use more than 3 GiB physical memory.",
)
cfg.fs.Uint64Var(
&cfg.deprecatedMemoryChunks, "storage.local.memory-chunks", 0,
"Deprecated. If set, -storage.local.target-heap-size will be set to this value times 3072.",
)
cfg.fs.DurationVar(
&cfg.storage.PersistenceRetentionPeriod, "storage.local.retention", 15*24*time.Hour,
"How long to retain samples in the local storage.",
)
cfg.fs.IntVar(
&cfg.storage.MaxChunksToPersist, "storage.local.max-chunks-to-persist", 512*1024,
"How many chunks can be waiting for persistence before sample ingestion will be throttled. Many chunks waiting to be persisted will increase the checkpoint size.",
cfg.fs.Uint64Var(
&cfg.deprecatedMaxChunksToPersist, "storage.local.max-chunks-to-persist", 0,
"Deprecated. This flag has no effect anymore.",
)
cfg.fs.DurationVar(
&cfg.storage.CheckpointInterval, "storage.local.checkpoint-interval", 5*time.Minute,
@ -276,6 +284,10 @@ func parse(args []string) error {
// don't expose it as a separate flag but set it here.
cfg.storage.HeadChunkTimeout = promql.StalenessDelta
if cfg.storage.TargetHeapSize < 1024*1024 {
return fmt.Errorf("target heap size smaller than %d: %d", 1024*1024, cfg.storage.TargetHeapSize)
}
if err := parsePrometheusURL(); err != nil {
return err
}
@ -292,6 +304,15 @@ func parse(args []string) error {
}
}
// Deal with deprecated storage flags.
if cfg.deprecatedMaxChunksToPersist > 0 {
log.Warn("Flag -storage.local.max-chunks-to-persist is deprecated. It has no effect.")
}
if cfg.deprecatedMemoryChunks > 0 {
cfg.storage.TargetHeapSize = cfg.deprecatedMemoryChunks * 3072
log.Warnf("Flag -storage.local.memory-chunks is deprecated. Its value %d is used to override -storage.local.target-heap-size to %d.", cfg.deprecatedMemoryChunks, cfg.storage.TargetHeapSize)
}
return nil
}
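As a rough illustration of the conversion above (the flag value below is an example, not part of this commit), the old default of one million memory chunks maps back to roughly its former footprint:

// Hypothetical invocation, shown only to illustrate the override:
//   prometheus -storage.local.memory-chunks=1048576
// sets -storage.local.target-heap-size to 1048576 * 3072 = 3221225472 bytes (3 GiB).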

@ -83,18 +83,8 @@ func init() {
prometheus.MustRegister(NumMemDescs)
}
var (
// NumMemChunks is the total number of chunks in memory. This is a
// global counter, also used internally, so not implemented as
// metrics. Collected in MemorySeriesStorage.Collect.
// TODO(beorn7): As it is used internally, it is actually very bad style
// to have it as a global variable.
NumMemChunks int64
// NumMemChunksDesc is the metric descriptor for the above.
NumMemChunksDesc = prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "memory_chunks"),
"The current number of chunks in memory, excluding cloned chunks (i.e. chunks without a descriptor).",
nil, nil,
)
)
// NumMemChunks is the total number of chunks in memory. This is a global
// counter, also used internally, so not implemented as metrics. Collected in
// MemorySeriesStorage.
// TODO(beorn7): Having this as an exported global variable is really bad.
var NumMemChunks int64

@ -176,7 +176,7 @@ func testPersistLoadDropChunks(t *testing.T, encoding chunk.Encoding) {
// Try to drop one chunk, which must be prevented by the shrink
// ratio. Since we do not pass in any chunks to persist, the offset
// should be the number of chunks in the file.
for fp, _ := range fpToChunks {
for fp := range fpToChunks {
firstTime, offset, numDropped, allDropped, err := p.dropAndPersistChunks(fp, 1, nil)
if err != nil {
t.Fatal(err)

@ -18,7 +18,7 @@ import (
"container/list"
"errors"
"fmt"
"math"
"runtime"
"sort"
"sync"
"sync/atomic"
@ -41,8 +41,11 @@ const (
fpMaxSweepTime = 6 * time.Hour
fpMaxWaitDuration = 10 * time.Second
// See waitForNextFP.
maxEvictInterval = time.Minute
// See handleEvictList. This should be clearly shorter than the usual GC
// interval. On the other hand, each evict check calls ReadMemStats,
// which involves stopping the world (at least up to Go1.8). Hence,
// don't just set this to a very short interval.
evictInterval = time.Second
// Constants to control the hysteresis of entering and leaving "rushed
// mode". In rushed mode, the dirty series count is ignored for
@ -76,24 +79,6 @@ const (
fpOtherMatchThreshold = 10000
)
var (
numChunksToPersistDesc = prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "chunks_to_persist"),
"The current number of chunks waiting for persistence.",
nil, nil,
)
maxChunksToPersistDesc = prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "max_chunks_to_persist"),
"The maximum number of chunks that can be waiting for persistence before sample ingestion will stop.",
nil, nil,
)
maxMemChunksDesc = prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "max_memory_chunks"),
"The configured maximum number of chunks that can be held in memory",
nil, nil,
)
)
type quarantineRequest struct {
fp model.Fingerprint
metric model.Metric
@ -147,12 +132,13 @@ type syncStrategy func() bool
// interfacing with a persistence layer to make time series data persistent
// across restarts and evictable from memory.
type MemorySeriesStorage struct {
// archiveHighWatermark and numChunksToPersist have to be aligned for atomic operations.
// archiveHighWatermark, chunksToPersist, persistUrgency have to be aligned for atomic operations.
archiveHighWatermark model.Time // No archived series has samples after this time.
numChunksToPersist int64 // The number of chunks waiting for persistence.
maxChunksToPersist int // If numChunksToPersist reaches this threshold, ingestion will be throttled.
persistUrgency int32 // Persistence urgency score * 1000, int32 allows atomic operations.
rushed bool // Whether the storage is in rushed mode.
rushedMtx sync.Mutex // Protects entering and exiting rushed mode.
rushedMtx sync.Mutex // Protects rushed.
lastNumGC uint32 // To detect if a GC cycle has run.
throttled chan struct{} // This chan is sent to whenever NeedsThrottling() returns true (for logging).
fpLocker *fingerprintLocker
@ -162,7 +148,7 @@ type MemorySeriesStorage struct {
loopStopping, loopStopped chan struct{}
logThrottlingStopped chan struct{}
maxMemoryChunks int
targetHeapSize uint64
dropAfter time.Duration
headChunkTimeout time.Duration
checkpointInterval time.Duration
@ -180,24 +166,26 @@ type MemorySeriesStorage struct {
persistErrors prometheus.Counter
queuedChunksToPersist prometheus.Counter
numSeries prometheus.Gauge
numHeadChunks prometheus.Gauge
chunksToPersist prometheus.GaugeFunc
memorySeries prometheus.Gauge
headChunks prometheus.Gauge
dirtySeries prometheus.Gauge
seriesOps *prometheus.CounterVec
ingestedSamplesCount prometheus.Counter
discardedSamplesCount *prometheus.CounterVec
nonExistentSeriesMatchesCount prometheus.Counter
ingestedSamples prometheus.Counter
discardedSamples *prometheus.CounterVec
nonExistentSeriesMatches prometheus.Counter
memChunks prometheus.GaugeFunc
maintainSeriesDuration *prometheus.SummaryVec
persistenceUrgencyScore prometheus.Gauge
rushedMode prometheus.Gauge
persistenceUrgencyScore prometheus.GaugeFunc
rushedMode prometheus.GaugeFunc
targetHeapSizeBytes prometheus.GaugeFunc
}
// MemorySeriesStorageOptions contains options needed by
// NewMemorySeriesStorage. It is not safe to leave any of those at their zero
// values.
type MemorySeriesStorageOptions struct {
MemoryChunks int // How many chunks to keep in memory.
MaxChunksToPersist int // Max number of chunks waiting to be persisted.
TargetHeapSize uint64 // Desired maximum heap size.
PersistenceStoragePath string // Location of persistence files.
PersistenceRetentionPeriod time.Duration // Chunks at least that old are dropped.
HeadChunkTimeout time.Duration // Head chunks idle for at least that long may be closed.
@ -222,15 +210,13 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) *MemorySeriesStorage
loopStopped: make(chan struct{}),
logThrottlingStopped: make(chan struct{}),
throttled: make(chan struct{}, 1),
maxMemoryChunks: o.MemoryChunks,
targetHeapSize: o.TargetHeapSize,
dropAfter: o.PersistenceRetentionPeriod,
headChunkTimeout: o.HeadChunkTimeout,
checkpointInterval: o.CheckpointInterval,
checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit,
archiveHighWatermark: model.Now().Add(-o.HeadChunkTimeout),
maxChunksToPersist: o.MaxChunksToPersist,
evictList: list.New(),
evictRequests: make(chan chunk.EvictRequest, evictRequestsCap),
evictStopping: make(chan struct{}),
@ -252,13 +238,13 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) *MemorySeriesStorage
Name: "queued_chunks_to_persist_total",
Help: "The total number of chunks queued for persistence.",
}),
numSeries: prometheus.NewGauge(prometheus.GaugeOpts{
memorySeries: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "memory_series",
Help: "The current number of series in memory.",
}),
numHeadChunks: prometheus.NewGauge(prometheus.GaugeOpts{
headChunks: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "open_head_chunks",
@ -279,13 +265,13 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) *MemorySeriesStorage
},
[]string{opTypeLabel},
),
ingestedSamplesCount: prometheus.NewCounter(prometheus.CounterOpts{
ingestedSamples: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "ingested_samples_total",
Help: "The total number of samples ingested.",
}),
discardedSamplesCount: prometheus.NewCounterVec(
discardedSamples: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
@ -294,12 +280,21 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) *MemorySeriesStorage
},
[]string{discardReasonLabel},
),
nonExistentSeriesMatchesCount: prometheus.NewCounter(prometheus.CounterOpts{
nonExistentSeriesMatches: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "non_existent_series_matches_total",
Help: "How often a non-existent series was referred to during label matching or chunk preloading. This is an indication of outdated label indexes.",
}),
memChunks: prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "memory_chunks",
Help: "The current number of chunks in memory. The number does not include cloned chunks (i.e. chunks without a descriptor).",
},
func() float64 { return float64(atomic.LoadInt64(&chunk.NumMemChunks)) },
),
maintainSeriesDuration: prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
@ -309,24 +304,63 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) *MemorySeriesStorage
},
[]string{seriesLocationLabel},
),
persistenceUrgencyScore: prometheus.NewGauge(prometheus.GaugeOpts{
}
s.chunksToPersist = prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "persistence_urgency_score",
Help: "A score of urgency to persist chunks, 0 is least urgent, 1 most.",
}),
rushedMode: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "chunks_to_persist",
Help: "The current number of chunks waiting for persistence.",
},
func() float64 {
return float64(s.getNumChunksToPersist())
},
)
s.rushedMode = prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "rushed_mode",
Help: "1 if the storage is in rushed mode, 0 otherwise. In rushed mode, the system behaves as if the persistence_urgency_score is 1.",
}),
Help: "1 if the storage is in rushed mode, 0 otherwise.",
},
func() float64 {
s.rushedMtx.Lock()
defer s.rushedMtx.Unlock()
if s.rushed {
return 1
}
return 0
},
)
s.persistenceUrgencyScore = prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "persistence_urgency_score",
Help: "A score of urgency to persist chunks, 0 is least urgent, 1 most.",
},
func() float64 {
score, _ := s.getPersistenceUrgencyScore()
return score
},
)
s.targetHeapSizeBytes = prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "target_heap_size_bytes",
Help: "The configured target heap size in bytes.",
},
func() float64 {
return float64(s.targetHeapSize)
},
)
// Initialize metric vectors.
// TODO(beorn7): Rework once we have a utility function for it in client_golang.
s.discardedSamplesCount.WithLabelValues(outOfOrderTimestamp)
s.discardedSamplesCount.WithLabelValues(duplicateSample)
s.discardedSamples.WithLabelValues(outOfOrderTimestamp)
s.discardedSamples.WithLabelValues(duplicateSample)
s.maintainSeriesDuration.WithLabelValues(maintainInMemory)
s.maintainSeriesDuration.WithLabelValues(maintainArchived)
s.seriesOps.WithLabelValues(create)
@ -353,7 +387,10 @@ func (s *MemorySeriesStorage) Start() (err error) {
case Always:
syncStrategy = func() bool { return true }
case Adaptive:
syncStrategy = func() bool { return s.calculatePersistenceUrgencyScore() < 1 }
syncStrategy = func() bool {
_, rushed := s.getPersistenceUrgencyScore()
return !rushed
}
default:
panic("unknown sync strategy")
}
@ -385,7 +422,7 @@ func (s *MemorySeriesStorage) Start() (err error) {
for fp := range s.fpToSeries.fpIter() {
if series, ok := s.fpToSeries.get(fp); ok {
if !series.headChunkClosed {
s.numHeadChunks.Inc()
s.headChunks.Inc()
}
}
}
@ -394,7 +431,7 @@ func (s *MemorySeriesStorage) Start() (err error) {
return err
}
log.Infof("%d series loaded.", s.fpToSeries.length())
s.numSeries.Set(float64(s.fpToSeries.length()))
s.memorySeries.Set(float64(s.fpToSeries.length()))
s.mapper, err = newFPMapper(s.fpToSeries, p)
if err != nil {
@ -805,7 +842,7 @@ func (s *MemorySeriesStorage) metricForRange(
// we have a chance the archived metric is not in the range.
has, first, last := s.persistence.hasArchivedMetric(fp)
if !has {
s.nonExistentSeriesMatchesCount.Inc()
s.nonExistentSeriesMatches.Inc()
return nil, nil, false
}
if first.After(through) || last.Before(from) {
@ -882,11 +919,11 @@ func (s *MemorySeriesStorage) Append(sample *model.Sample) error {
sample.Value.Equal(series.lastSampleValue) {
return nil
}
s.discardedSamplesCount.WithLabelValues(duplicateSample).Inc()
s.discardedSamples.WithLabelValues(duplicateSample).Inc()
return ErrDuplicateSampleForTimestamp // Caused by the caller.
}
if sample.Timestamp < series.lastTime {
s.discardedSamplesCount.WithLabelValues(outOfOrderTimestamp).Inc()
s.discardedSamples.WithLabelValues(outOfOrderTimestamp).Inc()
return ErrOutOfOrderSample // Caused by the caller.
}
completedChunksCount, err := series.add(model.SamplePair{
@ -897,7 +934,7 @@ func (s *MemorySeriesStorage) Append(sample *model.Sample) error {
s.quarantineSeries(fp, sample.Metric, err)
return err
}
s.ingestedSamplesCount.Inc()
s.ingestedSamples.Inc()
s.incNumChunksToPersist(completedChunksCount)
return nil
@ -905,8 +942,7 @@ func (s *MemorySeriesStorage) Append(sample *model.Sample) error {
// NeedsThrottling implements Storage.
func (s *MemorySeriesStorage) NeedsThrottling() bool {
if s.getNumChunksToPersist() > s.maxChunksToPersist ||
float64(atomic.LoadInt64(&chunk.NumMemChunks)) > float64(s.maxMemoryChunks)*toleranceFactorMemChunks {
if score, _ := s.getPersistenceUrgencyScore(); score >= 1 {
select {
case s.throttled <- struct{}{}:
default: // Do nothing, signal already pending.
@ -937,19 +973,19 @@ func (s *MemorySeriesStorage) logThrottling() {
select {
case <-s.throttled:
if !timer.Reset(time.Minute) {
score, _ := s.getPersistenceUrgencyScore()
log.
With("urgencyScore", score).
With("chunksToPersist", s.getNumChunksToPersist()).
With("maxChunksToPersist", s.maxChunksToPersist).
With("memoryChunks", atomic.LoadInt64(&chunk.NumMemChunks)).
With("maxToleratedMemChunks", int(float64(s.maxMemoryChunks)*toleranceFactorMemChunks)).
Error("Storage needs throttling. Scrapes and rule evaluations will be skipped.")
}
case <-timer.C:
score, _ := s.getPersistenceUrgencyScore()
log.
With("urgencyScore", score).
With("chunksToPersist", s.getNumChunksToPersist()).
With("maxChunksToPersist", s.maxChunksToPersist).
With("memoryChunks", atomic.LoadInt64(&chunk.NumMemChunks)).
With("maxToleratedMemChunks", int(float64(s.maxMemoryChunks)*toleranceFactorMemChunks)).
Info("Storage does not need throttling anymore.")
case <-s.loopStopping:
return
@ -994,9 +1030,9 @@ func (s *MemorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me
return nil, err
}
s.fpToSeries.put(fp, series)
s.numSeries.Inc()
s.memorySeries.Inc()
if !series.headChunkClosed {
s.numHeadChunks.Inc()
s.headChunks.Inc()
}
}
return series, nil
@ -1061,23 +1097,16 @@ func (s *MemorySeriesStorage) preloadChunksForInstant(
}
func (s *MemorySeriesStorage) handleEvictList() {
ticker := time.NewTicker(maxEvictInterval)
count := 0
// This ticker is supposed to tick at least once per GC cycle. Ideally,
// we would handle the evict list after each finished GC cycle, but I
// don't know of a way to "subscribe" to that kind of event.
ticker := time.NewTicker(evictInterval)
for {
// To batch up evictions a bit, this tries evictions at least
// once per evict interval, but earlier if the number of evict
// requests with evict==true that have happened since the last
// evict run is more than maxMemoryChunks/1000.
select {
case req := <-s.evictRequests:
if req.Evict {
req.Desc.EvictListElement = s.evictList.PushBack(req.Desc)
count++
if count > s.maxMemoryChunks/1000 {
s.maybeEvict()
count = 0
}
} else {
if req.Desc.EvictListElement != nil {
s.evictList.Remove(req.Desc.EvictListElement)
@ -1085,9 +1114,7 @@ func (s *MemorySeriesStorage) handleEvictList() {
}
}
case <-ticker.C:
if s.evictList.Len() > 0 {
s.maybeEvict()
}
case <-s.evictStopping:
// Drain evictRequests forever in a goroutine to not let
// requesters hang.
@ -1106,10 +1133,14 @@ func (s *MemorySeriesStorage) handleEvictList() {
// maybeEvict is a local helper method. Must only be called by handleEvictList.
func (s *MemorySeriesStorage) maybeEvict() {
numChunksToEvict := int(atomic.LoadInt64(&chunk.NumMemChunks)) - s.maxMemoryChunks
ms := runtime.MemStats{}
runtime.ReadMemStats(&ms)
numChunksToEvict := s.calculatePersistUrgency(&ms)
if numChunksToEvict <= 0 {
return
}
chunkDescsToEvict := make([]*chunk.Desc, numChunksToEvict)
for i := range chunkDescsToEvict {
e := s.evictList.Front()
@ -1143,6 +1174,118 @@ func (s *MemorySeriesStorage) maybeEvict() {
}()
}
// calculatePersistUrgency calculates and sets s.persistUrgency. Based on the
// calculation, it returns the number of chunks to evict. The runtime.MemStats
// are passed in here for testability.
//
// The persist urgency is calculated by the following formula:
//
//                      n(toPersist)           MAX( h(nextGC), h(current) )
//   p = MIN( 1, --------------------------- * ---------------------------- )
//               n(toPersist) + n(evictable)             h(target)
//
// where:
//
//   n(toPersist): Number of chunks waiting for persistence.
//   n(evictable): Number of evictable chunks.
//   h(nextGC):    Heap size at which the next GC will kick in (ms.NextGC).
//   h(current):   Current heap size (ms.HeapAlloc).
//   h(target):    Configured target heap size.
//
// Note that the actual value stored in s.persistUrgency is 1000 times the value
// calculated as above to allow using an int32, which supports atomic
// operations.
//
// If no GC has run after the last call of this method, it will always return 0
// (no reason to try to evict any more chunks before we have seen the effect of
// the previous eviction). It will also not decrease the persist urgency in this
// case (but it will increase the persist urgency if a higher value was calculated).
//
// If a GC has run after the last call of this method, the following cases apply:
//
// - If MAX( h(nextGC), h(current) ) < h(target), simply return 0. Nothing to
// evict if the heap is still small enough.
//
// - Otherwise, if n(evictable) is 0, also return 0, but set the urgency score
// to 1 to signal that we want to evict chunks but have no evictable chunks
// available.
//
// - Otherwise, calculate the number of chunks to evict and return it:
//
//                                    MAX( h(nextGC), h(current) ) - h(target)
//   n(toEvict) = MIN( n(evictable), ---------------------------------------- )
//                                                        c
//
// where c is the size of a chunk.
//
// - In the latter case, the persist urgency might be increased. The final value
// is the following:
//
//             n(toEvict)
//   MAX( p, ------------ )
//            n(evictable)
//
// Broadly speaking, the persist urgency is based on the ratio of the number of
// chunks we want to evict and the number of chunks that are actually
// evictable. However, in particular for the case where we don't need to evict
// chunks yet, it also takes into account how close the heap has already grown
// to the configured target size, and how big the pool of chunks to persist is
// compared to the number of chunks already evictable.
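//
// As an illustrative example (the numbers are invented for this comment, not
// taken from the code): with n(toPersist)=200, n(evictable)=800,
// h(nextGC)=1.5 GiB, h(current)=1.4 GiB, and h(target)=2 GiB, the formula
// yields
//
//   MIN( 1, 200/1000 * 1.5/2 ) = 0.15
//
// so 150 would be stored in s.persistUrgency.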
//
// This is a helper method only to be called by MemorySeriesStorage.maybeEvict.
func (s *MemorySeriesStorage) calculatePersistUrgency(ms *runtime.MemStats) int {
var (
oldUrgency = atomic.LoadInt32(&s.persistUrgency)
newUrgency int32
numChunksToPersist = s.getNumChunksToPersist()
)
defer func() {
if newUrgency > 1000 {
newUrgency = 1000
}
atomic.StoreInt32(&s.persistUrgency, newUrgency)
}()
// Take the NextGC as the relevant heap size because the heap will grow
// to that size before GC kicks in. However, at times the current heap
// is already larger than NextGC, in which case we take that worse case.
heapSize := ms.NextGC
if ms.HeapAlloc > ms.NextGC {
heapSize = ms.HeapAlloc
}
if numChunksToPersist > 0 {
newUrgency = int32(1000 * uint64(numChunksToPersist) / uint64(numChunksToPersist+s.evictList.Len()) * heapSize / s.targetHeapSize)
}
// Only continue if a GC has happened since we were here last time.
if ms.NumGC == s.lastNumGC {
if oldUrgency > newUrgency {
// Never reduce urgency without a GC run.
newUrgency = oldUrgency
}
return 0
}
s.lastNumGC = ms.NumGC
if heapSize <= s.targetHeapSize {
return 0 // Heap still small enough, don't evict.
}
if s.evictList.Len() == 0 {
// We want to reduce heap size but there is nothing to evict.
newUrgency = 1000
return 0
}
numChunksToEvict := int((heapSize - s.targetHeapSize) / chunk.ChunkLen)
if numChunksToEvict > s.evictList.Len() {
numChunksToEvict = s.evictList.Len()
}
if u := int32(numChunksToEvict * 1000 / s.evictList.Len()); u > newUrgency {
newUrgency = u
}
return numChunksToEvict
}
// waitForNextFP waits an estimated duration, after which we want to process
// another fingerprint so that we will process all fingerprints in a tenth of
// s.dropAfter assuming that the system is doing nothing else, e.g. if we want
@ -1213,7 +1356,11 @@ func (s *MemorySeriesStorage) cycleThroughMemoryFingerprints() chan model.Finger
return
}
// Reduce the wait time according to the urgency score.
s.waitForNextFP(s.fpToSeries.length(), 1-s.calculatePersistenceUrgencyScore())
score, rushed := s.getPersistenceUrgencyScore()
if rushed {
score = 1
}
s.waitForNextFP(s.fpToSeries.length(), 1-score)
count++
}
if count > 0 {
@ -1329,8 +1476,8 @@ loop:
// while in a situation like that, where we are clearly lacking speed of disk
// maintenance, the best we can do for crash recovery is to persist chunks as
// quickly as possible. So only checkpoint if the urgency score is < 1.
if dirty >= int64(s.checkpointDirtySeriesLimit) &&
s.calculatePersistenceUrgencyScore() < 1 {
if _, rushed := s.getPersistenceUrgencyScore(); !rushed &&
dirty >= int64(s.checkpointDirtySeriesLimit) {
checkpointTimer.Reset(0)
}
}
@ -1404,7 +1551,7 @@ func (s *MemorySeriesStorage) maintainMemorySeries(
}
if closed {
s.incNumChunksToPersist(1)
s.numHeadChunks.Dec()
s.headChunks.Dec()
}
seriesWasDirty := series.dirty
@ -1426,7 +1573,7 @@ func (s *MemorySeriesStorage) maintainMemorySeries(
// an age of at least headChunkTimeout (which is very likely anyway).
if iOldestNotEvicted == -1 && model.Now().Sub(series.lastTime) > s.headChunkTimeout {
s.fpToSeries.del(fp)
s.numSeries.Dec()
s.memorySeries.Dec()
s.persistence.archiveMetric(fp, series.metric, series.firstTime(), series.lastTime)
s.seriesOps.WithLabelValues(archive).Inc()
oldWatermark := atomic.LoadInt64((*int64)(&s.archiveHighWatermark))
@ -1523,7 +1670,7 @@ func (s *MemorySeriesStorage) writeMemorySeries(
if len(series.chunkDescs) == 0 && allDroppedFromPersistence {
// All chunks dropped from both memory and persistence. Delete the series for good.
s.fpToSeries.del(fp)
s.numSeries.Dec()
s.memorySeries.Dec()
s.seriesOps.WithLabelValues(memoryPurge).Inc()
s.persistence.unindexMetric(fp, series.metric)
return true
@ -1586,12 +1733,12 @@ func (s *MemorySeriesStorage) loadChunkDescs(fp model.Fingerprint, offsetFromEnd
return s.persistence.loadChunkDescs(fp, offsetFromEnd)
}
// getNumChunksToPersist returns numChunksToPersist in a goroutine-safe way.
// getNumChunksToPersist returns chunksToPersist in a goroutine-safe way.
func (s *MemorySeriesStorage) getNumChunksToPersist() int {
return int(atomic.LoadInt64(&s.numChunksToPersist))
}
// incNumChunksToPersist increments numChunksToPersist in a goroutine-safe way. Use a
// incNumChunksToPersist increments chunksToPersist in a goroutine-safe way. Use a
// negative 'by' to decrement.
func (s *MemorySeriesStorage) incNumChunksToPersist(by int) {
atomic.AddInt64(&s.numChunksToPersist, int64(by))
@ -1600,89 +1747,57 @@ func (s *MemorySeriesStorage) incNumChunksToPersist(by int) {
}
}
// calculatePersistenceUrgencyScore calculates and returns an urgency score for
// the speed of persisting chunks. The score is between 0 and 1, where 0 means
// no urgency at all and 1 means highest urgency.
//
// The score is the maximum of the two following sub-scores:
//
// (1) The first sub-score is the number of chunks waiting for persistence
// divided by the maximum number of chunks allowed to be waiting for
// persistence.
// getPersistenceUrgencyScore returns an urgency score for the speed of
// persisting chunks. The score is between 0 and 1, where 0 means no urgency at
// all and 1 means highest urgency. It also returns if the storage is in
// "rushed mode".
//
// (2) If there are more chunks in memory than allowed AND there are more chunks
// waiting for persistence than factorMinChunksToPersist times
// -storage.local.max-chunks-to-persist, then the second sub-score is the
// fraction the number of memory chunks has reached between
// -storage.local.memory-chunks and toleranceFactorForMemChunks times
// -storage.local.memory-chunks.
// The storage enters "rushed mode" if the score exceeds
// persintenceUrgencyScoreForEnteringRushedMode at the time this method is
// called. It will leave "rushed mode" if, at a later time this method is
// called, the score is below persintenceUrgencyScoreForLeavingRushedMode.
// "Rushed mode" plays a role for the adaptive series-sync-strategy. It also
// switches off early checkpointing (due to dirty series), and it makes series
// maintenance happen as quickly as possible.
//
// Should the score ever hit persintenceUrgencyScoreForEnteringRushedMode, the
// storage locks into "rushed mode", in which the returned score is always
// bumped up to 1 until the non-bumped score is below
// persintenceUrgencyScoreForLeavingRushedMode.
// A score of 1 will trigger throttling of sample ingestion.
//
// This method is not goroutine-safe, but it is only ever called by the single
// goroutine that is in charge of series maintenance. According to the returned
// score, series maintenance should be sped up. If a score of 1 is returned,
// checkpointing based on dirty-series count should be disabled, and series
// files should not by synced anymore provided the user has specified the
// adaptive sync strategy.
func (s *MemorySeriesStorage) calculatePersistenceUrgencyScore() float64 {
// It is safe to call this method concurrently.
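//
// To illustrate the hysteresis (the threshold values here are made up; the
// real ones are persintenceUrgencyScoreForEnteringRushedMode and
// persintenceUrgencyScoreForLeavingRushedMode, defined elsewhere in this
// file): with an entering threshold of 0.8 and a leaving threshold of 0.7, a
// call observing a score of 0.85 switches the storage into rushed mode, and
// it stays rushed until a later call observes a score at or below 0.7.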
func (s *MemorySeriesStorage) getPersistenceUrgencyScore() (float64, bool) {
s.rushedMtx.Lock()
defer s.rushedMtx.Unlock()
var (
chunksToPersist = float64(s.getNumChunksToPersist())
maxChunksToPersist = float64(s.maxChunksToPersist)
memChunks = float64(atomic.LoadInt64(&chunk.NumMemChunks))
maxMemChunks = float64(s.maxMemoryChunks)
)
score := chunksToPersist / maxChunksToPersist
if chunksToPersist > maxChunksToPersist*factorMinChunksToPersist {
score = math.Max(
score,
(memChunks/maxMemChunks-1)/(toleranceFactorMemChunks-1),
)
}
score := float64(atomic.LoadInt32(&s.persistUrgency)) / 1000
if score > 1 {
score = 1
}
s.persistenceUrgencyScore.Set(score)
if s.rushed {
// We are already in rushed mode. If the score is still above
// persintenceUrgencyScoreForLeavingRushedMode, return 1 and
// leave things as they are.
// persintenceUrgencyScoreForLeavingRushedMode, return the score
// and leave things as they are.
if score > persintenceUrgencyScoreForLeavingRushedMode {
return 1
return score, true
}
// We are out of rushed mode!
s.rushed = false
s.rushedMode.Set(0)
log.
With("urgencyScore", score).
With("chunksToPersist", int(chunksToPersist)).
With("maxChunksToPersist", int(maxChunksToPersist)).
With("memoryChunks", int(memChunks)).
With("maxMemoryChunks", int(maxMemChunks)).
With("chunksToPersist", s.getNumChunksToPersist()).
With("memoryChunks", atomic.LoadInt64(&chunk.NumMemChunks)).
Info("Storage has left rushed mode.")
return score
return score, false
}
if score > persintenceUrgencyScoreForEnteringRushedMode {
// Enter rushed mode.
s.rushed = true
s.rushedMode.Set(1)
log.
With("urgencyScore", score).
With("chunksToPersist", int(chunksToPersist)).
With("maxChunksToPersist", int(maxChunksToPersist)).
With("memoryChunks", int(memChunks)).
With("maxMemoryChunks", int(maxMemChunks)).
With("chunksToPersist", s.getNumChunksToPersist()).
With("memoryChunks", atomic.LoadInt64(&chunk.NumMemChunks)).
Warn("Storage has entered rushed mode.")
return 1
}
return score
return score, s.rushed
}
// quarantineSeries registers the provided fingerprint for quarantining. It
@ -1742,10 +1857,10 @@ func (s *MemorySeriesStorage) purgeSeries(fp model.Fingerprint, m model.Metric,
if series, ok = s.fpToSeries.get(fp); ok {
s.fpToSeries.del(fp)
s.numSeries.Dec()
s.memorySeries.Dec()
m = series.metric
// Adjust s.numChunksToPersist and chunk.NumMemChunks down by
// Adjust s.chunksToPersist and chunk.NumMemChunks down by
// the number of chunks in this series that are not
// persisted yet. Persisted chunks will be deducted from
// chunk.NumMemChunks upon eviction.
@ -1802,19 +1917,19 @@ func (s *MemorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
ch <- s.persistErrors.Desc()
ch <- s.queuedChunksToPersist.Desc()
ch <- maxChunksToPersistDesc
ch <- numChunksToPersistDesc
ch <- s.numSeries.Desc()
ch <- s.numHeadChunks.Desc()
ch <- s.chunksToPersist.Desc()
ch <- s.memorySeries.Desc()
ch <- s.headChunks.Desc()
ch <- s.dirtySeries.Desc()
s.seriesOps.Describe(ch)
ch <- s.ingestedSamplesCount.Desc()
s.discardedSamplesCount.Describe(ch)
ch <- s.nonExistentSeriesMatchesCount.Desc()
ch <- chunk.NumMemChunksDesc
ch <- s.ingestedSamples.Desc()
s.discardedSamples.Describe(ch)
ch <- s.nonExistentSeriesMatches.Desc()
ch <- s.memChunks.Desc()
s.maintainSeriesDuration.Describe(ch)
ch <- s.persistenceUrgencyScore.Desc()
ch <- s.rushedMode.Desc()
ch <- s.targetHeapSizeBytes.Desc()
}
// Collect implements prometheus.Collector.
@ -1824,34 +1939,17 @@ func (s *MemorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
ch <- s.persistErrors
ch <- s.queuedChunksToPersist
ch <- prometheus.MustNewConstMetric(
maxChunksToPersistDesc,
prometheus.GaugeValue,
float64(s.maxChunksToPersist),
)
ch <- prometheus.MustNewConstMetric(
numChunksToPersistDesc,
prometheus.GaugeValue,
float64(s.getNumChunksToPersist()),
)
ch <- s.numSeries
ch <- s.numHeadChunks
ch <- s.chunksToPersist
ch <- s.memorySeries
ch <- s.headChunks
ch <- s.dirtySeries
s.seriesOps.Collect(ch)
ch <- s.ingestedSamplesCount
s.discardedSamplesCount.Collect(ch)
ch <- s.nonExistentSeriesMatchesCount
ch <- prometheus.MustNewConstMetric(
maxMemChunksDesc,
prometheus.GaugeValue,
float64(s.maxMemoryChunks),
)
ch <- prometheus.MustNewConstMetric(
chunk.NumMemChunksDesc,
prometheus.GaugeValue,
float64(atomic.LoadInt64(&chunk.NumMemChunks)),
)
ch <- s.ingestedSamples
s.discardedSamples.Collect(ch)
ch <- s.nonExistentSeriesMatches
ch <- s.memChunks
s.maintainSeriesDuration.Collect(ch)
ch <- s.persistenceUrgencyScore
ch <- s.rushedMode
ch <- s.targetHeapSizeBytes
}

@ -19,6 +19,7 @@ import (
"math"
"math/rand"
"os"
"runtime"
"strconv"
"sync/atomic"
"testing"
@ -824,8 +825,7 @@ func TestLoop(t *testing.T) {
directory := testutil.NewTemporaryDirectory("test_storage", t)
defer directory.Close()
o := &MemorySeriesStorageOptions{
MemoryChunks: 50,
MaxChunksToPersist: 1000000,
TargetHeapSize: 100000,
PersistenceRetentionPeriod: 24 * 7 * time.Hour,
PersistenceStoragePath: directory.Path(),
HeadChunkTimeout: 5 * time.Minute,
@ -877,7 +877,6 @@ func testChunk(t *testing.T, encoding chunk.Encoding) {
for m := range s.fpToSeries.iter() {
s.fpLocker.Lock(m.fp)
defer s.fpLocker.Unlock(m.fp) // TODO remove, see below
var values []model.SamplePair
for _, cd := range m.series.chunkDescs {
if cd.IsEvicted() {
@ -900,7 +899,7 @@ func testChunk(t *testing.T, encoding chunk.Encoding) {
t.Errorf("%d. Got %v; want %v", i, v.Value, samples[i].Value)
}
}
//s.fpLocker.Unlock(m.fp)
s.fpLocker.Unlock(m.fp)
}
log.Info("test done, closing")
}
@ -1459,8 +1458,8 @@ func testEvictAndLoadChunkDescs(t *testing.T, encoding chunk.Encoding) {
s, closer := NewTestStorage(t, encoding)
defer closer.Close()
// Adjust memory chunks to lower value to see evictions.
s.maxMemoryChunks = 1
// Adjust target heap size to lower value to see evictions.
s.targetHeapSize = 1000000
for _, sample := range samples {
s.Append(sample)
@ -1478,7 +1477,7 @@ func testEvictAndLoadChunkDescs(t *testing.T, encoding chunk.Encoding) {
// Maintain series without any dropped chunks.
s.maintainMemorySeries(fp, 0)
// Give the evict goroutine an opportunity to run.
time.Sleep(250 * time.Millisecond)
time.Sleep(1250 * time.Millisecond)
// Maintain series again to trigger chunk.Desc eviction.
s.maintainMemorySeries(fp, 0)
@ -1605,8 +1604,7 @@ func benchmarkFuzz(b *testing.B, encoding chunk.Encoding) {
directory := testutil.NewTemporaryDirectory("test_storage", b)
defer directory.Close()
o := &MemorySeriesStorageOptions{
MemoryChunks: 100,
MaxChunksToPersist: 1000000,
TargetHeapSize: 200000,
PersistenceRetentionPeriod: time.Hour,
PersistenceStoragePath: directory.Path(),
HeadChunkTimeout: 5 * time.Minute,
@ -2010,3 +2008,239 @@ func TestAppendOutOfOrder(t *testing.T) {
}
}
}
func TestCalculatePersistUrgency(t *testing.T) {
tests := map[string]struct {
persistUrgency int32
lenEvictList int
numChunksToPersist int64
targetHeapSize, msNextGC, msHeapAlloc uint64
msNumGC, lastNumGC uint32
wantPersistUrgency int32
wantChunksToEvict int
wantLastNumGC uint32
}{
"all zeros": {
persistUrgency: 0,
lenEvictList: 0,
numChunksToPersist: 0,
targetHeapSize: 0,
msNextGC: 0,
msHeapAlloc: 0,
msNumGC: 0,
lastNumGC: 0,
wantPersistUrgency: 0,
wantChunksToEvict: 0,
wantLastNumGC: 0,
},
"far from target heap size, plenty of chunks to persist, GC has happened": {
persistUrgency: 500,
lenEvictList: 1000,
numChunksToPersist: 100,
targetHeapSize: 1000000,
msNextGC: 500000,
msHeapAlloc: 400000,
msNumGC: 42,
lastNumGC: 41,
wantPersistUrgency: 45,
wantChunksToEvict: 0,
wantLastNumGC: 42,
},
"far from target heap size, plenty of chunks to persist, GC hasn't happened, urgency must not decrease": {
persistUrgency: 500,
lenEvictList: 1000,
numChunksToPersist: 100,
targetHeapSize: 1000000,
msNextGC: 500000,
msHeapAlloc: 400000,
msNumGC: 42,
lastNumGC: 42,
wantPersistUrgency: 500,
wantChunksToEvict: 0,
wantLastNumGC: 42,
},
"far from target heap size but no chunks to persist": {
persistUrgency: 50,
lenEvictList: 0,
numChunksToPersist: 100,
targetHeapSize: 1000000,
msNextGC: 500000,
msHeapAlloc: 400000,
msNumGC: 42,
lastNumGC: 41,
wantPersistUrgency: 500,
wantChunksToEvict: 0,
wantLastNumGC: 42,
},
"far from target heap size but no chunks to persist, HeapAlloc > NextGC": {
persistUrgency: 50,
lenEvictList: 0,
numChunksToPersist: 100,
targetHeapSize: 1000000,
msNextGC: 500000,
msHeapAlloc: 600000,
msNumGC: 42,
lastNumGC: 41,
wantPersistUrgency: 600,
wantChunksToEvict: 0,
wantLastNumGC: 42,
},
"target heap size exceeded but GC hasn't happened": {
persistUrgency: 50,
lenEvictList: 3000,
numChunksToPersist: 1000,
targetHeapSize: 1000000,
msNextGC: 1100000,
msHeapAlloc: 900000,
msNumGC: 42,
lastNumGC: 42,
wantPersistUrgency: 275,
wantChunksToEvict: 0,
wantLastNumGC: 42,
},
"target heap size exceeded, GC has happened": {
persistUrgency: 50,
lenEvictList: 3000,
numChunksToPersist: 1000,
targetHeapSize: 1000000,
msNextGC: 1100000,
msHeapAlloc: 900000,
msNumGC: 42,
lastNumGC: 41,
wantPersistUrgency: 275,
wantChunksToEvict: 97,
wantLastNumGC: 42,
},
"target heap size exceeded, GC has happened, urgency bumped due to low number of evictable chunks": {
persistUrgency: 50,
lenEvictList: 300,
numChunksToPersist: 100,
targetHeapSize: 1000000,
msNextGC: 1100000,
msHeapAlloc: 900000,
msNumGC: 42,
lastNumGC: 41,
wantPersistUrgency: 323,
wantChunksToEvict: 97,
wantLastNumGC: 42,
},
"target heap size exceeded but no evictable chunks and GC hasn't happened": {
persistUrgency: 50,
lenEvictList: 0,
numChunksToPersist: 1000,
targetHeapSize: 1000000,
msNextGC: 1100000,
msHeapAlloc: 900000,
msNumGC: 42,
lastNumGC: 42,
wantPersistUrgency: 1000,
wantChunksToEvict: 0,
wantLastNumGC: 42,
},
"target heap size exceeded but no evictable chunks and GC has happened": {
persistUrgency: 50,
lenEvictList: 0,
numChunksToPersist: 1000,
targetHeapSize: 1000000,
msNextGC: 1100000,
msHeapAlloc: 900000,
msNumGC: 42,
lastNumGC: 41,
wantPersistUrgency: 1000,
wantChunksToEvict: 0,
wantLastNumGC: 42,
},
"target heap size exceeded, very few evictable chunks, GC hasn't happened": {
persistUrgency: 50,
lenEvictList: 10,
numChunksToPersist: 1000,
targetHeapSize: 1000000,
msNextGC: 1100000,
msHeapAlloc: 900000,
msNumGC: 42,
lastNumGC: 42,
wantPersistUrgency: 1000,
wantChunksToEvict: 0,
wantLastNumGC: 42,
},
"target heap size exceeded, some evictable chunks (but not enough), GC hasn't happened": {
persistUrgency: 50,
lenEvictList: 50,
numChunksToPersist: 250,
targetHeapSize: 1000000,
msNextGC: 1100000,
msHeapAlloc: 900000,
msNumGC: 42,
lastNumGC: 42,
wantPersistUrgency: 916,
wantChunksToEvict: 0,
wantLastNumGC: 42,
},
"target heap size exceeded, some evictable chunks (but not enough), GC has happened": {
persistUrgency: 50,
lenEvictList: 50,
numChunksToPersist: 250,
targetHeapSize: 1000000,
msNextGC: 1100000,
msHeapAlloc: 900000,
msNumGC: 42,
lastNumGC: 41,
wantPersistUrgency: 1000,
wantChunksToEvict: 50,
wantLastNumGC: 42,
},
}
s, closer := NewTestStorage(t, 1)
defer closer.Close()
for scenario, test := range tests {
s.persistUrgency = test.persistUrgency
s.numChunksToPersist = test.numChunksToPersist
s.targetHeapSize = test.targetHeapSize
s.lastNumGC = test.lastNumGC
s.evictList.Init()
for i := 0; i < test.lenEvictList; i++ {
s.evictList.PushBack(&struct{}{})
}
ms := runtime.MemStats{
NextGC: test.msNextGC,
HeapAlloc: test.msHeapAlloc,
NumGC: test.msNumGC,
}
chunksToEvict := s.calculatePersistUrgency(&ms)
if chunksToEvict != test.wantChunksToEvict {
t.Errorf(
"scenario %q: got %d chunks to evict, want %d",
scenario, chunksToEvict, test.wantChunksToEvict,
)
}
if s.persistUrgency != test.wantPersistUrgency {
t.Errorf(
"scenario %q: got persist urgency %d, want %d",
scenario, s.persistUrgency, test.wantPersistUrgency,
)
}
if s.lastNumGC != test.wantLastNumGC {
t.Errorf(
"scenario %q: got lastNumGC %d , want %d",
scenario, s.lastNumGC, test.wantLastNumGC,
)
}
}
}
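As a quick check of one scenario above, "target heap size exceeded, GC has happened" (assuming chunk.ChunkLen is 1024, i.e. the 1 KiB chunk size the deprecated flag help refers to):

    urgency       = 1000 * 1000 / (1000 + 3000) * 1100000 / 1000000 = 275
    chunksToEvict = (1100000 - 1000000) / 1024 = 97 (truncated)
    ratio bump    = 97 * 1000 / 3000 = 32, which is below 275, so the urgency stays at 275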

@ -45,8 +45,7 @@ func NewTestStorage(t testutil.T, encoding chunk.Encoding) (*MemorySeriesStorage
chunk.DefaultEncoding = encoding
directory := testutil.NewTemporaryDirectory("test_storage", t)
o := &MemorySeriesStorageOptions{
MemoryChunks: 1000000,
MaxChunksToPersist: 1000000,
TargetHeapSize: 1000000000,
PersistenceRetentionPeriod: 24 * time.Hour * 365 * 100, // Enough to never trigger purging.
PersistenceStoragePath: directory.Path(),
HeadChunkTimeout: 5 * time.Minute,
