diff --git a/storage/local/storage.go b/storage/local/storage.go index 62453d89d..b10708b20 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -18,6 +18,7 @@ import ( "container/list" "fmt" "math" + "sync" "sync/atomic" "time" @@ -124,6 +125,7 @@ type memorySeriesStorage struct { numChunksToPersist int64 // The number of chunks waiting for persistence. maxChunksToPersist int // If numChunksToPersist reaches this threshold, ingestion will be throttled. rushed bool // Whether the storage is in rushed mode. + rushedMtx sync.Mutex // Protects entering and exiting rushed mode. throttled chan struct{} // This chan is sent to whenever NeedsThrottling() returns true (for logging). fpLocker *fingerprintLocker @@ -1248,6 +1250,9 @@ func (s *memorySeriesStorage) incNumChunksToPersist(by int) { // files should not by synced anymore provided the user has specified the // adaptive sync strategy. func (s *memorySeriesStorage) calculatePersistenceUrgencyScore() float64 { + s.rushedMtx.Lock() + defer s.rushedMtx.Unlock() + var ( chunksToPersist = float64(s.getNumChunksToPersist()) maxChunksToPersist = float64(s.maxChunksToPersist)