@ -17,6 +17,7 @@ package local
import (
import (
"container/list"
"container/list"
"fmt"
"fmt"
"math"
"sync/atomic"
"sync/atomic"
"time"
"time"
@ -38,11 +39,28 @@ const (
// See waitForNextFP.
// See waitForNextFP.
maxEvictInterval = time . Minute
maxEvictInterval = time . Minute
// If numChunskToPersist is this percentage of maxChunksToPersist, we
// Constants to control the hysteresis of entering and leaving "rushed
// consider the storage in "graceful degradation mode", i.e. we do not
// mode". In rushed mode, the dirty series count is ignored for
// checkpoint anymore based on the dirty series count, and we do not
// checkpointing, and series files are not synced if the adaptive sync
// sync series files anymore if using the adaptive sync strategy.
// strategy is used.
percentChunksToPersistForDegradation = 80
//
// If we reach 80% of -storage.local.max-chunks-to-persist, we enter
// "rushed mode".
factorChunksToPersistForEnteringRushedMode = 0.8
// To leave "rushed mode", we must be below 70% of
// -storage.local.max-chunks-to-persist.
factorChunksToPersistForLeavingRushedMode = 0.7
// To enter "rushed mode" for other reasons (see below), we must have at
// least 30% of -storage.local.max-chunks-to-persist.
factorMinChunksToPersistToAllowRushedMode = 0.3
// If the number of chunks in memory reaches 110% of
// -storage.local.memory-chunks, we will enter "rushed mode" (provided
// we have enough chunks to persist at all, see
// factorMinChunksToPersistToAllowRushedMode.)
factorMemChunksForEnteringRushedMode = 1.1
// To leave "rushed mode", we must be below 105% of
// -storage.local.memory-chunks.
factorMemChunksForLeavingRushedMode = 1.05
)
)
var (
var (
@ -110,7 +128,7 @@ type memorySeriesStorage struct {
// numChunksToPersist has to be aligned for atomic operations.
// numChunksToPersist has to be aligned for atomic operations.
numChunksToPersist int64 // The number of chunks waiting for persistence.
numChunksToPersist int64 // The number of chunks waiting for persistence.
maxChunksToPersist int // If numChunksToPersist reaches this threshold, ingestion will stall.
maxChunksToPersist int // If numChunksToPersist reaches this threshold, ingestion will stall.
degraded bool
rushed bool // Whether the storage is in rushed mode.
fpLocker * fingerprintLocker
fpLocker * fingerprintLocker
fpToSeries * seriesMap
fpToSeries * seriesMap
@ -238,7 +256,7 @@ func (s *memorySeriesStorage) Start() (err error) {
case Always :
case Always :
syncStrategy = func ( ) bool { return true }
syncStrategy = func ( ) bool { return true }
case Adaptive :
case Adaptive :
syncStrategy = func ( ) bool { return ! s . i sDegraded ( ) }
syncStrategy = func ( ) bool { return ! s . i nRushedMode ( ) }
default :
default :
panic ( "unknown sync strategy" )
panic ( "unknown sync strategy" )
}
}
@ -898,9 +916,8 @@ loop:
// would be counterproductive, as it would slow down chunk persisting even more,
// would be counterproductive, as it would slow down chunk persisting even more,
// while in a situation like that, where we are clearly lacking speed of disk
// while in a situation like that, where we are clearly lacking speed of disk
// maintenance, the best we can do for crash recovery is to persist chunks as
// maintenance, the best we can do for crash recovery is to persist chunks as
// quickly as possible. So only checkpoint if the storage is not in "graceful
// quickly as possible. So only checkpoint if the storage is not in "rushed mode".
// degradation mode".
if dirtySeriesCount >= s . checkpointDirtySeriesLimit && ! s . inRushedMode ( ) {
if dirtySeriesCount >= s . checkpointDirtySeriesLimit && ! s . isDegraded ( ) {
checkpointTimer . Reset ( 0 )
checkpointTimer . Reset ( 0 )
}
}
}
}
@ -1144,37 +1161,80 @@ func (s *memorySeriesStorage) incNumChunksToPersist(by int) {
atomic . AddInt64 ( & s . numChunksToPersist , int64 ( by ) )
atomic . AddInt64 ( & s . numChunksToPersist , int64 ( by ) )
}
}
// isDegraded returns whether the storage is in "graceful degradation mode",
// inRushedMode returns whether the storage is in "rushed mode", which is the
// which is the case if the number of chunks waiting for persistence has reached
// case if there are too many chunks waiting for persistence or there are too
// a percentage of maxChunksToPersist that exceeds
// many chunks in memory. The method is not goroutine safe (but only ever called
// percentChunksToPersistForDegradation. The method is not goroutine safe (but
// from the goroutine dealing with series maintenance). Changes of degradation
// only ever called from the goroutine dealing with series maintenance).
// mode are logged.
// Changes of degradation mode are logged.
func ( s * memorySeriesStorage ) inRushedMode ( ) bool {
func ( s * memorySeriesStorage ) isDegraded ( ) bool {
chunksToPersist := float64 ( s . getNumChunksToPersist ( ) )
nowDegraded := s . getNumChunksToPersist ( ) > s . maxChunksToPersist * percentChunksToPersistForDegradation / 100
memChunks := float64 ( atomic . LoadInt64 ( & numMemChunks ) )
if s . degraded && ! nowDegraded {
log . Warn ( "Storage has left graceful degradation mode. Things are back to normal." )
if s . rushed {
} else if ! s . degraded && nowDegraded {
// We are already in rushed mode, so check if we can get out of
// it, using the lower hysteresis thresholds.
s . rushed = chunksToPersist > float64 ( s . maxChunksToPersist ) * factorChunksToPersistForLeavingRushedMode ||
memChunks > float64 ( s . maxMemoryChunks ) * factorMemChunksForLeavingRushedMode
if ! s . rushed {
log . Warn ( "Storage has left rushed mode. Things are back to normal." )
}
return s . rushed
}
// We are not rushed yet, so check the higher hysteresis threshold if we enter it now.
// First WRT chunksToPersist...
s . rushed = chunksToPersist > float64 ( s . maxChunksToPersist ) * factorChunksToPersistForEnteringRushedMode
if s . rushed {
log . Warnf (
log . Warnf (
"%d chunks waiting for persistence (%d%% of the allowed maximum %d). Storage is now in graceful degradation mode. Series files are not synced anymore if following the adaptive strategy. Checkpoints are not performed more often than every %v. Series maintenance happens as frequently as possible." ,
"% .0f chunks waiting for persistence (%.1f%% of the allowed maximum %d). Storage is now in rushed mode. Series files are not synced anymore if following the adaptive strategy. Checkpoints are not performed more often than every %v. Series maintenance happens as frequently as possible.",
s . getNumChunksToPersist ( ) ,
chunksToPersist ,
s . getNumChunksToPersist ( ) * 100 / s . maxChunksToPersist ,
chunksToPersist* 100 / float64 ( s . maxChunksToPersist ) ,
s . maxChunksToPersist ,
s . maxChunksToPersist ,
s . checkpointInterval )
s . checkpointInterval ,
)
return true
}
// ...then WRT memChunks.
s . rushed = memChunks > float64 ( s . maxMemoryChunks ) * factorMemChunksForEnteringRushedMode &&
chunksToPersist > float64 ( s . maxChunksToPersist ) * factorMinChunksToPersistToAllowRushedMode
if s . rushed {
log . Warnf (
"%.0f chunks in memory (%.1f%% of the allowed maximum %d). Storage is now in rushed mode. Series files are not synced anymore if following the adaptive strategy. Checkpoints are not performed more often than every %v. Series maintenance happens as frequently as possible." ,
memChunks ,
memChunks * 100 / float64 ( s . maxMemoryChunks ) ,
s . maxMemoryChunks ,
s . checkpointInterval ,
)
}
}
s . degraded = nowDegraded
return s . rushed
return s . degraded
}
}
// persistenceBacklogScore works similar to isDegraded, but returns a score
// persistenceBacklogScore works similar to i nRushedMode , but returns a score
// about how close we are to degradation. This score is 1.0 if no chunks are
// about how close we are to degradation. This score is 1.0 if no chunks are
// waiting for persistence and 0.0 if we are at or above the degradation
// waiting for persistence or we are not over the threshold for memory chunks,
// threshold.
// and 0.0 if we are at or above the thresholds. However, the score is always 0
// if the storage is currently in rushed mode. (Getting out of it has a
// hysteresis, so we might be below thresholds again but still in rushed mode.)
func ( s * memorySeriesStorage ) persistenceBacklogScore ( ) float64 {
func ( s * memorySeriesStorage ) persistenceBacklogScore ( ) float64 {
score := 1 - float64 ( s . getNumChunksToPersist ( ) ) / float64 ( s . maxChunksToPersist * percentChunksToPersistForDegradation / 100 )
if s . inRushedMode ( ) {
return 0
}
chunksToPersist := float64 ( s . getNumChunksToPersist ( ) )
score := 1 - chunksToPersist / ( float64 ( s . maxChunksToPersist ) * factorChunksToPersistForEnteringRushedMode )
if chunksToPersist > float64 ( s . maxChunksToPersist ) * factorMinChunksToPersistToAllowRushedMode {
memChunks := float64 ( atomic . LoadInt64 ( & numMemChunks ) )
score = math . Min (
score ,
1 - ( memChunks / float64 ( s . maxMemoryChunks ) - 1 ) / ( factorMemChunksForEnteringRushedMode - 1 ) ,
)
}
if score < 0 {
if score < 0 {
return 0
return 0
}
}
if score > 1 {
return 1
}
return score
return score
}
}