From ce1ee444f12ed008900f453634ff2f5f45f5ebfa Mon Sep 17 00:00:00 2001
From: Julius Volz
Date: Wed, 8 May 2013 15:30:27 +0200
Subject: [PATCH] Synchronous memory appends and more fine-grained storage
 locks.

This does two things:

1) Make TieredStorage.AppendSamples() write directly to memory instead of
   buffering to a channel first. This is needed in cases where a rule might
   immediately need the data generated by a previous rule.

2) Replace the single storage mutex by two new ones:

   - memoryMutex - needs to be locked at any time that two concurrent
     goroutines could be accessing (via read or write) the TieredStorage
     memoryArena.

   - memoryDeleteMutex - used to prevent any deletion of samples from
     memoryArena as long as renderView is running and assembling data from it.

The LevelDB disk storage does not need to be protected by a mutex when
rendering a view, since renderView works off a LevelDB snapshot.

The rationale against adding memoryMutex directly to the memory storage:
taking a mutex does come with a small inherent time cost, and taking it is
only required in a few places. In fact, no locking is required for the memory
storage instance which is part of a view (and not the TieredStorage).

A minimal illustrative sketch of this locking scheme follows after the patch.
---
 main.go                       |   2 +-
 storage/metric/interface.go   |   2 +-
 storage/metric/test_helper.go |   6 +-
 storage/metric/tiered.go      | 109 ++++++++++++++--------------------
 storage/metric/tiered_test.go |   2 -
 5 files changed, 46 insertions(+), 75 deletions(-)

diff --git a/main.go b/main.go
index 3f2d522c2..679de45b3 100644
--- a/main.go
+++ b/main.go
@@ -133,7 +133,7 @@ func main() {
         log.Fatalf("Error loading configuration from %s: %v", *configFile, err)
     }
 
-    ts, err := metric.NewTieredStorage(uint(*memoryAppendQueueCapacity), uint(*diskAppendQueueCapacity), 100, time.Second*30, time.Second*1, time.Second*20, *metricsStoragePath)
+    ts, err := metric.NewTieredStorage(uint(*diskAppendQueueCapacity), 100, time.Second*30, time.Second*1, time.Second*20, *metricsStoragePath)
     if err != nil {
         log.Fatalf("Error opening storage: %s", err)
     }
diff --git a/storage/metric/interface.go b/storage/metric/interface.go
index 1e49a5407..a336da50b 100644
--- a/storage/metric/interface.go
+++ b/storage/metric/interface.go
@@ -33,7 +33,7 @@ type MetricPersistence interface {
     // Record a new sample in the storage layer.
     AppendSample(model.Sample) error
 
-    // Record a new sample in the storage layer.
+    // Record a group of new samples in the storage layer.
     AppendSamples(model.Samples) error
 
     // Get all of the metric fingerprints that are associated with the provided
diff --git a/storage/metric/test_helper.go b/storage/metric/test_helper.go
index 4ca062195..f272e55a0 100644
--- a/storage/metric/test_helper.go
+++ b/storage/metric/test_helper.go
@@ -26,10 +26,6 @@ var (
     testInstant = time.Date(1972, 7, 18, 19, 5, 45, 0, usEastern).In(time.UTC)
 )
 
-const (
-    appendQueueSize = 100
-)
-
 func testAppendSample(p MetricPersistence, s model.Sample, t test.Tester) {
     err := p.AppendSample(s)
     if err != nil {
@@ -90,7 +86,7 @@ func (t testTieredStorageCloser) Close() {
 
 func NewTestTieredStorage(t test.Tester) (storage *TieredStorage, closer test.Closer) {
     var directory test.TemporaryDirectory
     directory = test.NewTemporaryDirectory("test_tiered_storage", t)
-    storage, err := NewTieredStorage(appendQueueSize, 2500, 1000, 5*time.Second, 15*time.Second, 0*time.Second, directory.Path())
+    storage, err := NewTieredStorage(2500, 1000, 5*time.Second, 15*time.Second, 0*time.Second, directory.Path())
     if err != nil {
         if storage != nil {
diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go
index 5e848e9b7..bca86f983 100644
--- a/storage/metric/tiered.go
+++ b/storage/metric/tiered.go
@@ -60,13 +60,17 @@ type TieredStorage struct {
     DiskStorage *LevelDBMetricPersistence
 
     appendToDiskQueue   chan model.Samples
-    appendToMemoryQueue chan model.Samples
     diskFrontier        *diskFrontier
     draining            chan chan bool
     flushMemoryInterval time.Duration
     memoryArena         memorySeriesStorage
     memoryTTL           time.Duration
-    mutex               sync.Mutex
+    // This mutex manages any concurrent reads/writes of the memoryArena.
+    memoryMutex sync.RWMutex
+    // This mutex blocks only deletions from the memoryArena. It is held for a
+    // potentially long time for an entire renderView() duration, since we depend
+    // on no samples being removed from memory after grabbing a LevelDB snapshot.
+    memoryDeleteMutex sync.RWMutex
     viewQueue           chan viewJob
     writeMemoryInterval time.Duration
 }
@@ -79,7 +83,7 @@ type viewJob struct {
     err     chan error
 }
 
-func NewTieredStorage(appendToMemoryQueueDepth, appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryInterval, writeMemoryInterval, memoryTTL time.Duration, root string) (storage *TieredStorage, err error) {
+func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryInterval, writeMemoryInterval, memoryTTL time.Duration, root string) (storage *TieredStorage, err error) {
     diskStorage, err := NewLevelDBMetricPersistence(root)
     if err != nil {
         return
@@ -87,7 +91,6 @@ func NewTieredStorage(appendToMemoryQueueDepth, appendToDiskQueueDepth, viewQueu
 
     storage = &TieredStorage{
         appendToDiskQueue:   make(chan model.Samples, appendToDiskQueueDepth),
-        appendToMemoryQueue: make(chan model.Samples, appendToMemoryQueueDepth),
         DiskStorage:         diskStorage,
         draining:            make(chan chan bool),
         flushMemoryInterval: flushMemoryInterval,
@@ -100,12 +103,14 @@ func NewTieredStorage(appendToMemoryQueueDepth, appendToDiskQueueDepth, viewQueu
 }
 
 // Enqueues Samples for storage.
-func (t *TieredStorage) AppendSamples(s model.Samples) (err error) {
+func (t *TieredStorage) AppendSamples(samples model.Samples) (err error) {
     if len(t.draining) > 0 {
         return fmt.Errorf("Storage is in the process of draining.")
     }
 
-    t.appendToMemoryQueue <- s
+    t.memoryMutex.Lock()
+    t.memoryArena.AppendSamples(samples)
+    t.memoryMutex.Unlock()
 
     return
 }
@@ -175,8 +180,6 @@ func (t *TieredStorage) rebuildDiskFrontier(i leveldb.Iterator) (err error) {
 
 func (t *TieredStorage) Serve() {
     flushMemoryTicker := time.NewTicker(t.flushMemoryInterval)
     defer flushMemoryTicker.Stop()
-    writeMemoryTicker := time.NewTicker(t.writeMemoryInterval)
-    defer writeMemoryTicker.Stop()
     reportTicker := time.NewTicker(time.Second)
     defer reportTicker.Stop()
@@ -188,14 +191,12 @@ func (t *TieredStorage) Serve() {
 
     for {
         select {
-        case <-writeMemoryTicker.C:
-            t.writeMemory()
         case <-flushMemoryTicker.C:
             t.flushMemory()
         case viewRequest := <-t.viewQueue:
             t.renderView(viewRequest)
         case drainingDone := <-t.draining:
-            t.flush()
+            t.Flush()
             drainingDone <- true
             return
         }
@@ -206,33 +207,12 @@ func (t *TieredStorage) reportQueues() {
     queueSizes.Set(map[string]string{"queue": "append_to_disk", "facet": "occupancy"}, float64(len(t.appendToDiskQueue)))
     queueSizes.Set(map[string]string{"queue": "append_to_disk", "facet": "capacity"}, float64(cap(t.appendToDiskQueue)))
 
-    queueSizes.Set(map[string]string{"queue": "append_to_memory", "facet": "occupancy"}, float64(len(t.appendToMemoryQueue)))
-    queueSizes.Set(map[string]string{"queue": "append_to_memory", "facet": "capacity"}, float64(cap(t.appendToMemoryQueue)))
-
     queueSizes.Set(map[string]string{"queue": "view_generation", "facet": "occupancy"}, float64(len(t.viewQueue)))
     queueSizes.Set(map[string]string{"queue": "view_generation", "facet": "capacity"}, float64(cap(t.viewQueue)))
 }
 
-func (t *TieredStorage) writeMemory() {
-    begin := time.Now()
-    defer func() {
-        duration := time.Since(begin)
-
-        recordOutcome(duration, nil, map[string]string{operation: appendSample, result: success}, map[string]string{operation: writeMemory, result: failure})
-    }()
-
-    t.mutex.Lock()
-    defer t.mutex.Unlock()
-
-    pendingLength := len(t.appendToMemoryQueue)
-
-    for i := 0; i < pendingLength; i++ {
-        t.memoryArena.AppendSamples(<-t.appendToMemoryQueue)
-    }
-}
-
 func (t *TieredStorage) Flush() {
-    t.flush()
+    t.flushMemory()
 }
 
 func (t *TieredStorage) Close() {
@@ -242,31 +222,23 @@ func (t *TieredStorage) Close() {
     t.memoryArena.Close()
 
     close(t.appendToDiskQueue)
-    close(t.appendToMemoryQueue)
     close(t.viewQueue)
     log.Println("Done.")
 }
 
-// Write all pending appends.
-func (t *TieredStorage) flush() (err error) {
-    // Trim any old values to reduce iterative write costs.
-    t.flushMemory()
-    t.writeMemory()
-    t.flushMemory()
-    return
-}
-
 type memoryToDiskFlusher struct {
-    toDiskQueue    chan model.Samples
-    disk           MetricPersistence
-    olderThan      time.Time
-    valuesAccepted int
-    valuesRejected int
+    toDiskQueue       chan model.Samples
+    disk              MetricPersistence
+    olderThan         time.Time
+    valuesAccepted    int
+    valuesRejected    int
+    memoryDeleteMutex *sync.RWMutex
 }
 
 type memoryToDiskFlusherVisitor struct {
-    stream  stream
-    flusher *memoryToDiskFlusher
+    stream            stream
+    flusher           *memoryToDiskFlusher
+    memoryDeleteMutex *sync.RWMutex
 }
 
 func (f memoryToDiskFlusherVisitor) DecodeKey(in interface{}) (out interface{}, err error) {
@@ -311,15 +283,18 @@ func (f memoryToDiskFlusherVisitor) Operate(key, value interface{}) (err *storag
         },
     }
 
+    f.memoryDeleteMutex.Lock()
     f.stream.values.Delete(skipListTime(recordTime))
+    f.memoryDeleteMutex.Unlock()
 
     return
 }
 
 func (f *memoryToDiskFlusher) ForStream(stream stream) (decoder storage.RecordDecoder, filter storage.RecordFilter, operator storage.RecordOperator) {
     visitor := memoryToDiskFlusherVisitor{
-        stream:  stream,
-        flusher: f,
+        stream:            stream,
+        flusher:           f,
+        memoryDeleteMutex: f.memoryDeleteMutex,
     }
 
     return visitor, visitor, visitor
@@ -338,7 +313,7 @@ func (f memoryToDiskFlusher) Close() {
     f.Flush()
 }
 
-// Persist a whole bunch of samples to the datastore.
+// Persist a whole bunch of samples from memory to the datastore.
 func (t *TieredStorage) flushMemory() {
     begin := time.Now()
     defer func() {
@@ -347,13 +322,14 @@ func (t *TieredStorage) flushMemory() {
         recordOutcome(duration, nil, map[string]string{operation: appendSample, result: success}, map[string]string{operation: flushMemory, result: failure})
     }()
 
-    t.mutex.Lock()
-    defer t.mutex.Unlock()
+    t.memoryMutex.RLock()
+    defer t.memoryMutex.RUnlock()
 
     flusher := &memoryToDiskFlusher{
-        disk:        t.DiskStorage,
-        olderThan:   time.Now().Add(-1 * t.memoryTTL),
-        toDiskQueue: t.appendToDiskQueue,
+        disk:              t.DiskStorage,
+        olderThan:         time.Now().Add(-1 * t.memoryTTL),
+        toDiskQueue:       t.appendToDiskQueue,
+        memoryDeleteMutex: &t.memoryDeleteMutex,
     }
 
     defer flusher.Close()
@@ -372,15 +348,14 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
         recordOutcome(duration, err, map[string]string{operation: renderView, result: success}, map[string]string{operation: renderView, result: failure})
     }()
 
-    t.mutex.Lock()
-    defer t.mutex.Unlock()
+    // No samples may be deleted from memory while rendering a view.
+    t.memoryDeleteMutex.RLock()
+    defer t.memoryDeleteMutex.RUnlock()
 
-    var (
-        scans = viewJob.builder.ScanJobs()
-        view  = newView()
-        // Get a single iterator that will be used for all data extraction below.
-        iterator = t.DiskStorage.MetricSamples.NewIterator(true)
-    )
+    scans := viewJob.builder.ScanJobs()
+    view := newView()
+    // Get a single iterator that will be used for all data extraction below.
+    iterator := t.DiskStorage.MetricSamples.NewIterator(true)
     defer iterator.Close()
 
     // Rebuilding of the frontier should happen on a conditional basis if a
@@ -410,7 +385,9 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
         targetTime := *standingOps[0].CurrentTime()
 
         currentChunk := chunk{}
+        t.memoryMutex.RLock()
         memValues := t.memoryArena.GetValueAtTime(scanJob.fingerprint, targetTime)
+        t.memoryMutex.RUnlock()
         // If we aimed before the oldest value in memory, load more data from disk.
         if (len(memValues) == 0 || memValues.FirstTimeAfter(targetTime)) && seriesFrontier != nil {
             // XXX: For earnest performance gains analagous to the benchmarking we
diff --git a/storage/metric/tiered_test.go b/storage/metric/tiered_test.go
index b8cf2ee2b..614549c20 100644
--- a/storage/metric/tiered_test.go
+++ b/storage/metric/tiered_test.go
@@ -347,8 +347,6 @@ func testMakeView(t test.Tester, flushToDisk bool) {
 
     if flushToDisk {
         tiered.Flush()
-    } else {
-        tiered.writeMemory()
     }
 
     requestBuilder := NewViewRequestBuilder()
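
For reviewers who want the locking scheme at a glance, below is a minimal, self-contained Go sketch of the two-mutex pattern described in the commit message. All names in it (toyStorage, deleteOlderThan, and so on) are invented for illustration and are not the real TieredStorage API; the sketch also takes the memory write lock while deleting so that it is race-free on its own, whereas the patch takes only the read lock in flushMemory() and serializes flushing and view rendering through the Serve() loop.

// locking_sketch.go - illustrative only; not part of the patch.
package main

import (
    "fmt"
    "sync"
    "time"
)

type sample struct {
    t time.Time
    v float64
}

type toyStorage struct {
    // memoryMutex guards every concurrent read or write of the in-memory samples.
    memoryMutex sync.RWMutex
    // memoryDeleteMutex only blocks deletions: a view render holds it shared for
    // its whole duration, so no sample can vanish from memory mid-render, while
    // deleters must acquire it exclusively.
    memoryDeleteMutex sync.RWMutex

    samples []sample
}

// AppendSamples writes synchronously into memory, mirroring the new
// TieredStorage.AppendSamples behaviour (no intermediate queue).
func (s *toyStorage) AppendSamples(in ...sample) {
    s.memoryMutex.Lock()
    s.samples = append(s.samples, in...)
    s.memoryMutex.Unlock()
}

// deleteOlderThan stands in for the memory-to-disk flusher's delete step.
func (s *toyStorage) deleteOlderThan(cutoff time.Time) {
    s.memoryDeleteMutex.Lock()
    defer s.memoryDeleteMutex.Unlock()
    s.memoryMutex.Lock()
    defer s.memoryMutex.Unlock()

    kept := s.samples[:0]
    for _, smp := range s.samples {
        if !smp.t.Before(cutoff) {
            kept = append(kept, smp)
        }
    }
    s.samples = kept
}

// renderView assembles a view from memory. Holding memoryDeleteMutex for the
// entire call guarantees nothing read here is deleted mid-render, while each
// individual read still only needs the cheaper memoryMutex read lock.
func (s *toyStorage) renderView() []sample {
    s.memoryDeleteMutex.RLock()
    defer s.memoryDeleteMutex.RUnlock()

    s.memoryMutex.RLock()
    view := append([]sample(nil), s.samples...)
    s.memoryMutex.RUnlock()
    return view
}

func main() {
    storage := &toyStorage{}
    now := time.Now()
    storage.AppendSamples(sample{t: now.Add(-time.Hour), v: 1}, sample{t: now, v: 2})

    fmt.Println("samples in view before delete:", len(storage.renderView()))
    storage.deleteOlderThan(now.Add(-time.Minute))
    fmt.Println("samples in view after delete: ", len(storage.renderView()))
}

The point of the second mutex is that a long renderView() only has to block deletions, not appends: writers keep making progress under memoryMutex while the view is being assembled.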