Fix a race condition in calculatePersistenceUrgencyScore

pull/1414/head
beorn7 9 years ago
parent 667c221816
commit 4d1f7b49b6

@ -18,6 +18,7 @@ import (
"container/list" "container/list"
"fmt" "fmt"
"math" "math"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -124,6 +125,7 @@ type memorySeriesStorage struct {
numChunksToPersist int64 // The number of chunks waiting for persistence. numChunksToPersist int64 // The number of chunks waiting for persistence.
maxChunksToPersist int // If numChunksToPersist reaches this threshold, ingestion will be throttled. maxChunksToPersist int // If numChunksToPersist reaches this threshold, ingestion will be throttled.
rushed bool // Whether the storage is in rushed mode. rushed bool // Whether the storage is in rushed mode.
rushedMtx sync.Mutex // Protects entering and exiting rushed mode.
throttled chan struct{} // This chan is sent to whenever NeedsThrottling() returns true (for logging). throttled chan struct{} // This chan is sent to whenever NeedsThrottling() returns true (for logging).
fpLocker *fingerprintLocker fpLocker *fingerprintLocker
@ -1248,6 +1250,9 @@ func (s *memorySeriesStorage) incNumChunksToPersist(by int) {
// files should not by synced anymore provided the user has specified the // files should not by synced anymore provided the user has specified the
// adaptive sync strategy. // adaptive sync strategy.
func (s *memorySeriesStorage) calculatePersistenceUrgencyScore() float64 { func (s *memorySeriesStorage) calculatePersistenceUrgencyScore() float64 {
s.rushedMtx.Lock()
defer s.rushedMtx.Unlock()
var ( var (
chunksToPersist = float64(s.getNumChunksToPersist()) chunksToPersist = float64(s.getNumChunksToPersist())
maxChunksToPersist = float64(s.maxChunksToPersist) maxChunksToPersist = float64(s.maxChunksToPersist)

Loading…
Cancel
Save