Cleanups and adds performance regression.

pull/84/head
Matt T. Proud 2013-03-04 11:43:07 -08:00
parent f39b9c3c8e
commit d5380897c3
4 changed files with 73 additions and 74 deletions

View File

@ -99,7 +99,7 @@ func main() {
go func() {
ticker := time.Tick(time.Second)
for i := 0; i < 5; i++ {
for i := 0; i < 120; i++ {
<-ticker
if i%10 == 0 {
fmt.Printf(".")
@ -109,7 +109,7 @@ func main() {
//f := model.NewFingerprintFromRowKey("9776005627788788740-g-131-0")
f := model.NewFingerprintFromRowKey("09923616460706181007-g-131-0")
v := metric.NewViewRequestBuilder()
v.GetMetricAtTime(f, time.Now().Add(-30*time.Second))
v.GetMetricAtTime(f, time.Now().Add(-120*time.Second))
view, err := ts.MakeView(v, time.Minute)
fmt.Println(view, err)

View File

@ -28,6 +28,7 @@ import (
"io"
"log"
"sort"
"sync"
"time"
)
@ -223,25 +224,25 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err
}
// Begin the sorting of grouped samples.
sortingSemaphore := make(chan bool, sortConcurrency)
doneSorting := make(chan bool, len(fingerprintToSamples))
var (
sortingSemaphore = make(chan bool, sortConcurrency)
doneSorting = sync.WaitGroup{}
)
for i := 0; i < sortConcurrency; i++ {
sortingSemaphore <- true
}
for _, samples := range fingerprintToSamples {
doneSorting.Add(1)
go func(samples model.Samples) {
<-sortingSemaphore
sort.Sort(samples)
sortingSemaphore <- true
doneSorting <- true
doneSorting.Done()
}(samples)
}
for i := 0; i < len(fingerprintToSamples); i++ {
<-doneSorting
}
doneSorting.Wait()
var (
absentFingerprints = map[model.Fingerprint]model.Samples{}

View File

@ -101,7 +101,7 @@ func (s getMetricRangeOperations) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
// Sorts getMetricRangeOperation according duration in descending order.
// Sorts getMetricRangeOperation according to duration in descending order.
type rangeDurationSorter struct {
getMetricRangeOperations
}
@ -176,15 +176,13 @@ func collectIntervals(ops ops) (intervals map[time.Duration]getValuesAtIntervalO
intervals = make(map[time.Duration]getValuesAtIntervalOps)
for _, operation := range ops {
intervalOp, ok := operation.(getValuesAtIntervalOp)
if !ok {
continue
switch t := operation.(type) {
case getValuesAtIntervalOp:
operations, _ := intervals[t.interval]
operations = append(operations, t)
intervals[t.interval] = operations
}
operations, _ := intervals[intervalOp.interval]
operations = append(operations, intervalOp)
intervals[intervalOp.interval] = operations
}
for _, operations := range intervals {
@ -197,9 +195,9 @@ func collectIntervals(ops ops) (intervals map[time.Duration]getValuesAtIntervalO
// Selects and returns all operations that are getValuesAlongRangeOp operations.
func collectRanges(ops ops) (ranges getMetricRangeOperations) {
for _, operation := range ops {
op, ok := operation.(getValuesAlongRangeOp)
if ok {
ranges = append(ranges, op)
switch t := operation.(type) {
case getValuesAlongRangeOp:
ranges = append(ranges, t)
}
}
@ -208,6 +206,11 @@ func collectRanges(ops ops) (ranges getMetricRangeOperations) {
return
}
// optimizeForward iteratively scans operations and peeks ahead to subsequent
// ones to find candidates that can either be removed or truncated through
// simplification. For instance, if a range query happens to overlap a
// get-a-value-at-a-certain-point request, the range query should flatten and
// subsume the other.
func optimizeForward(pending ops) (out ops) {
if len(pending) == 0 {
return
@ -219,79 +222,75 @@ func optimizeForward(pending ops) (out ops) {
pending = pending[1:len(pending)]
if _, ok := firstOperation.(getValuesAtTimeOp); ok {
switch t := firstOperation.(type) {
case getValuesAtTimeOp:
out = ops{firstOperation}
tail := optimizeForward(pending)
return append(out, tail...)
}
// If the last value was a scan at a given frequency along an interval,
// several optimizations may exist.
if operation, ok := firstOperation.(getValuesAtIntervalOp); ok {
case getValuesAtIntervalOp:
// If the last value was a scan at a given frequency along an interval,
// several optimizations may exist.
for _, peekOperation := range pending {
if peekOperation.StartsAt().After(operation.Through()) {
if peekOperation.StartsAt().After(t.Through()) {
break
}
// If the type is not a range request, we can't do anything.
rangeOperation, ok := peekOperation.(getValuesAlongRangeOp)
if !ok {
continue
}
switch next := peekOperation.(type) {
case getValuesAlongRangeOp:
if !next.Through().After(t.Through()) {
var (
before = getValuesAtIntervalOp(t)
after = getValuesAtIntervalOp(t)
)
if !rangeOperation.Through().After(operation.Through()) {
var (
before = getValuesAtIntervalOp(operation)
after = getValuesAtIntervalOp(operation)
)
before.through = next.from
before.through = rangeOperation.from
// Truncate the get value at interval request if a range request cuts
// it off somewhere.
var (
from = next.from
)
// Truncate the get value at interval request if a range request cuts
// it off somewhere.
var (
t = rangeOperation.from
)
for {
from = from.Add(t.interval)
for {
t = t.Add(operation.interval)
if t.After(rangeOperation.through) {
after.from = t
break
if from.After(next.through) {
after.from = from
break
}
}
pending = append(ops{before, after}, pending...)
sort.Sort(pending)
return optimizeForward(pending)
}
pending = append(ops{before, after}, pending...)
sort.Sort(pending)
return optimizeForward(pending)
}
}
}
if operation, ok := firstOperation.(getValuesAlongRangeOp); ok {
case getValuesAlongRangeOp:
for _, peekOperation := range pending {
if peekOperation.StartsAt().After(operation.Through()) {
if peekOperation.StartsAt().After(t.Through()) {
break
}
switch next := peekOperation.(type) {
// All values at a specific time may be elided into the range query.
if _, ok := peekOperation.(getValuesAtTimeOp); ok {
case getValuesAtTimeOp:
pending = pending[1:len(pending)]
continue
}
// Range queries should be concatenated if they overlap.
if rangeOperation, ok := peekOperation.(getValuesAlongRangeOp); ok {
case getValuesAlongRangeOp:
// Range queries should be concatenated if they overlap.
pending = pending[1:len(pending)]
if rangeOperation.Through().After(operation.Through()) {
operation.through = rangeOperation.through
if next.Through().After(t.Through()) {
t.through = next.through
var (
head = ops{operation}
head = ops{t}
tail = pending
)
@ -299,22 +298,20 @@ func optimizeForward(pending ops) (out ops) {
return optimizeForward(pending)
}
}
if intervalOperation, ok := peekOperation.(getValuesAtIntervalOp); ok {
case getValuesAtIntervalOp:
pending = pending[1:len(pending)]
if intervalOperation.through.After(operation.Through()) {
if next.through.After(t.Through()) {
var (
t = intervalOperation.from
t = next.from
)
for {
t = t.Add(intervalOperation.interval)
t = t.Add(next.interval)
if t.After(intervalOperation.through) {
intervalOperation.from = t
if t.After(next.through) {
next.from = t
pending = append(ops{intervalOperation}, pending...)
pending = append(ops{next}, pending...)
return optimizeForward(pending)
}
@ -322,6 +319,7 @@ func optimizeForward(pending ops) (out ops) {
}
}
}
}
// Strictly needed?

View File

@ -28,7 +28,7 @@ var (
leveldbUseParanoidChecks = flag.Bool("leveldbUseParanoidChecks", true, "Whether LevelDB uses expensive checks (bool).")
)
// LevelDBPersistence is an disk-backed sorted key-value store.
// LevelDBPersistence is a disk-backed sorted key-value store.
type LevelDBPersistence struct {
cache *levigo.Cache
filterPolicy *levigo.FilterPolicy