Prevent total storage locking during memory flush.

While a hack, this change should allow us to serve queries
expeditiously during a flush operation.

Change-Id: I9a483fd1dd2b0638ab24ace960df08773c4a5079
changes/23/23/3
Matt T. Proud 11 years ago
parent 12d5e6ca5a
commit 7910f6e863

@ -288,13 +288,14 @@ func (s *memorySeriesStorage) Flush(flushOlderThan time.Time, queue chan<- clien
} }
s.RUnlock() s.RUnlock()
s.Lock()
for _, fingerprint := range emptySeries { for _, fingerprint := range emptySeries {
if s.fingerprintToSeries[fingerprint].size() == 0 { if series, ok := s.fingerprintToSeries[fingerprint]; ok && series.size() == 0 {
s.Lock()
s.dropSeries(&fingerprint) s.dropSeries(&fingerprint)
s.Unlock()
} }
} }
s.Unlock()
} }
// Drop all references to a series, including any samples. // Drop all references to a series, including any samples.

@ -93,6 +93,8 @@ type TieredStorage struct {
wmCache *watermarkCache wmCache *watermarkCache
Indexer MetricIndexer Indexer MetricIndexer
flushSema chan bool
} }
// viewJob encapsulates a request to extract sample values from the datastore. // viewJob encapsulates a request to extract sample values from the datastore.
@ -136,6 +138,8 @@ func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryIn
memorySemaphore: make(chan bool, tieredMemorySemaphores), memorySemaphore: make(chan bool, tieredMemorySemaphores),
wmCache: wmCache, wmCache: wmCache,
flushSema: make(chan bool, 1),
} }
for i := 0; i < tieredMemorySemaphores; i++ { for i := 0; i < tieredMemorySemaphores; i++ {
@ -235,11 +239,18 @@ func (t *TieredStorage) Serve(started chan<- bool) {
}() }()
started <- true started <- true
for { for {
select { select {
case <-flushMemoryTicker.C: case <-flushMemoryTicker.C:
t.flushMemory(t.memoryTTL) select {
case t.flushSema <- true:
go func() {
t.flushMemory(t.memoryTTL)
<-t.flushSema
}()
default:
glog.Warning("Backlogging on flush...")
}
case viewRequest := <-t.ViewQueue: case viewRequest := <-t.ViewQueue:
viewRequest.stats.GetTimer(stats.ViewQueueTime).Stop() viewRequest.stats.GetTimer(stats.ViewQueueTime).Stop()
<-t.memorySemaphore <-t.memorySemaphore
@ -247,6 +258,7 @@ func (t *TieredStorage) Serve(started chan<- bool) {
case drainingDone := <-t.draining: case drainingDone := <-t.draining:
t.Flush() t.Flush()
drainingDone <- true drainingDone <- true
return return
} }
} }
@ -261,7 +273,9 @@ func (t *TieredStorage) reportQueues() {
} }
func (t *TieredStorage) Flush() { func (t *TieredStorage) Flush() {
t.flushSema <- true
t.flushMemory(0) t.flushMemory(0)
<-t.flushSema
} }
func (t *TieredStorage) flushMemory(ttl time.Duration) { func (t *TieredStorage) flushMemory(ttl time.Duration) {

Loading…
Cancel
Save