Impl' storage i'faces and fix non-idiomatic warts.

This change includes implementation of most major storage layer
features, albeit some imperfect.  It also includes nascent telemetry
bindings, too.
pull/1/head
Matt T. Proud 2012-12-25 13:50:36 +01:00
parent a14dbd5bd0
commit 3ac5d48b1a
12 changed files with 2304 additions and 484 deletions

View File

@ -40,7 +40,7 @@ preparation-stamp: build-dependencies
build-dependencies: build-dependencies-stamp build-dependencies: build-dependencies-stamp
build-dependencies-stamp: bison cc mercurial protoc goprotobuf go leveldb levigo build-dependencies-stamp: bison cc mercurial protoc goprotobuf go instrumentation leveldb levigo
touch $@ touch $@
overlay: overlay-stamp overlay: overlay-stamp
@ -100,6 +100,12 @@ goprotobuf-stamp: go protoc source
$(GO_GET) code.google.com/p/goprotobuf/protoc-gen-go $(GO_GET) code.google.com/p/goprotobuf/protoc-gen-go
touch $@ touch $@
instrumentation: instrumentation-stamp
instrumentation-stamp: go source
$(GO_GET) github.com/matttproud/golang_instrumentation
touch $@
leveldb: leveldb-stamp leveldb: leveldb-stamp
leveldb-stamp: cc rsync leveldb-$(LEVELDB_VERSION).tar.gz overlay leveldb-stamp: cc rsync leveldb-$(LEVELDB_VERSION).tar.gz overlay
@ -148,4 +154,4 @@ clean:
-rm protobuf-$(PROTOCOL_BUFFERS_VERSION).tar.bz2 -rm protobuf-$(PROTOCOL_BUFFERS_VERSION).tar.bz2
.PHONY: all bison build-dependencies cc clean go goprotobuf leveldb levigo mercurial overlay preparation protoc rsync source test wget .PHONY: all bison build-dependencies cc clean go goprotobuf instrumentation leveldb levigo mercurial overlay preparation protoc rsync source test wget

19
main.go
View File

@ -14,6 +14,8 @@
package main package main
import ( import (
"fmt"
"github.com/matttproud/prometheus/retrieval"
"github.com/matttproud/prometheus/storage/metric/leveldb" "github.com/matttproud/prometheus/storage/metric/leveldb"
"log" "log"
"os" "os"
@ -30,6 +32,21 @@ func main() {
m.Close() m.Close()
}() }()
for { t := &retrieval.Target{
Address: "http://localhost:8080/metrics.json",
}
for i := 0; i < 100000; i++ {
c, err := t.Scrape()
if err != nil {
fmt.Println(err)
continue
}
for _, s := range c {
m.AppendSample(&s)
}
fmt.Printf("Finished %d\n", i)
} }
} }

119
retrieval/type.go Normal file
View File

@ -0,0 +1,119 @@
package retrieval
import (
"encoding/json"
"github.com/matttproud/prometheus/model"
"io/ioutil"
"net/http"
"strconv"
"time"
)
type TargetState int
const (
UNKNOWN TargetState = iota
ALIVE
UNREACHABLE
)
type Target struct {
State TargetState
Address string
Staleness time.Duration
Frequency time.Duration
}
func (t *Target) Scrape() (samples []model.Sample, err error) {
defer func() {
if err != nil {
t.State = ALIVE
}
}()
ti := time.Now()
resp, err := http.Get(t.Address)
if err != nil {
return
}
defer resp.Body.Close()
raw, err := ioutil.ReadAll(resp.Body)
if err != nil {
return
}
intermediate := make(map[string]interface{})
err = json.Unmarshal(raw, &intermediate)
if err != nil {
return
}
baseLabels := map[string]string{"instance": t.Address}
for name, v := range intermediate {
asMap, ok := v.(map[string]interface{})
if !ok {
continue
}
switch asMap["type"] {
case "counter":
m := model.Metric{}
m["name"] = model.LabelValue(name)
asFloat, ok := asMap["value"].(float64)
if !ok {
continue
}
s := model.Sample{
Metric: m,
Value: model.SampleValue(asFloat),
Timestamp: ti,
}
for baseK, baseV := range baseLabels {
m[model.LabelName(baseK)] = model.LabelValue(baseV)
}
samples = append(samples, s)
case "histogram":
values, ok := asMap["value"].(map[string]interface{})
if !ok {
continue
}
for p, pValue := range values {
asString, ok := pValue.(string)
if !ok {
continue
}
float, err := strconv.ParseFloat(asString, 64)
if err != nil {
continue
}
m := model.Metric{}
m["name"] = model.LabelValue(name)
m["percentile"] = model.LabelValue(p)
s := model.Sample{
Metric: m,
Value: model.SampleValue(float),
Timestamp: ti,
}
for baseK, baseV := range baseLabels {
m[model.LabelName(baseK)] = model.LabelValue(baseV)
}
samples = append(samples, s)
}
}
}
return
}

View File

