From d5380897c333c98d6426e60d649bec7b061988bc Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Mon, 4 Mar 2013 11:43:07 -0800 Subject: [PATCH] Cleanups and adds performance regression. --- main.go | 4 +- storage/metric/leveldb.go | 15 ++-- storage/metric/operation.go | 126 ++++++++++++++++----------------- storage/raw/leveldb/leveldb.go | 2 +- 4 files changed, 73 insertions(+), 74 deletions(-) diff --git a/main.go b/main.go index 50f90c55a..81f2ac1ce 100644 --- a/main.go +++ b/main.go @@ -99,7 +99,7 @@ func main() { go func() { ticker := time.Tick(time.Second) - for i := 0; i < 5; i++ { + for i := 0; i < 120; i++ { <-ticker if i%10 == 0 { fmt.Printf(".") @@ -109,7 +109,7 @@ func main() { //f := model.NewFingerprintFromRowKey("9776005627788788740-g-131-0") f := model.NewFingerprintFromRowKey("09923616460706181007-g-131-0") v := metric.NewViewRequestBuilder() - v.GetMetricAtTime(f, time.Now().Add(-30*time.Second)) + v.GetMetricAtTime(f, time.Now().Add(-120*time.Second)) view, err := ts.MakeView(v, time.Minute) fmt.Println(view, err) diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index 6956bbae8..ed39690df 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -28,6 +28,7 @@ import ( "io" "log" "sort" + "sync" "time" ) @@ -223,25 +224,25 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err } // Begin the sorting of grouped samples. - - sortingSemaphore := make(chan bool, sortConcurrency) - doneSorting := make(chan bool, len(fingerprintToSamples)) + var ( + sortingSemaphore = make(chan bool, sortConcurrency) + doneSorting = sync.WaitGroup{} + ) for i := 0; i < sortConcurrency; i++ { sortingSemaphore <- true } for _, samples := range fingerprintToSamples { + doneSorting.Add(1) go func(samples model.Samples) { <-sortingSemaphore sort.Sort(samples) sortingSemaphore <- true - doneSorting <- true + doneSorting.Done() }(samples) } - for i := 0; i < len(fingerprintToSamples); i++ { - <-doneSorting - } + doneSorting.Wait() var ( absentFingerprints = map[model.Fingerprint]model.Samples{} diff --git a/storage/metric/operation.go b/storage/metric/operation.go index b3cb8be91..3179e95a7 100644 --- a/storage/metric/operation.go +++ b/storage/metric/operation.go @@ -101,7 +101,7 @@ func (s getMetricRangeOperations) Swap(i, j int) { s[i], s[j] = s[j], s[i] } -// Sorts getMetricRangeOperation according duration in descending order. +// Sorts getMetricRangeOperation according to duration in descending order. type rangeDurationSorter struct { getMetricRangeOperations } @@ -176,15 +176,13 @@ func collectIntervals(ops ops) (intervals map[time.Duration]getValuesAtIntervalO intervals = make(map[time.Duration]getValuesAtIntervalOps) for _, operation := range ops { - intervalOp, ok := operation.(getValuesAtIntervalOp) - if !ok { - continue + switch t := operation.(type) { + case getValuesAtIntervalOp: + operations, _ := intervals[t.interval] + + operations = append(operations, t) + intervals[t.interval] = operations } - - operations, _ := intervals[intervalOp.interval] - - operations = append(operations, intervalOp) - intervals[intervalOp.interval] = operations } for _, operations := range intervals { @@ -197,9 +195,9 @@ func collectIntervals(ops ops) (intervals map[time.Duration]getValuesAtIntervalO // Selects and returns all operations that are getValuesAlongRangeOp operations. func collectRanges(ops ops) (ranges getMetricRangeOperations) { for _, operation := range ops { - op, ok := operation.(getValuesAlongRangeOp) - if ok { - ranges = append(ranges, op) + switch t := operation.(type) { + case getValuesAlongRangeOp: + ranges = append(ranges, t) } } @@ -208,6 +206,11 @@ func collectRanges(ops ops) (ranges getMetricRangeOperations) { return } +// optimizeForward iteratively scans operations and peeks ahead to subsequent +// ones to find candidates that can either be removed or truncated through +// simplification. For instance, if a range query happens to overlap a get-a- +// value-at-a-certain-point-request, the range query should flatten and subsume +// the other. func optimizeForward(pending ops) (out ops) { if len(pending) == 0 { return @@ -219,79 +222,75 @@ func optimizeForward(pending ops) (out ops) { pending = pending[1:len(pending)] - if _, ok := firstOperation.(getValuesAtTimeOp); ok { + switch t := firstOperation.(type) { + case getValuesAtTimeOp: out = ops{firstOperation} tail := optimizeForward(pending) return append(out, tail...) - } - // If the last value was a scan at a given frequency along an interval, - // several optimizations may exist. - if operation, ok := firstOperation.(getValuesAtIntervalOp); ok { + case getValuesAtIntervalOp: + // If the last value was a scan at a given frequency along an interval, + // several optimizations may exist. for _, peekOperation := range pending { - if peekOperation.StartsAt().After(operation.Through()) { + if peekOperation.StartsAt().After(t.Through()) { break } // If the type is not a range request, we can't do anything. - rangeOperation, ok := peekOperation.(getValuesAlongRangeOp) - if !ok { - continue - } + switch next := peekOperation.(type) { + case getValuesAlongRangeOp: + if !next.Through().After(t.Through()) { + var ( + before = getValuesAtIntervalOp(t) + after = getValuesAtIntervalOp(t) + ) - if !rangeOperation.Through().After(operation.Through()) { - var ( - before = getValuesAtIntervalOp(operation) - after = getValuesAtIntervalOp(operation) - ) + before.through = next.from - before.through = rangeOperation.from + // Truncate the get value at interval request if a range request cuts + // it off somewhere. + var ( + from = next.from + ) - // Truncate the get value at interval request if a range request cuts - // it off somewhere. - var ( - t = rangeOperation.from - ) + for { + from = from.Add(t.interval) - for { - t = t.Add(operation.interval) - - if t.After(rangeOperation.through) { - after.from = t - break + if from.After(next.through) { + after.from = from + break + } } + + pending = append(ops{before, after}, pending...) + sort.Sort(pending) + + return optimizeForward(pending) } - - pending = append(ops{before, after}, pending...) - sort.Sort(pending) - - return optimizeForward(pending) } } - } - if operation, ok := firstOperation.(getValuesAlongRangeOp); ok { + case getValuesAlongRangeOp: for _, peekOperation := range pending { - if peekOperation.StartsAt().After(operation.Through()) { + if peekOperation.StartsAt().After(t.Through()) { break } + switch next := peekOperation.(type) { // All values at a specific time may be elided into the range query. - if _, ok := peekOperation.(getValuesAtTimeOp); ok { + case getValuesAtTimeOp: pending = pending[1:len(pending)] continue - } - - // Range queries should be concatenated if they overlap. - if rangeOperation, ok := peekOperation.(getValuesAlongRangeOp); ok { + case getValuesAlongRangeOp: + // Range queries should be concatenated if they overlap. pending = pending[1:len(pending)] - if rangeOperation.Through().After(operation.Through()) { - operation.through = rangeOperation.through + if next.Through().After(t.Through()) { + t.through = next.through var ( - head = ops{operation} + head = ops{t} tail = pending ) @@ -299,22 +298,20 @@ func optimizeForward(pending ops) (out ops) { return optimizeForward(pending) } - } - - if intervalOperation, ok := peekOperation.(getValuesAtIntervalOp); ok { + case getValuesAtIntervalOp: pending = pending[1:len(pending)] - if intervalOperation.through.After(operation.Through()) { + if next.through.After(t.Through()) { var ( - t = intervalOperation.from + t = next.from ) for { - t = t.Add(intervalOperation.interval) + t = t.Add(next.interval) - if t.After(intervalOperation.through) { - intervalOperation.from = t + if t.After(next.through) { + next.from = t - pending = append(ops{intervalOperation}, pending...) + pending = append(ops{next}, pending...) return optimizeForward(pending) } @@ -322,6 +319,7 @@ func optimizeForward(pending ops) (out ops) { } } } + } // Strictly needed? diff --git a/storage/raw/leveldb/leveldb.go b/storage/raw/leveldb/leveldb.go index ccf096ea8..0602586ee 100644 --- a/storage/raw/leveldb/leveldb.go +++ b/storage/raw/leveldb/leveldb.go @@ -28,7 +28,7 @@ var ( leveldbUseParanoidChecks = flag.Bool("leveldbUseParanoidChecks", true, "Whether LevelDB uses expensive checks (bool).") ) -// LevelDBPersistence is an disk-backed sorted key-value store. +// LevelDBPersistence is a disk-backed sorted key-value store. type LevelDBPersistence struct { cache *levigo.Cache filterPolicy *levigo.FilterPolicy