@@ -41,26 +41,21 @@ const (
// Constants to control the hysteresis of entering and leaving "rushed
// mode". In rushed mode, the dirty series count is ignored for
// checkpointing, and series files are not synced if the adaptive sync
// strategy is used.
//
// If we reach 80% of -storage.local.max-chunks-to-persist, we enter
// "rushed mode".
factorChunksToPersistForEnteringRushedMode = 0.8
// To leave "rushed mode", we must be below 70% of
// -storage.local.max-chunks-to-persist.
factorChunksToPersistForLeavingRushedMode = 0.7
// To enter "rushed mode" for other reasons (see below), we must have at
// least 30% of -storage.local.max-chunks-to-persist.
factorMinChunksToPersistToAllowRushedMode = 0.3
// If the number of chunks in memory reaches 110% of
// -storage.local.memory-chunks, we will enter "rushed mode" (provided
// we have enough chunks to persist at all, see
// factorMinChunksToPersistToAllowRushedMode.)
factorMemChunksForEnteringRushedMode = 1.1
// To leave "rushed mode", we must be below 105% of
// -storage.local.memory-chunks.
factorMemChunksForLeavingRushedMode = 1.05
// checkpointing, series are maintained as frequently as possible, and
// series files are not synced if the adaptive sync strategy is used.
persistenceUrgencyScoreForEnteringRushedMode = 0.8
persistenceUrgencyScoreForLeavingRushedMode = 0.7
// This factor times -storage.local.memory-chunks is the number of
// memory chunks we tolerate before suspending ingestion (TODO!). It is
// also a basis for calculating the persistenceUrgencyScore.
toleranceFactorForMemChunks = 1.1
// This factor times -storage.local.max-chunks-to-persist is the minimum
// required number of chunks waiting for persistence before the number
// of chunks in memory may influence the persistenceUrgencyScore. (In
// other words: if there are no chunks to persist, it doesn't help chunk
// eviction if we speed up persistence.)
factorMinChunksToPersist = 0.2
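// Illustration (hypothetical flag values, not taken from the code): with
// -storage.local.max-chunks-to-persist=500000 and
// -storage.local.memory-chunks=1000000, the memory-chunk sub-score only
// starts to matter once more than 100000 chunks (0.2 * 500000) are waiting
// for persistence, and the tolerated ceiling of memory chunks is 1100000
// (1.1 * 1000000).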
)
var (
@@ -155,6 +150,8 @@ type memorySeriesStorage struct {
outOfOrderSamplesCount prometheus.Counter
invalidPreloadRequestsCount prometheus.Counter
maintainSeriesDuration *prometheus.SummaryVec
persistenceUrgencyScore prometheus.Gauge
rushedMode prometheus.Gauge
}
// MemorySeriesStorageOptions contains options needed by
@@ -243,6 +240,18 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage {
},
[]string{seriesLocationLabel},
),
persistenceUrgencyScore: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "persistence_urgency_score",
Help: "A score of urgency to persist chunks, 0 is least urgent, 1 most.",
}),
rushedMode: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "rushed_mode",
Help: "1 if the storage is in rushed mode, 0 otherwise. In rushed mode, the system behaves as if the persistence_urgency_score is 1.",
}),
}
return s
}
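// Note (assumption, not part of this change): if the package-level namespace
// and subsystem constants resolve to "prometheus" and "local_storage", the new
// gauges are exported as prometheus_local_storage_persistence_urgency_score
// and prometheus_local_storage_rushed_mode, so operators can graph or alert on
// them, e.g. with the expression prometheus_local_storage_rushed_mode == 1.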
@@ -256,7 +265,7 @@ func (s *memorySeriesStorage) Start() (err error) {
case Always:
syncStrategy = func() bool { return true }
case Adaptive:
syncStrategy = func() bool { return !s.inRushedMode() }
syncStrategy = func() bool { return s.calculatePersistenceUrgencyScore() < 1 }
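// With the score pinned at 1 while in rushed mode, the adaptive strategy
// therefore skips syncing series files altogether until the storage has
// calmed down again.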
default:
panic("unknown sync strategy")
}
@@ -823,8 +832,8 @@ func (s *memorySeriesStorage) cycleThroughMemoryFingerprints() chan model.Fingerprint {
case <-s.loopStopping:
return
}
// Reduce the wait time by the backlog score.
s.waitForNextFP(s.fpToSeries.length(), s.persistenceBacklogScore())
// Reduce the wait time according to the urgency score.
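// (For example, a score of 0.75 passes a factor of 0.25, i.e. roughly a
// quarter of the usual wait between maintained series, assuming
// waitForNextFP scales its wait linearly by the given factor.)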
s.waitForNextFP(s.fpToSeries.length(), 1-s.calculatePersistenceUrgencyScore())
count++
}
if count > 0 {
@@ -916,8 +925,9 @@ loop:
// would be counterproductive, as it would slow down chunk persisting even more,
// while in a situation like that, where we are clearly lacking speed of disk
// maintenance, the best we can do for crash recovery is to persist chunks as
// quickly as possible. So only checkpoint if the storage is not in "rushed mode".
if dirtySeriesCount >= s.checkpointDirtySeriesLimit && !s.inRushedMode() {
// quickly as possible. So only checkpoint if the urgency score is < 1.
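// (Checkpoints triggered by the regular checkpointInterval timer still
// happen; only the early checkpoint triggered by the dirty-series count
// is suppressed while the score is at 1.)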
if dirtySeriesCount >= s.checkpointDirtySeriesLimit &&
s.calculatePersistenceUrgencyScore() < 1 {
checkpointTimer.Reset(0)
}
}
@@ -1161,78 +1171,83 @@ func (s *memorySeriesStorage) incNumChunksToPersist(by int) {
atomic.AddInt64(&s.numChunksToPersist, int64(by))
}
// inRushedMode returns whether the storage is in "rushed mode", which is the
// case if there are too many chunks waiting for persistence or there are too
// many chunks in memory. The method is not goroutine safe (but only ever called
// from the goroutine dealing with series maintenance). Changes of degradation
// mode are logged.
func (s *memorySeriesStorage) inRushedMode() bool {
chunksToPersist := float64(s.getNumChunksToPersist())
memChunks := float64(atomic.LoadInt64(&numMemChunks))
if s.rushed {
// We are already in rushed mode, so check if we can get out of
// it, using the lower hysteresis thresholds.
s.rushed = chunksToPersist > float64(s.maxChunksToPersist)*factorChunksToPersistForLeavingRushedMode ||
memChunks > float64(s.maxMemoryChunks)*factorMemChunksForLeavingRushedMode
if !s.rushed {
log.Warn("Storage has left rushed mode. Things are back to normal.")
}
return s.rushed
}
// We are not rushed yet, so check against the higher hysteresis threshold whether we enter it now.
// First WRT chunksToPersist...
s.rushed = chunksToPersist > float64(s.maxChunksToPersist)*factorChunksToPersistForEnteringRushedMode
if s.rushed {
log.Warnf(
"%.0f chunks waiting for persistence (%.1f%% of the allowed maximum %d). Storage is now in rushed mode. Series files are not synced anymore if following the adaptive strategy. Checkpoints are not performed more often than every %v. Series maintenance happens as frequently as possible.",
chunksToPersist,
chunksToPersist*100/float64(s.maxChunksToPersist),
s.maxChunksToPersist,
s.checkpointInterval,
)
return true
}
// ...then WRT memChunks.
s.rushed = memChunks > float64(s.maxMemoryChunks)*factorMemChunksForEnteringRushedMode &&
chunksToPersist > float64(s.maxChunksToPersist)*factorMinChunksToPersistToAllowRushedMode
if s.rushed {
log.Warnf(
"%.0f chunks in memory (%.1f%% of the allowed maximum %d). Storage is now in rushed mode. Series files are not synced anymore if following the adaptive strategy. Checkpoints are not performed more often than every %v. Series maintenance happens as frequently as possible.",
memChunks,
memChunks*100/float64(s.maxMemoryChunks),
s.maxMemoryChunks,
s.checkpointInterval,
)
}
return s.rushed
}
// persistenceBacklogScore works similarly to inRushedMode, but returns a score
// about how close we are to degradation. This score is 1.0 if no chunks are
// waiting for persistence or we are not over the threshold for memory chunks,
// and 0.0 if we are at or above the thresholds. However, the score is always 0
// if the storage is currently in rushed mode. (Getting out of it has a
// hysteresis, so we might be below thresholds again but still in rushed mode.)
func (s *memorySeriesStorage) persistenceBacklogScore() float64 {
if s.inRushedMode() {
return 0
}
chunksToPersist := float64(s.getNumChunksToPersist())
score := 1 - chunksToPersist/(float64(s.maxChunksToPersist)*factorChunksToPersistForEnteringRushedMode)
if chunksToPersist > float64(s.maxChunksToPersist)*factorMinChunksToPersistToAllowRushedMode {
memChunks := float64(atomic.LoadInt64(&numMemChunks))
score = math.Min(
// calculatePersistenceUrgencyScore calculates and returns an urgency score for
// the speed of persisting chunks. The score is between 0 and 1, where 0 means
// no urgency at all and 1 means highest urgency.
//
// The score is the maximum of the two following sub-scores:
//
// (1) The first sub-score is the number of chunks waiting for persistence
// divided by the maximum number of chunks allowed to be waiting for
// persistence.
//
// (2) If there are more chunks in memory than allowed AND there are more chunks
// waiting for persistence than factorMinChunksToPersist times
// -storage.local.max-chunks-to-persist, then the second sub-score is the
// fraction the number of memory chunks has reached between
// -storage.local.memory-chunks and toleranceFactorForMemChunks times
// -storage.local.memory-chunks.
//
// Should the score ever hit persistenceUrgencyScoreForEnteringRushedMode, the
// storage locks into "rushed mode", in which the returned score is always
// bumped up to 1 until the non-bumped score is below
// persistenceUrgencyScoreForLeavingRushedMode.
//
// This method is not goroutine-safe, but it is only ever called by the single
// goroutine that is in charge of series maintenance. According to the returned
// score, series maintenance should be sped up. If a score of 1 is returned,
// checkpointing based on dirty-series count should be disabled, and series
// files should not be synced anymore, provided the user has specified the
// adaptive sync strategy.
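//
// Worked example (illustrative numbers only): with
// -storage.local.max-chunks-to-persist=500000,
// -storage.local.memory-chunks=1000000, 420000 chunks waiting for
// persistence, and 1050000 chunks in memory, sub-score (1) is
// 420000/500000 = 0.84 and sub-score (2) is (1.05-1)/(1.1-1) = 0.5, so the
// raw score is 0.84. Since that exceeds
// persistenceUrgencyScoreForEnteringRushedMode, the storage enters rushed
// mode and returns 1, and keeps returning 1 until the raw score drops below
// persistenceUrgencyScoreForLeavingRushedMode (0.7).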
func (s *memorySeriesStorage) calculatePersistenceUrgencyScore() float64 {
var (
chunksToPersist = float64(s.getNumChunksToPersist())
maxChunksToPersist = float64(s.maxChunksToPersist)
memChunks = float64(atomic.LoadInt64(&numMemChunks))
maxMemChunks = float64(s.maxMemoryChunks)
)
score := chunksToPersist / maxChunksToPersist
if chunksToPersist > maxChunksToPersist*factorMinChunksToPersist {
score = math.Max(
score,
1-(memChunks/float64(s.maxMemoryChunks)-1)/(factorMemChunksForEnteringRushedMode-1),
(memChunks/maxMemChunks-1)/(toleranceFactorForMemChunks-1),
)
}
if score < 0 {
return 0
}
if score > 1 {
score = 1
}
s.persistenceUrgencyScore.Set(score)
if s.rushed {
// We are already in rushed mode. If the score is still above
// persistenceUrgencyScoreForLeavingRushedMode, return 1 and
// leave things as they are.
if score > persistenceUrgencyScoreForLeavingRushedMode {
return 1
}
// We are out of rushed mode!
s.rushed = false
s.rushedMode.Set(0)
log.
With("urgencyScore", score).
With("chunksToPersist", chunksToPersist).
With("maxChunksToPersist", maxChunksToPersist).
With("memoryChunks", memChunks).
With("maxMemoryChunks", maxMemChunks).
Warn("Storage has left rushed mode.")
return score
}
if score > persistenceUrgencyScoreForEnteringRushedMode {
// Enter rushed mode.
s.rushed = true
s.rushedMode.Set(1)
log.
With("urgencyScore", score).
With("chunksToPersist", chunksToPersist).
With("maxChunksToPersist", maxChunksToPersist).
With("memoryChunks", memChunks).
With("maxMemoryChunks", maxMemChunks).
Warn("Storage has entered rushed mode.")
return 1
}
return score
@@ -1253,6 +1268,8 @@ func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
ch <- s.invalidPreloadRequestsCount.Desc()
ch <- numMemChunksDesc
s.maintainSeriesDuration.Describe(ch)
ch <- s.persistenceUrgencyScore.Desc()
ch <- s.rushedMode.Desc()
}
// Collect implements prometheus.Collector.
@@ -1282,4 +1299,6 @@ func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
float64(atomic.LoadInt64(&numMemChunks)),
)
s.maintainSeriesDuration.Collect(ch)
ch <- s.persistenceUrgencyScore
ch <- s.rushedMode
}