@ -19,8 +19,7 @@ import (
) )
type StalenessPolicy struct { type StalenessPolicy struct {
AllowStale bool DeltaAllowance time.Duration
MaximumStaleness time.Duration
} }
// MetricPersistence is a system for storing metric samples in a persistence // MetricPersistence is a system for storing metric samples in a persistence
@ -43,7 +42,6 @@ type MetricPersistence interface {
GetMetricForFingerprint(f *model.Fingerprint) (*model.Metric, error) GetMetricForFingerprint(f *model.Fingerprint) (*model.Metric, error)
GetFirstValue(*model.Metric) (*model.Sample, error)
GetValueAtTime(*model.Metric, *time.Time, *StalenessPolicy) (*model.Sample, error) GetValueAtTime(*model.Metric, *time.Time, *StalenessPolicy) (*model.Sample, error)
GetBoundaryValues(*model.Metric, *model.Interval, *StalenessPolicy) (*model.Sample, *model.Sample, error) GetBoundaryValues(*model.Metric, *model.Interval, *StalenessPolicy) (*model.Sample, *model.Sample, error)
GetRangeValues(*model.Metric, *model.Interval, *StalenessPolicy) (*model.SampleSet, error) GetRangeValues(*model.Metric, *model.Interval, *StalenessPolicy) (*model.SampleSet, error)

View File

@ -17,9 +17,11 @@ import (
"code.google.com/p/goprotobuf/proto" "code.google.com/p/goprotobuf/proto"
"errors" "errors"
"github.com/matttproud/prometheus/coding" "github.com/matttproud/prometheus/coding"
"github.com/matttproud/prometheus/coding/indexable"
"github.com/matttproud/prometheus/model" "github.com/matttproud/prometheus/model"
dto "github.com/matttproud/prometheus/model/generated" dto "github.com/matttproud/prometheus/model/generated"
"github.com/matttproud/prometheus/utility" "github.com/matttproud/prometheus/utility"
"log"
) )
func (l *LevelDBMetricPersistence) GetAllLabelNames() ([]string, error) { func (l *LevelDBMetricPersistence) GetAllLabelNames() ([]string, error) {
@ -115,3 +117,66 @@ func (l *LevelDBMetricPersistence) GetAllMetrics() ([]model.LabelSet, error) {
return nil, errors.New("Unknown error encountered when querying metrics.") return nil, errors.New("Unknown error encountered when querying metrics.")
} }
func (l *LevelDBMetricPersistence) GetSamplesForMetric(metric model.Metric, interval model.Interval) ([]model.Samples, error) {
metricDTO := model.MetricToDTO(&metric)
if fingerprintDTO, fingerprintDTOErr := model.MessageToFingerprintDTO(metricDTO); fingerprintDTOErr == nil {
if iterator, closer, iteratorErr := l.metricSamples.GetIterator(); iteratorErr == nil {
defer closer.Close()
start := &dto.SampleKey{
Fingerprint: fingerprintDTO,
Timestamp: indexable.EncodeTime(interval.OldestInclusive),
}
emission := make([]model.Samples, 0)
if encode, encodeErr := coding.NewProtocolBufferEncoder(start).Encode(); encodeErr == nil {
iterator.Seek(encode)
predicate := keyIsAtMostOld(interval.NewestInclusive)
for iterator = iterator; iterator.Valid(); iterator.Next() {
key := &dto.SampleKey{}
value := &dto.SampleValue{}
if keyUnmarshalErr := proto.Unmarshal(iterator.Key(), key); keyUnmarshalErr == nil {
if valueUnmarshalErr := proto.Unmarshal(iterator.Value(), value); valueUnmarshalErr == nil {
if fingerprintsEqual(fingerprintDTO, key.Fingerprint) {
// Wart
if predicate(key) {
emission = append(emission, model.Samples{
Value: model.SampleValue(*value.Value),
Timestamp: indexable.DecodeTime(key.Timestamp),
})
} else {
break
}
} else {
break
}
} else {
return nil, valueUnmarshalErr
}
} else {
return nil, keyUnmarshalErr
}
}
return emission, nil
} else {
log.Printf("Could not encode the start key: %q\n", encodeErr)
return nil, encodeErr
}
} else {
log.Printf("Could not acquire iterator: %q\n", iteratorErr)
return nil, iteratorErr
}
} else {
log.Printf("Could not create fingerprint for the metric: %q\n", fingerprintDTOErr)
return nil, fingerprintDTOErr
}
panic("unreachable")
}

View File

@ -18,6 +18,7 @@ import (
"fmt" "fmt"
"github.com/matttproud/prometheus/model" "github.com/matttproud/prometheus/model"
dto "github.com/matttproud/prometheus/model/generated" dto "github.com/matttproud/prometheus/model/generated"
"github.com/matttproud/prometheus/utility/test"
"io/ioutil" "io/ioutil"
"math" "math"
"math/rand" "math/rand"
@ -31,12 +32,7 @@ const (
stochasticMaximumVariance = 64 stochasticMaximumVariance = 64
) )
type tester interface { var testBasicLifecycle func(t test.Tester) = func(t test.Tester) {
Errorf(format string, args ...interface{})
Error(args ...interface{})
}
var testBasicLifecycle func(t tester) = func(t tester) {
temporaryDirectory, temporaryDirectoryErr := ioutil.TempDir("", "leveldb_metric_persistence_test") temporaryDirectory, temporaryDirectoryErr := ioutil.TempDir("", "leveldb_metric_persistence_test")
if temporaryDirectoryErr != nil { if temporaryDirectoryErr != nil {
@ -78,7 +74,7 @@ func BenchmarkBasicLifecycle(b *testing.B) {
} }
} }
var testReadEmpty func(t tester) = func(t tester) { var testReadEmpty func(t test.Tester) = func(t test.Tester) {
temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test") temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test")
defer func() { defer func() {
@ -195,7 +191,7 @@ func BenchmarkReadEmpty(b *testing.B) {
} }
} }
var testAppendSampleAsPureSparseAppend = func(t tester) { var testAppendSampleAsPureSparseAppend = func(t test.Tester) {
temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test") temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test")
defer func() { defer func() {
@ -241,7 +237,7 @@ func BenchmarkAppendSampleAsPureSparseAppend(b *testing.B) {
} }
} }
var testAppendSampleAsSparseAppendWithReads func(t tester) = func(t tester) { var testAppendSampleAsSparseAppendWithReads func(t test.Tester) = func(t test.Tester) {
temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test") temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test")
defer func() { defer func() {

View File

@ -15,14 +15,24 @@ package leveldb
import ( import (
"code.google.com/p/goprotobuf/proto" "code.google.com/p/goprotobuf/proto"
"errors" registry "github.com/matttproud/golang_instrumentation"
"github.com/matttproud/golang_instrumentation/metrics"
"github.com/matttproud/prometheus/coding" "github.com/matttproud/prometheus/coding"
"github.com/matttproud/prometheus/coding/indexable" "github.com/matttproud/prometheus/coding/indexable"
"github.com/matttproud/prometheus/model" "github.com/matttproud/prometheus/model"
dto "github.com/matttproud/prometheus/model/generated" dto "github.com/matttproud/prometheus/model/generated"
"log"
) )
var (
appendSuccessCount = &metrics.CounterMetric{}
appendFailureCount = &metrics.CounterMetric{}
)
func init() {
registry.Register("sample_append_success_count_total", appendSuccessCount)
registry.Register("sample_append_failure_count_total", appendFailureCount)
}
func (l *LevelDBMetricPersistence) setLabelPairFingerprints(labelPair *dto.LabelPair, fingerprints *dto.FingerprintCollection) error { func (l *LevelDBMetricPersistence) setLabelPairFingerprints(labelPair *dto.LabelPair, fingerprints *dto.FingerprintCollection) error {
labelPairEncoded := coding.NewProtocolBufferEncoder(labelPair) labelPairEncoded := coding.NewProtocolBufferEncoder(labelPair)
fingerprintsEncoded := coding.NewProtocolBufferEncoder(fingerprints) fingerprintsEncoded := coding.NewProtocolBufferEncoder(fingerprints)
@ -35,146 +45,148 @@ func (l *LevelDBMetricPersistence) setLabelNameFingerprints(labelName *dto.Label
return l.labelNameToFingerprints.Put(labelNameEncoded, fingerprintsEncoded) return l.labelNameToFingerprints.Put(labelNameEncoded, fingerprintsEncoded)
} }
func (l *LevelDBMetricPersistence) appendLabelPairFingerprint(labelPair *dto.LabelPair, fingerprint *dto.Fingerprint) error { func (l *LevelDBMetricPersistence) appendLabelPairFingerprint(labelPair *dto.LabelPair, fingerprint *dto.Fingerprint) (err error) {
if has, hasError := l.HasLabelPair(labelPair); hasError == nil { has, err := l.HasLabelPair(labelPair)
var fingerprints *dto.FingerprintCollection if err != nil {
if has { return
if existing, existingError := l.getFingerprintsForLabelSet(labelPair); existingError == nil {
fingerprints = existing
} else {
return existingError
}
} else {
fingerprints = &dto.FingerprintCollection{}
}
fingerprints.Member = append(fingerprints.Member, fingerprint)
return l.setLabelPairFingerprints(labelPair, fingerprints)
} else {
return hasError
} }
return errors.New("Unknown error when appending fingerprint to label name and value pair.") var fingerprints *dto.FingerprintCollection
if has {
fingerprints, err = l.getFingerprintsForLabelSet(labelPair)
if err != nil {
return
}
} else {
fingerprints = &dto.FingerprintCollection{}
}
fingerprints.Member = append(fingerprints.Member, fingerprint)
return l.setLabelPairFingerprints(labelPair, fingerprints)
} }
func (l *LevelDBMetricPersistence) appendLabelNameFingerprint(labelPair *dto.LabelPair, fingerprint *dto.Fingerprint) error { func (l *LevelDBMetricPersistence) appendLabelNameFingerprint(labelPair *dto.LabelPair, fingerprint *dto.Fingerprint) (err error) {
labelName := &dto.LabelName{ labelName := &dto.LabelName{
Name: labelPair.Name, Name: labelPair.Name,
} }
if has, hasError := l.HasLabelName(labelName); hasError == nil { has, err := l.HasLabelName(labelName)
var fingerprints *dto.FingerprintCollection if err != nil {
if has { return
if existing, existingError := l.GetLabelNameFingerprints(labelName); existingError == nil {
fingerprints = existing
} else {
return existingError
}
} else {
fingerprints = &dto.FingerprintCollection{}
}
fingerprints.Member = append(fingerprints.Member, fingerprint)
return l.setLabelNameFingerprints(labelName, fingerprints)
} else {
return hasError
} }
return errors.New("Unknown error when appending fingerprint to label name and value pair.") var fingerprints *dto.FingerprintCollection
} if has {
fingerprints, err = l.GetLabelNameFingerprints(labelName)
func (l *LevelDBMetricPersistence) appendFingerprints(m *dto.Metric) error { if err != nil {
if fingerprintDTO, fingerprintDTOError := model.MessageToFingerprintDTO(m); fingerprintDTOError == nil { return
fingerprintKey := coding.NewProtocolBufferEncoder(fingerprintDTO)
metricDTOEncoder := coding.NewProtocolBufferEncoder(m)
if putError := l.fingerprintToMetrics.Put(fingerprintKey, metricDTOEncoder); putError == nil {
labelCount := len(m.LabelPair)
labelPairErrors := make(chan error, labelCount)
labelNameErrors := make(chan error, labelCount)
for _, labelPair := range m.LabelPair {
go func(labelPair *dto.LabelPair) {
labelNameErrors <- l.appendLabelNameFingerprint(labelPair, fingerprintDTO)
}(labelPair)
go func(labelPair *dto.LabelPair) {
labelPairErrors <- l.appendLabelPairFingerprint(labelPair, fingerprintDTO)
}(labelPair)
}
for i := 0; i < cap(labelPairErrors); i++ {
appendError := <-labelPairErrors
if appendError != nil {
return appendError
}
}
for i := 0; i < cap(labelNameErrors); i++ {
appendError := <-labelNameErrors
if appendError != nil {
return appendError
}
}
return nil
} else {
return putError
} }
} else { } else {
return fingerprintDTOError fingerprints = &dto.FingerprintCollection{}
} }
return errors.New("Unknown error in appending label pairs to fingerprint.") fingerprints.Member = append(fingerprints.Member, fingerprint)
return l.setLabelNameFingerprints(labelName, fingerprints)
} }
func (l *LevelDBMetricPersistence) AppendSample(sample *model.Sample) error { func (l *LevelDBMetricPersistence) appendFingerprints(m *dto.Metric) (err error) {
fingerprintDTO, err := model.MessageToFingerprintDTO(m)
if err != nil {
return
}
fingerprintKey := coding.NewProtocolBufferEncoder(fingerprintDTO)
metricDTOEncoder := coding.NewProtocolBufferEncoder(m)
err = l.fingerprintToMetrics.Put(fingerprintKey, metricDTOEncoder)
if err != nil {
return
}
labelCount := len(m.LabelPair)
labelPairErrors := make(chan error, labelCount)
labelNameErrors := make(chan error, labelCount)
for _, labelPair := range m.LabelPair {
go func(labelPair *dto.LabelPair) {
labelNameErrors <- l.appendLabelNameFingerprint(labelPair, fingerprintDTO)
}(labelPair)
go func(labelPair *dto.LabelPair) {
labelPairErrors <- l.appendLabelPairFingerprint(labelPair, fingerprintDTO)
}(labelPair)
}
for i := 0; i < cap(labelPairErrors); i++ {
err = <-labelPairErrors
if err != nil {
return
}
}
for i := 0; i < cap(labelNameErrors); i++ {
err = <-labelNameErrors
if err != nil {
return
}
}
return
}
func (l *LevelDBMetricPersistence) AppendSample(sample *model.Sample) (err error) {
defer func() {
var m *metrics.CounterMetric = appendSuccessCount
if err != nil {
m = appendFailureCount
}
m.Increment()
}()
metricDTO := model.SampleToMetricDTO(sample) metricDTO := model.SampleToMetricDTO(sample)
if indexHas, indexHasError := l.hasIndexMetric(metricDTO); indexHasError == nil { indexHas, err := l.hasIndexMetric(metricDTO)
if !indexHas { if err != nil {
if indexPutError := l.indexMetric(metricDTO); indexPutError == nil { return
if appendError := l.appendFingerprints(metricDTO); appendError != nil {
log.Printf("Could not set metric fingerprint to label pairs mapping: %q\n", appendError)
return appendError
}
} else {
log.Printf("Could not add metric to membership index: %q\n", indexPutError)
return indexPutError
}
}
} else {
log.Printf("Could not query membership index for metric: %q\n", indexHasError)
return indexHasError
} }
if fingerprintDTO, fingerprintDTOErr := model.MessageToFingerprintDTO(metricDTO); fingerprintDTOErr == nil { if !indexHas {
err = l.indexMetric(metricDTO)
sampleKeyDTO := &dto.SampleKey{ if err != nil {
Fingerprint: fingerprintDTO, return
Timestamp: indexable.EncodeTime(sample.Timestamp),
} }
sampleValueDTO := &dto.SampleValue{ err = l.appendFingerprints(metricDTO)
Value: proto.Float32(float32(sample.Value)), if err != nil {
return
} }
sampleKeyEncoded := coding.NewProtocolBufferEncoder(sampleKeyDTO)
sampleValueEncoded := coding.NewProtocolBufferEncoder(sampleValueDTO)
if putError := l.metricSamples.Put(sampleKeyEncoded, sampleValueEncoded); putError != nil {
log.Printf("Could not append metric sample: %q\n", putError)
return putError
}
} else {
log.Printf("Could not encode metric fingerprint: %q\n", fingerprintDTOErr)
return fingerprintDTOErr
} }
return nil fingerprintDTO, err := model.MessageToFingerprintDTO(metricDTO)
if err != nil {
return
}
sampleKeyDTO := &dto.SampleKey{
Fingerprint: fingerprintDTO,
Timestamp: indexable.EncodeTime(sample.Timestamp),
}
sampleValueDTO := &dto.SampleValue{
Value: proto.Float32(float32(sample.Value)),
}
sampleKeyEncoded := coding.NewProtocolBufferEncoder(sampleKeyDTO)
sampleValueEncoded := coding.NewProtocolBufferEncoder(sampleValueDTO)
err = l.metricSamples.Put(sampleKeyEncoded, sampleValueEncoded)
if err != nil {
return
}
return
} }

View File

@ -16,16 +16,95 @@ package leveldb
import ( import (
"bytes" "bytes"
"code.google.com/p/goprotobuf/proto" "code.google.com/p/goprotobuf/proto"
"errors" registry "github.com/matttproud/golang_instrumentation"
"github.com/matttproud/golang_instrumentation/metrics"
"github.com/matttproud/prometheus/coding" "github.com/matttproud/prometheus/coding"
"github.com/matttproud/prometheus/coding/indexable" "github.com/matttproud/prometheus/coding/indexable"
"github.com/matttproud/prometheus/model" "github.com/matttproud/prometheus/model"
dto "github.com/matttproud/prometheus/model/generated" dto "github.com/matttproud/prometheus/model/generated"
"github.com/matttproud/prometheus/storage/metric" "github.com/matttproud/prometheus/storage/metric"
"log"
"time" "time"
) )
var (
getLabelNameFingerprintsSuccessCount = &metrics.CounterMetric{}
getLabelNameFingerprintsFailureCount = &metrics.CounterMetric{}
getFingerprintsForLabelSetSuccessCount = &metrics.CounterMetric{}
getFingerprintsForLabelSetFailureCount = &metrics.CounterMetric{}
getFingerprintsForLabelNameSuccessCount = &metrics.CounterMetric{}
getFingerprintsForLabelNameFailureCount = &metrics.CounterMetric{}
getMetricForFingerprintSuccessCount = &metrics.CounterMetric{}
getMetricForFingerprintFailureCount = &metrics.CounterMetric{}
getBoundaryValuesSuccessCount = &metrics.CounterMetric{}
getBoundaryValuesFailureCount = &metrics.CounterMetric{}
)
func init() {
registry.Register("get_label_name_fingerprints_success_count_total", getLabelNameFingerprintsSuccessCount)
registry.Register("get_label_name_fingerprints_failure_count_total", getLabelNameFingerprintsFailureCount)
registry.Register("get_fingerprints_for_label_set_success_count_total", getFingerprintsForLabelSetSuccessCount)
registry.Register("get_fingerprints_for_label_set_failure_count_total", getFingerprintsForLabelSetFailureCount)
registry.Register("get_fingerprints_for_label_name_success_count_total", getFingerprintsForLabelNameSuccessCount)
registry.Register("get_fingerprints_for_label_name_failure_count_total", getFingerprintsForLabelNameFailureCount)
registry.Register("get_metric_for_fingerprint_success_count_total", getMetricForFingerprintSuccessCount)
registry.Register("get_metric_for_fingerprint_failure_count_total", getMetricForFingerprintFailureCount)
registry.Register("get_boundary_values_success_count_total", getBoundaryValuesSuccessCount)
registry.Register("get_boundary_values_failure_count_total", getBoundaryValuesFailureCount)
}
func extractSampleKey(i iterator) (k *dto.SampleKey, err error) {
k = &dto.SampleKey{}
err = proto.Unmarshal(i.Key(), k)
return
}
func extractSampleValue(i iterator) (v *dto.SampleValue, err error) {
v = &dto.SampleValue{}
err = proto.Unmarshal(i.Value(), v)
return
}
func fingerprintsEqual(l *dto.Fingerprint, r *dto.Fingerprint) bool {
if l == r {
return true
}
if l == nil && r == nil {
return true
}
if r.Signature == l.Signature {
return true
}
if *r.Signature == *l.Signature {
return true
}
return false
}
type sampleKeyPredicate func(k *dto.SampleKey) bool
func keyIsOlderThan(t time.Time) sampleKeyPredicate {
unix := t.Unix()
return func(k *dto.SampleKey) bool {
return indexable.DecodeTime(k.Timestamp).Unix() > unix
}
}
func keyIsAtMostOld(t time.Time) sampleKeyPredicate {
unix := t.Unix()
return func(k *dto.SampleKey) bool {
return indexable.DecodeTime(k.Timestamp).Unix() <= unix
}
}
func (l *LevelDBMetricPersistence) hasIndexMetric(dto *dto.Metric) (bool, error) { func (l *LevelDBMetricPersistence) hasIndexMetric(dto *dto.Metric) (bool, error) {
dtoKey := coding.NewProtocolBufferEncoder(dto) dtoKey := coding.NewProtocolBufferEncoder(dto)
return l.metricMembershipIndex.Has(dtoKey) return l.metricMembershipIndex.Has(dtoKey)
@ -46,168 +125,162 @@ func (l *LevelDBMetricPersistence) HasLabelName(dto *dto.LabelName) (bool, error
return l.labelNameToFingerprints.Has(dtoKey) return l.labelNameToFingerprints.Has(dtoKey)
} }
func (l *LevelDBMetricPersistence) getFingerprintsForLabelSet(p *dto.LabelPair) (*dto.FingerprintCollection, error) { func (l *LevelDBMetricPersistence) getFingerprintsForLabelSet(p *dto.LabelPair) (c *dto.FingerprintCollection, err error) {
dtoKey := coding.NewProtocolBufferEncoder(p) dtoKey := coding.NewProtocolBufferEncoder(p)
if get, getError := l.labelSetToFingerprints.Get(dtoKey); getError == nil { get, err := l.labelSetToFingerprints.Get(dtoKey)
value := &dto.FingerprintCollection{} if err != nil {
if unmarshalError := proto.Unmarshal(get, value); unmarshalError == nil { return
return value, nil
} else {
return nil, unmarshalError
}
} else {
return nil, getError
} }
panic("unreachable") c = &dto.FingerprintCollection{}
err = proto.Unmarshal(get, c)
return
} }
func (l *LevelDBMetricPersistence) GetLabelNameFingerprints(n *dto.LabelName) (*dto.FingerprintCollection, error) { // XXX: Delete me and replace with GetFingerprintsForLabelName.
func (l *LevelDBMetricPersistence) GetLabelNameFingerprints(n *dto.LabelName) (c *dto.FingerprintCollection, err error) {
defer func() {
m := getLabelNameFingerprintsSuccessCount
if err != nil {
m = getLabelNameFingerprintsFailureCount
}
m.Increment()
}()
dtoKey := coding.NewProtocolBufferEncoder(n) dtoKey := coding.NewProtocolBufferEncoder(n)
if get, getError := l.labelNameToFingerprints.Get(dtoKey); getError == nil { get, err := l.labelNameToFingerprints.Get(dtoKey)
value := &dto.FingerprintCollection{} if err != nil {
if unmarshalError := proto.Unmarshal(get, value); unmarshalError == nil { return
return value, nil
} else {
return nil, unmarshalError
}
} else {
return nil, getError
} }
return nil, errors.New("Unknown error while getting label name fingerprints.") c = &dto.FingerprintCollection{}
err = proto.Unmarshal(get, c)
return
} }
func (l *LevelDBMetricPersistence) GetSamplesForMetric(metric model.Metric, interval model.Interval) ([]model.Samples, error) { func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet *model.LabelSet) (fps []*model.Fingerprint, err error) {
metricDTO := model.MetricToDTO(&metric) defer func() {
m := getFingerprintsForLabelSetSuccessCount
if fingerprintDTO, fingerprintDTOErr := model.MessageToFingerprintDTO(metricDTO); fingerprintDTOErr == nil { if err != nil {
if iterator, closer, iteratorErr := l.metricSamples.GetIterator(); iteratorErr == nil { m = getFingerprintsForLabelSetFailureCount
defer closer.Close()
start := &dto.SampleKey{
Fingerprint: fingerprintDTO,
Timestamp: indexable.EncodeTime(interval.OldestInclusive),
}
emission := make([]model.Samples, 0)
if encode, encodeErr := coding.NewProtocolBufferEncoder(start).Encode(); encodeErr == nil {
iterator.Seek(encode)
for iterator = iterator; iterator.Valid(); iterator.Next() {
key := &dto.SampleKey{}
value := &dto.SampleValue{}
if keyUnmarshalErr := proto.Unmarshal(iterator.Key(), key); keyUnmarshalErr == nil {
if valueUnmarshalErr := proto.Unmarshal(iterator.Value(), value); valueUnmarshalErr == nil {
if *fingerprintDTO.Signature == *key.Fingerprint.Signature {
// Wart
if indexable.DecodeTime(key.Timestamp).Unix() <= interval.NewestInclusive.Unix() {
emission = append(emission, model.Samples{
Value: model.SampleValue(*value.Value),
Timestamp: indexable.DecodeTime(key.Timestamp),
})
} else {
break
}
} else {
break
}
} else {
return nil, valueUnmarshalErr
}
} else {
return nil, keyUnmarshalErr
}
}
return emission, nil
} else {
log.Printf("Could not encode the start key: %q\n", encodeErr)
return nil, encodeErr
}
} else {
log.Printf("Could not acquire iterator: %q\n", iteratorErr)
return nil, iteratorErr
} }
} else {
log.Printf("Could not create fingerprint for the metric: %q\n", fingerprintDTOErr)
return nil, fingerprintDTOErr
}
panic("unreachable") m.Increment()
} }()
func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet *model.LabelSet) ([]*model.Fingerprint, error) { fps = make([]*model.Fingerprint, 0, 0)
emission := make([]*model.Fingerprint, 0, 0)
for _, labelSetDTO := range model.LabelSetToDTOs(labelSet) { for _, labelSetDTO := range model.LabelSetToDTOs(labelSet) {
if f, err := l.labelSetToFingerprints.Get(coding.NewProtocolBufferEncoder(labelSetDTO)); err == nil { f, err := l.labelSetToFingerprints.Get(coding.NewProtocolBufferEncoder(labelSetDTO))
unmarshaled := &dto.FingerprintCollection{} if err != nil {
if unmarshalErr := proto.Unmarshal(f, unmarshaled); unmarshalErr == nil { return fps, err
for _, m := range unmarshaled.Member {
fp := model.Fingerprint(*m.Signature)
emission = append(emission, &fp)
}
} else {
return nil, err
}
} else {
return nil, err
} }
}
return emission, nil
}
func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName *model.LabelName) ([]*model.Fingerprint, error) {
emission := make([]*model.Fingerprint, 0, 0)
if raw, err := l.labelNameToFingerprints.Get(coding.NewProtocolBufferEncoder(model.LabelNameToDTO(labelName))); err == nil {
unmarshaled := &dto.FingerprintCollection{} unmarshaled := &dto.FingerprintCollection{}
err = proto.Unmarshal(f, unmarshaled)
if err = proto.Unmarshal(raw, unmarshaled); err == nil { if err != nil {
for _, m := range unmarshaled.Member { return fps, err
fp := model.Fingerprint(*m.Signature) }
emission = append(emission, &fp)
} for _, m := range unmarshaled.Member {
} else { fp := model.Fingerprint(*m.Signature)
return nil, err fps = append(fps, &fp)
} }
} else {
return nil, err
} }
return emission, nil return
} }
func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f *model.Fingerprint) (*model.Metric, error) { func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName *model.LabelName) (fps []*model.Fingerprint, err error) {
if raw, err := l.fingerprintToMetrics.Get(coding.NewProtocolBufferEncoder(model.FingerprintToDTO(f))); err == nil { defer func() {
unmarshaled := &dto.Metric{} m := getFingerprintsForLabelNameSuccessCount
if unmarshalErr := proto.Unmarshal(raw, unmarshaled); unmarshalErr == nil { if err != nil {
m := model.Metric{} m = getFingerprintsForLabelNameFailureCount
for _, v := range unmarshaled.LabelPair {
m[model.LabelName(*v.Name)] = model.LabelValue(*v.Value)
}
return &m, nil
} else {
return nil, unmarshalErr
} }
} else {
return nil, err m.Increment()
}()
fps = make([]*model.Fingerprint, 0, 0)
raw, err := l.labelNameToFingerprints.Get(coding.NewProtocolBufferEncoder(model.LabelNameToDTO(labelName)))
if err != nil {
return
} }
panic("unreachable") unmarshaled := &dto.FingerprintCollection{}
err = proto.Unmarshal(raw, unmarshaled)
if err != nil {
return
}
for _, m := range unmarshaled.Member {
fp := model.Fingerprint(*m.Signature)
fps = append(fps, &fp)
}
return
} }
func (l *LevelDBMetricPersistence) GetBoundaryValues(m *model.Metric, i *model.Interval, s *metric.StalenessPolicy) (*model.Sample, *model.Sample, error) { func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f *model.Fingerprint) (m *model.Metric, err error) {
panic("not implemented") defer func() {
m := getMetricForFingerprintSuccessCount
if err != nil {
m = getMetricForFingerprintFailureCount
}
m.Increment()
}()
raw, err := l.fingerprintToMetrics.Get(coding.NewProtocolBufferEncoder(model.FingerprintToDTO(f)))
if err != nil {
return
}
unmarshaled := &dto.Metric{}
err = proto.Unmarshal(raw, unmarshaled)
if err != nil {
return
}
m = &model.Metric{}
for _, v := range unmarshaled.LabelPair {
(*m)[model.LabelName(*v.Name)] = model.LabelValue(*v.Value)
}
return
} }
func (l *LevelDBMetricPersistence) GetFirstValue(m *model.Metric) (*model.Sample, error) { func (l *LevelDBMetricPersistence) GetBoundaryValues(m *model.Metric, i *model.Interval, s *metric.StalenessPolicy) (open *model.Sample, end *model.Sample, err error) {
panic("not implemented") defer func() {
m := getBoundaryValuesSuccessCount
if err != nil {
m = getBoundaryValuesFailureCount
}
m.Increment()
}()
// XXX: Maybe we will want to emit incomplete sets?
open, err = l.GetValueAtTime(m, &i.OldestInclusive, s)
if err != nil {
return
} else if open == nil {
return
}
end, err = l.GetValueAtTime(m, &i.NewestInclusive, s)
if err != nil {
return
} else if end == nil {
open = nil
}
return
} }
func interpolate(x1, x2 time.Time, y1, y2 float32, e time.Time) float32 { func interpolate(x1, x2 time.Time, y1, y2 float32, e time.Time) float32 {
@ -244,15 +317,15 @@ func isKeyInsideRecordedInterval(k *dto.SampleKey, i iterator) (b bool, err erro
} }
var ( var (
retrievedKey *dto.SampleKey = &dto.SampleKey{} retrievedKey *dto.SampleKey
) )
err = proto.Unmarshal(i.Key(), retrievedKey) retrievedKey, err = extractSampleKey(i)
if err != nil { if err != nil {
return return
} }
if *retrievedKey.Fingerprint.Signature != *k.Fingerprint.Signature { if !fingerprintsEqual(retrievedKey.Fingerprint, k.Fingerprint) {
return return
} }
@ -265,12 +338,12 @@ func isKeyInsideRecordedInterval(k *dto.SampleKey, i iterator) (b bool, err erro
return return
} }
err = proto.Unmarshal(i.Key(), retrievedKey) retrievedKey, err = extractSampleKey(i)
if err != nil { if err != nil {
return return
} }
b = *retrievedKey.Fingerprint.Signature == *k.Fingerprint.Signature b = fingerprintsEqual(retrievedKey.Fingerprint, k.Fingerprint)
return return
} }
@ -288,16 +361,15 @@ func doesKeyHavePrecursor(k *dto.SampleKey, i iterator) (b bool, err error) {
} }
var ( var (
retrievedKey *dto.SampleKey = &dto.SampleKey{} retrievedKey *dto.SampleKey
) )
err = proto.Unmarshal(i.Key(), retrievedKey) retrievedKey, err = extractSampleKey(i)
if err != nil { if err != nil {
return return
} }
signaturesEqual := *retrievedKey.Fingerprint.Signature == *k.Fingerprint.Signature if !fingerprintsEqual(retrievedKey.Fingerprint, k.Fingerprint) {
if !signaturesEqual {
return return
} }
@ -320,16 +392,15 @@ func doesKeyHaveSuccessor(k *dto.SampleKey, i iterator) (b bool, err error) {
} }
var ( var (
retrievedKey *dto.SampleKey = &dto.SampleKey{} retrievedKey *dto.SampleKey
) )
err = proto.Unmarshal(i.Key(), retrievedKey) retrievedKey, err = extractSampleKey(i)
if err != nil { if err != nil {
return return
} }
signaturesEqual := *retrievedKey.Fingerprint.Signature == *k.Fingerprint.Signature if !fingerprintsEqual(retrievedKey.Fingerprint, k.Fingerprint) {
if !signaturesEqual {
return return
} }
@ -366,39 +437,41 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(m *model.Metric, t *time.Time,
iterator.Seek(e) iterator.Seek(e)
var (
firstKey *dto.SampleKey = &dto.SampleKey{}
firstValue *dto.SampleValue = nil
)
within, err := isKeyInsideRecordedInterval(k, iterator) within, err := isKeyInsideRecordedInterval(k, iterator)
if err != nil || !within { if err != nil || !within {
return return
} }
for iterator = iterator; iterator.Valid(); iterator.Prev() { var (
err := proto.Unmarshal(iterator.Key(), firstKey) firstKey *dto.SampleKey
firstValue *dto.SampleValue
)
firstKey, err = extractSampleKey(iterator)
if err != nil {
return
}
if fingerprintsEqual(firstKey.Fingerprint, k.Fingerprint) {
firstValue, err = extractSampleValue(iterator)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if *firstKey.Fingerprint.Signature == *k.Fingerprint.Signature { foundTimestamp := indexable.DecodeTime(firstKey.Timestamp)
firstValue = &dto.SampleValue{} targetTimestamp := indexable.DecodeTime(k.Timestamp)
err := proto.Unmarshal(iterator.Value(), firstValue)
if err != nil {
return nil, err
}
if indexable.DecodeTime(firstKey.Timestamp).Equal(indexable.DecodeTime(k.Timestamp)) { if foundTimestamp.Equal(targetTimestamp) {
return model.SampleFromDTO(m, t, firstValue), nil return model.SampleFromDTO(m, t, firstValue), nil
}
break
} }
} else {
return
} }
var ( var (
secondKey *dto.SampleKey = &dto.SampleKey{} secondKey *dto.SampleKey
secondValue *dto.SampleValue = nil secondValue *dto.SampleValue
) )
iterator.Next() iterator.Next()
@ -406,30 +479,94 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(m *model.Metric, t *time.Time,
return return
} }
err = proto.Unmarshal(iterator.Key(), secondKey) secondKey, err = extractSampleKey(iterator)
if err != nil { if err != nil {
return return
} }
if *secondKey.Fingerprint.Signature == *k.Fingerprint.Signature { if fingerprintsEqual(secondKey.Fingerprint, k.Fingerprint) {
secondValue = &dto.SampleValue{} secondValue, err = extractSampleValue(iterator)
err = proto.Unmarshal(iterator.Value(), secondValue)
if err != nil { if err != nil {
return return
} }
} else {
return
} }
firstTime := indexable.DecodeTime(firstKey.Timestamp) firstTime := indexable.DecodeTime(firstKey.Timestamp)
secondTime := indexable.DecodeTime(secondKey.Timestamp) secondTime := indexable.DecodeTime(secondKey.Timestamp)
interpolated := interpolate(firstTime, secondTime, *firstValue.Value, *secondValue.Value, *t) currentDelta := secondTime.Sub(firstTime)
emission := &dto.SampleValue{
Value: &interpolated, if currentDelta <= s.DeltaAllowance {
interpolated := interpolate(firstTime, secondTime, *firstValue.Value, *secondValue.Value, *t)
emission := &dto.SampleValue{
Value: &interpolated,
}
return model.SampleFromDTO(m, t, emission), nil
} }
return model.SampleFromDTO(m, t, emission), nil return
} }
func (l *LevelDBMetricPersistence) GetRangeValues(m *model.Metric, i *model.Interval, s *metric.StalenessPolicy) (*model.SampleSet, error) { func (l *LevelDBMetricPersistence) GetRangeValues(m *model.Metric, i *model.Interval, s *metric.StalenessPolicy) (v *model.SampleSet, err error) {
panic("not implemented") d := model.MetricToDTO(m)
f, err := model.MessageToFingerprintDTO(d)
if err != nil {
return
}
k := &dto.SampleKey{
Fingerprint: f,
Timestamp: indexable.EncodeTime(i.OldestInclusive),
}
e, err := coding.NewProtocolBufferEncoder(k).Encode()
if err != nil {
return
}
iterator, closer, err := l.metricSamples.GetIterator()
if err != nil {
return
}
defer closer.Close()
iterator.Seek(e)
predicate := keyIsOlderThan(i.NewestInclusive)
for ; iterator.Valid(); iterator.Next() {
retrievedKey := &dto.SampleKey{}
retrievedKey, err = extractSampleKey(iterator)
if err != nil {
return
}
if predicate(retrievedKey) {
break
}
if !fingerprintsEqual(retrievedKey.Fingerprint, k.Fingerprint) {
break
}
retrievedValue, err := extractSampleValue(iterator)
if err != nil {
return nil, err
}
if v == nil {
v = &model.SampleSet{}
}
v.Values = append(v.Values, model.SamplePair{
Value: model.SampleValue(*retrievedValue.Value),
Timestamp: indexable.DecodeTime(retrievedKey.Timestamp),
})
}
return
} }

