|
|
|
@ -64,7 +64,6 @@ type TieredStorage struct {
|
|
|
|
|
memoryArena memorySeriesStorage
|
|
|
|
|
memoryTTL time.Duration
|
|
|
|
|
flushMemoryInterval time.Duration
|
|
|
|
|
writeMemoryInterval time.Duration
|
|
|
|
|
|
|
|
|
|
// This mutex manages any concurrent reads/writes of the memoryArena.
|
|
|
|
|
memoryMutex sync.RWMutex
|
|
|
|
@ -88,7 +87,7 @@ type viewJob struct {
|
|
|
|
|
err chan error
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryInterval, writeMemoryInterval, memoryTTL time.Duration, root string) (storage *TieredStorage, err error) {
|
|
|
|
|
func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryInterval, memoryTTL time.Duration, root string) (storage *TieredStorage, err error) {
|
|
|
|
|
diskStorage, err := NewLevelDBMetricPersistence(root)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return
|
|
|
|
@ -102,7 +101,6 @@ func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryIn
|
|
|
|
|
memoryArena: NewMemorySeriesStorage(),
|
|
|
|
|
memoryTTL: memoryTTL,
|
|
|
|
|
viewQueue: make(chan viewJob, viewQueueDepth),
|
|
|
|
|
writeMemoryInterval: writeMemoryInterval,
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
@ -185,8 +183,6 @@ func (t *TieredStorage) rebuildDiskFrontier(i leveldb.Iterator) (err error) {
|
|
|
|
|
func (t *TieredStorage) Serve() {
|
|
|
|
|
flushMemoryTicker := time.NewTicker(t.flushMemoryInterval)
|
|
|
|
|
defer flushMemoryTicker.Stop()
|
|
|
|
|
writeMemoryTicker := time.NewTicker(t.writeMemoryInterval)
|
|
|
|
|
defer writeMemoryTicker.Stop()
|
|
|
|
|
queueReportTicker := time.NewTicker(time.Second)
|
|
|
|
|
defer queueReportTicker.Stop()
|
|
|
|
|
|
|
|
|
|