Extract dto.SampleValueSeries into model.Values.

pull/170/head
Matt T. Proud 12 years ago
parent 422003da8e
commit b1a8e51b07

@ -17,6 +17,7 @@ import (
"bytes" "bytes"
"code.google.com/p/goprotobuf/proto" "code.google.com/p/goprotobuf/proto"
"fmt" "fmt"
dto "github.com/prometheus/prometheus/model/generated"
"sort" "sort"
"time" "time"
) )
@ -135,6 +136,17 @@ func (v Values) InsideInterval(t time.Time) (s bool) {
return true return true
} }
func NewValuesFromDTO(dto *dto.SampleValueSeries) (v Values) {
for _, value := range dto.Value {
v = append(v, SamplePair{
Timestamp: time.Unix(*value.Timestamp, 0),
Value: SampleValue(*value.Value),
})
}
return
}
type SampleSet struct { type SampleSet struct {
Metric Metric Metric Metric
Values Values Values Values

@ -656,9 +656,14 @@ func extractSampleKey(i leveldb.Iterator) (key model.SampleKey, err error) {
return return
} }
func extractSampleValues(i leveldb.Iterator) (v *dto.SampleValueSeries, err error) { func extractSampleValues(i leveldb.Iterator) (values model.Values, err error) {
v = &dto.SampleValueSeries{} v := &dto.SampleValueSeries{}
err = proto.Unmarshal(i.Value(), v) err = proto.Unmarshal(i.Value(), v)
if err != nil {
return
}
values = model.NewValuesFromDTO(v)
return return
} }

@ -216,17 +216,11 @@ func levelDBGetRangeValues(l *LevelDBMetricPersistence, fp model.Fingerprint, i
break break
} }
retrievedValue, err := extractSampleValues(iterator) retrievedValues, err := extractSampleValues(iterator)
if err != nil { if err != nil {
return nil, err return nil, err
} }
samples = append(samples, retrievedValues...)
for _, value := range retrievedValue.Value {
samples = append(samples, model.SamplePair{
Value: model.SampleValue(*value.Value),
Timestamp: time.Unix(*value.Timestamp, 0),
})
}
} }
return return
@ -463,15 +457,15 @@ func StochasticTests(persistenceMaker func() (MetricPersistence, test.Closer), t
samples, err = levelDBGetRangeValues(persistence, fp, interval) samples, err = levelDBGetRangeValues(persistence, fp, interval)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
return }
if len(samples) < 2 {
t.Fatalf("expected sample count greater than %d, got %d", 2, len(samples))
} }
default: default:
samples = p.GetRangeValues(fp, interval) samples = p.GetRangeValues(fp, interval)
}
if len(samples) < 2 { if len(samples) < 2 {
t.Errorf("expected sample count less than %d, got %d", 2, len(samples)) t.Fatalf("expected sample count greater than %d, got %d", 2, len(samples))
return }
} }
} }
} }

@ -473,7 +473,7 @@ func (t *tieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier
Fingerprint: fingerprint.ToDTO(), Fingerprint: fingerprint.ToDTO(),
} }
foundKey model.SampleKey foundKey model.SampleKey
foundValue *dto.SampleValueSeries foundValues model.Values
) )
// Limit the target key to be within the series' keyspace. // Limit the target key to be within the series' keyspace.
@ -510,9 +510,9 @@ func (t *tieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier
rewound = true rewound = true
} }
foundValue, err = extractSampleValues(iterator) foundValues, err = extractSampleValues(iterator)
if err != nil { if err != nil {
panic(err) return
} }
// If we rewound, but the target time is still past the current block, return // If we rewound, but the target time is still past the current block, return
@ -520,33 +520,25 @@ func (t *tieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier
if rewound { if rewound {
foundKey, err = extractSampleKey(iterator) foundKey, err = extractSampleKey(iterator)
if err != nil { if err != nil {
panic(err) return
} }
currentChunkLastTime := foundKey.LastTimestamp currentChunkLastTime := foundKey.LastTimestamp
if ts.After(currentChunkLastTime) { if ts.After(currentChunkLastTime) {
sampleCount := len(foundValue.Value) sampleCount := len(foundValues)
chunk = append(chunk, model.SamplePair{ chunk = append(chunk, foundValues[sampleCount-1])
Timestamp: time.Unix(*foundValue.Value[sampleCount-1].Timestamp, 0),
Value: model.SampleValue(*foundValue.Value[sampleCount-1].Value),
})
// We know there's a next block since we have rewound from it. // We know there's a next block since we have rewound from it.
iterator.Next() iterator.Next()
foundValue, err = extractSampleValues(iterator) foundValues, err = extractSampleValues(iterator)
if err != nil { if err != nil {
panic(err) return
} }
} }
} }
// Now append all the samples of the currently seeked block to the output. // Now append all the samples of the currently seeked block to the output.
for _, sample := range foundValue.Value { chunk = append(chunk, foundValues...)
chunk = append(chunk, model.SamplePair{
Timestamp: time.Unix(*sample.Timestamp, 0),
Value: model.SampleValue(*sample.Value),
})
}
return return
} }

Loading…
Cancel
Save