File diff suppressed because it is too large Load Diff

View File

@ -43,16 +43,16 @@ func (l *LevelDBMembershipIndex) Put(key coding.Encoder) error {
return l.persistence.Put(key, existenceValue) return l.persistence.Put(key, existenceValue)
} }
func NewLevelDBMembershipIndex(storageRoot string, cacheCapacity, bitsPerBloomFilterEncoded int) (*LevelDBMembershipIndex, error) { func NewLevelDBMembershipIndex(storageRoot string, cacheCapacity, bitsPerBloomFilterEncoded int) (i *LevelDBMembershipIndex, err error) {
var leveldbPersistence *leveldb.LevelDBPersistence
var persistenceError error
if leveldbPersistence, persistenceError = leveldb.NewLevelDBPersistence(storageRoot, cacheCapacity, bitsPerBloomFilterEncoded); persistenceError == nil { leveldbPersistence, err := leveldb.NewLevelDBPersistence(storageRoot, cacheCapacity, bitsPerBloomFilterEncoded)
leveldbMembershipIndex := &LevelDBMembershipIndex{ if err != nil {
persistence: leveldbPersistence, return
}
return leveldbMembershipIndex, nil
} }
return nil, persistenceError i = &LevelDBMembershipIndex{
persistence: leveldbPersistence,
}
return
} }

View File

@ -36,7 +36,7 @@ type iteratorCloser struct {
storage *levigo.DB storage *levigo.DB
} }
func NewLevelDBPersistence(storageRoot string, cacheCapacity, bitsPerBloomFilterEncoded int) (*LevelDBPersistence, error) { func NewLevelDBPersistence(storageRoot string, cacheCapacity, bitsPerBloomFilterEncoded int) (p *LevelDBPersistence, err error) {
options := levigo.NewOptions() options := levigo.NewOptions()
options.SetCreateIfMissing(true) options.SetCreateIfMissing(true)
options.SetParanoidChecks(true) options.SetParanoidChecks(true)
@ -47,13 +47,16 @@ func NewLevelDBPersistence(storageRoot string, cacheCapacity, bitsPerBloomFilter
filterPolicy := levigo.NewBloomFilter(bitsPerBloomFilterEncoded) filterPolicy := levigo.NewBloomFilter(bitsPerBloomFilterEncoded)
options.SetFilterPolicy(filterPolicy) options.SetFilterPolicy(filterPolicy)
storage, openErr := levigo.Open(storageRoot, options) storage, err := levigo.Open(storageRoot, options)
if err != nil {
return
}
readOptions := levigo.NewReadOptions() readOptions := levigo.NewReadOptions()
writeOptions := levigo.NewWriteOptions() writeOptions := levigo.NewWriteOptions()
writeOptions.SetSync(true) writeOptions.SetSync(true)
emission := &LevelDBPersistence{ p = &LevelDBPersistence{
cache: cache, cache: cache,
filterPolicy: filterPolicy, filterPolicy: filterPolicy,
options: options, options: options,
@ -62,13 +65,17 @@ func NewLevelDBPersistence(storageRoot string, cacheCapacity, bitsPerBloomFilter
writeOptions: writeOptions, writeOptions: writeOptions,
} }
return emission, openErr return
} }
func (l *LevelDBPersistence) Close() error { func (l *LevelDBPersistence) Close() (err error) {
if l.storage != nil { // These are deferred to take advantage of forced closing in case of stack
l.storage.Close() // unwinding due to anomalies.
} defer func() {
if l.storage != nil {
l.storage.Close()
}
}()
defer func() { defer func() {
if l.filterPolicy != nil { if l.filterPolicy != nil {
@ -100,60 +107,57 @@ func (l *LevelDBPersistence) Close() error {
} }
}() }()
return nil return
} }
func (l *LevelDBPersistence) Get(value coding.Encoder) ([]byte, error) { func (l *LevelDBPersistence) Get(value coding.Encoder) (b []byte, err error) {
if key, keyError := value.Encode(); keyError == nil { key, err := value.Encode()
return l.storage.Get(l.readOptions, key) if err != nil {
} else { return
return nil, keyError
} }
panic("unreachable") return l.storage.Get(l.readOptions, key)
} }
func (l *LevelDBPersistence) Has(value coding.Encoder) (bool, error) { func (l *LevelDBPersistence) Has(value coding.Encoder) (h bool, err error) {
if value, getError := l.Get(value); getError != nil { raw, err := l.Get(value)
return false, getError if err != nil {
} else if value == nil { return
return false, nil
} }
return true, nil h = raw != nil
return
} }
func (l *LevelDBPersistence) Drop(value coding.Encoder) error { func (l *LevelDBPersistence) Drop(value coding.Encoder) (err error) {
if key, keyError := value.Encode(); keyError == nil { key, err := value.Encode()
if err != nil {
if deleteError := l.storage.Delete(l.writeOptions, key); deleteError != nil { return
return deleteError
}
} else {
return keyError
} }
return nil err = l.storage.Delete(l.writeOptions, key)
return
} }
func (l *LevelDBPersistence) Put(key, value coding.Encoder) error { func (l *LevelDBPersistence) Put(key, value coding.Encoder) (err error) {
if keyEncoded, keyError := key.Encode(); keyError == nil { keyEncoded, err := key.Encode()
if valueEncoded, valueError := value.Encode(); valueError == nil { if err != nil {
return
if putError := l.storage.Put(l.writeOptions, keyEncoded, valueEncoded); putError != nil {
return putError
}
} else {
return valueError
}
} else {
return keyError
} }
return nil valueEncoded, err := value.Encode()
if err != nil {
return
}
err = l.storage.Put(l.writeOptions, keyEncoded, valueEncoded)
return
} }
func (l *LevelDBPersistence) GetAll() ([]raw.Pair, error) { func (l *LevelDBPersistence) GetAll() (pairs []raw.Pair, err error) {
snapshot := l.storage.NewSnapshot() snapshot := l.storage.NewSnapshot()
defer l.storage.ReleaseSnapshot(snapshot) defer l.storage.ReleaseSnapshot(snapshot)
readOptions := levigo.NewReadOptions() readOptions := levigo.NewReadOptions()
@ -164,22 +168,19 @@ func (l *LevelDBPersistence) GetAll() ([]raw.Pair, error) {
defer iterator.Close() defer iterator.Close()
iterator.SeekToFirst() iterator.SeekToFirst()
result := make([]raw.Pair, 0)
for iterator := iterator; iterator.Valid(); iterator.Next() { for iterator := iterator; iterator.Valid(); iterator.Next() {
result = append(result, raw.Pair{Left: iterator.Key(), Right: iterator.Value()}) pairs = append(pairs, raw.Pair{Left: iterator.Key(), Right: iterator.Value()})
err = iterator.GetError()
if err != nil {
return
}
} }
iteratorError := iterator.GetError() return
if iteratorError != nil {
return nil, iteratorError
}
return result, nil
} }
func (i *iteratorCloser) Close() error { func (i *iteratorCloser) Close() (err error) {
defer func() { defer func() {
if i.storage != nil { if i.storage != nil {
if i.snapshot != nil { if i.snapshot != nil {
@ -200,21 +201,21 @@ func (i *iteratorCloser) Close() error {
} }
}() }()
return nil return
} }
func (l *LevelDBPersistence) GetIterator() (*levigo.Iterator, io.Closer, error) { func (l *LevelDBPersistence) GetIterator() (i *levigo.Iterator, c io.Closer, err error) {
snapshot := l.storage.NewSnapshot() snapshot := l.storage.NewSnapshot()
readOptions := levigo.NewReadOptions() readOptions := levigo.NewReadOptions()
readOptions.SetSnapshot(snapshot) readOptions.SetSnapshot(snapshot)
iterator := l.storage.NewIterator(readOptions) i = l.storage.NewIterator(readOptions)
closer := &iteratorCloser{ c = &iteratorCloser{
iterator: iterator, iterator: i,
readOptions: readOptions, readOptions: readOptions,
snapshot: snapshot, snapshot: snapshot,
storage: l.storage, storage: l.storage,
} }
return iterator, closer, nil return
} }

21
utility/test/interface.go Normal file
View File

@ -0,0 +1,21 @@
// Copyright 2012 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package test
type Tester interface {
Error(args ...interface{})
Errorf(format string, args ...interface{})
Fatal(args ...interface{})
Fatalf(format string, args ...interface{})
}