From 11bb94a7e58cbbcd72a899974b6c1a81c3bc63fd Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Mon, 25 Mar 2013 13:01:29 +0100 Subject: [PATCH 1/4] Implement GetAllMetricNames() for memory storage. --- storage/metric/memory.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/storage/metric/memory.go b/storage/metric/memory.go index 92a8f55a2..c1d67083c 100644 --- a/storage/metric/memory.go +++ b/storage/metric/memory.go @@ -348,8 +348,20 @@ func (s memorySeriesStorage) Close() (err error) { return } -func (s memorySeriesStorage) GetAllMetricNames() ([]string, error) { - panic("not implemented") +func (s memorySeriesStorage) GetAllMetricNames() (metrics []string, err error) { + metricSet := map[string]bool{} + for _, series := range s.fingerprintToSeries { + if metricName, ok := series.metric["name"]; !ok { + err = fmt.Errorf("Found timeseries without metric name label: %v", series.metric) + } else { + metricSet[string(metricName)] = true + } + } + for metricName := range metricSet { + metrics = append(metrics, metricName) + } + sort.Strings(metrics) + return } func (s memorySeriesStorage) ForEachSample(builder IteratorsForFingerprintBuilder) (err error) { From 42bdf921d1725ea8a362b1223b3d183c11947693 Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Mon, 25 Mar 2013 13:04:47 +0100 Subject: [PATCH 2/4] Fetch integrated memory/disk data for simple Get* functions. --- storage/metric/tiered.go | 54 +++++++++++-- storage/metric/tiered_test.go | 138 ++++++++++++++++++++++++++++++---- 2 files changed, 169 insertions(+), 23 deletions(-) diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index 98e0fa263..28517f572 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -518,17 +518,55 @@ func (t *tieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier return } -func (t *tieredStorage) GetAllMetricNames() ([]string, error) { - // TODO: handle memory persistence as well. - return t.diskStorage.GetAllMetricNames() +func (t *tieredStorage) GetAllMetricNames() (metrics []string, err error) { + diskMetrics, err := t.diskStorage.GetAllMetricNames() + if err != nil { + return + } + memoryMetrics, err := t.memoryArena.GetAllMetricNames() + if err != nil { + return + } + + metricSet := map[string]bool{} + for _, metricName := range append(diskMetrics, memoryMetrics...) { + metricSet[metricName] = true + } + for metricName := range metricSet { + metrics = append(metrics, metricName) + } + sort.Strings(metrics) + + return } -func (t *tieredStorage) GetFingerprintsForLabelSet(labelSet model.LabelSet) (model.Fingerprints, error) { - // TODO: handle memory persistence as well. - return t.diskStorage.GetFingerprintsForLabelSet(labelSet) +func (t *tieredStorage) GetFingerprintsForLabelSet(labelSet model.LabelSet) (fingerprints model.Fingerprints, err error) { + memFingerprints, err := t.memoryArena.GetFingerprintsForLabelSet(labelSet) + if err != nil { + return + } + diskFingerprints, err := t.memoryArena.GetFingerprintsForLabelSet(labelSet) + if err != nil { + return + } + fingerprintSet := map[model.Fingerprint]bool{} + for _, fingerprint := range append(memFingerprints, diskFingerprints...) { + fingerprintSet[fingerprint] = true + } + for fingerprint := range fingerprintSet { + fingerprints = append(fingerprints, fingerprint) + } + + return } func (t *tieredStorage) GetMetricForFingerprint(f model.Fingerprint) (m *model.Metric, err error) { - // TODO: handle memory persistence as well. - return t.diskStorage.GetMetricForFingerprint(f) + m, err = t.memoryArena.GetMetricForFingerprint(f) + if err != nil { + return + } + if m == nil { + m, err = t.diskStorage.GetMetricForFingerprint(f) + } + return } diff --git a/storage/metric/tiered_test.go b/storage/metric/tiered_test.go index 5e6c7572f..e5a8c36d1 100644 --- a/storage/metric/tiered_test.go +++ b/storage/metric/tiered_test.go @@ -23,6 +23,32 @@ import ( "time" ) +type testTieredStorageCloser struct { + storage Storage + dirName string +} + +func (t *testTieredStorageCloser) Close() { + t.storage.Close() + os.RemoveAll(t.dirName) +} + +func newTestTieredStorage(t test.Tester) (storage Storage, closer *testTieredStorageCloser) { + tempDir, _ := ioutil.TempDir("", "test_tiered_storage") + storage = NewTieredStorage(5000000, 2500, 1000, 5*time.Second, 15*time.Second, 0*time.Second, tempDir) + + if storage == nil { + t.Fatalf("%d. storage == nil") + } + + go storage.Serve() + closer = &testTieredStorageCloser{ + storage: storage, + dirName: tempDir, + } + return +} + func buildSamples(from, to time.Time, interval time.Duration, m model.Metric) (v []model.Sample) { i := model.SampleValue(0) @@ -340,21 +366,8 @@ func testMakeView(t test.Tester) { ) for i, scenario := range scenarios { - var ( - temporary, _ = ioutil.TempDir("", "test_make_view") - tiered = NewTieredStorage(5000000, 2500, 1000, 5*time.Second, 15*time.Second, 0*time.Second, temporary) - ) - - if tiered == nil { - t.Fatalf("%d. tiered == nil", i) - } - - go tiered.Serve() - defer tiered.Drain() - - defer func() { - os.RemoveAll(temporary) - }() + tiered, closer := newTestTieredStorage(t) + defer closer.Close() for j, datum := range scenario.data { err := tiered.AppendSample(datum) @@ -417,3 +430,98 @@ func BenchmarkMakeView(b *testing.B) { testMakeView(b) } } + +func TestGetAllMetricNames(t *testing.T) { + type in struct { + metricName string + appendToMemory bool + appendToDisk bool + } + + scenarios := []struct { + in []in + out []string + }{ + { + // Empty case. + }, { + in: []in{ + { + metricName: "request_count", + appendToMemory: false, + appendToDisk: true, + }, + }, + out: []string{ + "request_count", + }, + }, { + in: []in{ + { + metricName: "request_count", + appendToMemory: true, + appendToDisk: false, + }, + { + metricName: "start_time", + appendToMemory: false, + appendToDisk: true, + }, + }, + out: []string{ + "request_count", + "start_time", + }, + }, { + in: []in{ + { + metricName: "request_count", + appendToMemory: true, + appendToDisk: true, + }, + { + metricName: "start_time", + appendToMemory: true, + appendToDisk: true, + }, + }, + out: []string{ + "request_count", + "start_time", + }, + }, + } + + for i, scenario := range scenarios { + tiered, closer := newTestTieredStorage(t) + defer closer.Close() + for j, metric := range scenario.in { + sample := model.Sample{ + Metric: model.Metric{"name": model.LabelValue(metric.metricName)}, + } + if metric.appendToMemory { + if err := tiered.(*tieredStorage).memoryArena.AppendSample(sample); err != nil { + t.Fatalf("%d.%d. failed to add fixture data: %s", i, j, err) + } + } + if metric.appendToDisk { + if err := tiered.(*tieredStorage).diskStorage.AppendSample(sample); err != nil { + t.Fatalf("%d.%d. failed to add fixture data: %s", i, j, err) + } + } + } + metricNames, err := tiered.GetAllMetricNames() + if err != nil { + t.Fatalf("%d. Error getting metric names: %s", i, err) + } + if len(metricNames) != len(scenario.out) { + t.Fatalf("%d. Expected metric count %d, got %d", i, len(scenario.out), len(metricNames)) + } + + for j, expected := range scenario.out { + if expected != metricNames[j] { + t.Fatalf("%d.%d. Expected metric %s, got %s", i, j, expected, metricNames[j]) + } + } + } +} From dd67ab115be9879b96ffb4890284cdca93075045 Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Tue, 26 Mar 2013 11:45:56 +0100 Subject: [PATCH 3/4] Change GetAllMetricNames() to GetAllValuesForLabel(). --- storage/metric/interface.go | 2 +- storage/metric/leveldb.go | 27 ++++++++++++++++----------- storage/metric/memory.go | 16 +++++++--------- storage/metric/tiered.go | 20 ++++++++++---------- storage/metric/tiered_test.go | 6 +++--- web/api/query.go | 4 ++-- 6 files changed, 39 insertions(+), 36 deletions(-) diff --git a/storage/metric/interface.go b/storage/metric/interface.go index 40d1a1452..a1823f7e2 100644 --- a/storage/metric/interface.go +++ b/storage/metric/interface.go @@ -52,7 +52,7 @@ type MetricPersistence interface { ForEachSample(IteratorsForFingerprintBuilder) (err error) - GetAllMetricNames() ([]string, error) + GetAllValuesForLabel(model.LabelName) (model.LabelValues, error) // Requests the storage stack to build a materialized View of the values // contained therein. diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index 5c52eacae..be1841e0b 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -1236,35 +1236,40 @@ func (d *MetricKeyDecoder) DecodeValue(in interface{}) (out interface{}, err err return } -type MetricNamesFilter struct{} +type LabelNameFilter struct { + labelName model.LabelName +} -func (f *MetricNamesFilter) Filter(key, value interface{}) (filterResult storage.FilterResult) { +func (f *LabelNameFilter) Filter(key, value interface{}) (filterResult storage.FilterResult) { unmarshaled, ok := key.(*dto.LabelPair) - if ok && *unmarshaled.Name == "name" { + if ok && model.LabelName(*unmarshaled.Name) == f.labelName { return storage.ACCEPT } return storage.SKIP } -type CollectMetricNamesOp struct { - metricNames []string +type CollectLabelValuesOp struct { + labelValues []model.LabelValue } -func (op *CollectMetricNamesOp) Operate(key, value interface{}) (err *storage.OperatorError) { +func (op *CollectLabelValuesOp) Operate(key, value interface{}) (err *storage.OperatorError) { unmarshaled := key.(*dto.LabelPair) - op.metricNames = append(op.metricNames, *unmarshaled.Value) + op.labelValues = append(op.labelValues, model.LabelValue(*unmarshaled.Value)) return } -func (l *LevelDBMetricPersistence) GetAllMetricNames() (metricNames []string, err error) { - metricNamesOp := &CollectMetricNamesOp{} +func (l *LevelDBMetricPersistence) GetAllValuesForLabel(labelName model.LabelName) (values model.LabelValues, err error) { + filter := &LabelNameFilter{ + labelName: labelName, + } + labelValuesOp := &CollectLabelValuesOp{} - _, err = l.labelSetToFingerprints.ForEach(&MetricKeyDecoder{}, &MetricNamesFilter{}, metricNamesOp) + _, err = l.labelSetToFingerprints.ForEach(&MetricKeyDecoder{}, filter, labelValuesOp) if err != nil { return } - metricNames = metricNamesOp.metricNames + values = labelValuesOp.labelValues return } diff --git a/storage/metric/memory.go b/storage/metric/memory.go index c1d67083c..b7372d672 100644 --- a/storage/metric/memory.go +++ b/storage/metric/memory.go @@ -348,19 +348,17 @@ func (s memorySeriesStorage) Close() (err error) { return } -func (s memorySeriesStorage) GetAllMetricNames() (metrics []string, err error) { - metricSet := map[string]bool{} +func (s memorySeriesStorage) GetAllValuesForLabel(labelName model.LabelName) (values model.LabelValues, err error) { + valueSet := map[model.LabelValue]bool{} for _, series := range s.fingerprintToSeries { - if metricName, ok := series.metric["name"]; !ok { - err = fmt.Errorf("Found timeseries without metric name label: %v", series.metric) - } else { - metricSet[string(metricName)] = true + if value, ok := series.metric[labelName]; ok { + valueSet[value] = true } } - for metricName := range metricSet { - metrics = append(metrics, metricName) + for value := range valueSet { + values = append(values, value) } - sort.Strings(metrics) + sort.Sort(values) return } diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index 28517f572..a5212aa57 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -64,7 +64,7 @@ type Storage interface { Close() // MetricPersistence proxy methods. - GetAllMetricNames() ([]string, error) + GetAllValuesForLabel(model.LabelName) (model.LabelValues, error) GetFingerprintsForLabelSet(model.LabelSet) (model.Fingerprints, error) GetMetricForFingerprint(model.Fingerprint) (m *model.Metric, err error) } @@ -518,24 +518,24 @@ func (t *tieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier return } -func (t *tieredStorage) GetAllMetricNames() (metrics []string, err error) { - diskMetrics, err := t.diskStorage.GetAllMetricNames() +func (t *tieredStorage) GetAllValuesForLabel(labelName model.LabelName) (values model.LabelValues, err error) { + diskValues, err := t.diskStorage.GetAllValuesForLabel(labelName) if err != nil { return } - memoryMetrics, err := t.memoryArena.GetAllMetricNames() + memoryValues, err := t.memoryArena.GetAllValuesForLabel(labelName) if err != nil { return } - metricSet := map[string]bool{} - for _, metricName := range append(diskMetrics, memoryMetrics...) { - metricSet[metricName] = true + valueSet := map[model.LabelValue]bool{} + for _, value := range append(diskValues, memoryValues...) { + valueSet[value] = true } - for metricName := range metricSet { - metrics = append(metrics, metricName) + for value := range valueSet { + values = append(values, value) } - sort.Strings(metrics) + sort.Sort(values) return } diff --git a/storage/metric/tiered_test.go b/storage/metric/tiered_test.go index e5a8c36d1..ab01f7c71 100644 --- a/storage/metric/tiered_test.go +++ b/storage/metric/tiered_test.go @@ -431,7 +431,7 @@ func BenchmarkMakeView(b *testing.B) { } } -func TestGetAllMetricNames(t *testing.T) { +func TestGetAllValuesForLabel(t *testing.T) { type in struct { metricName string appendToMemory bool @@ -510,7 +510,7 @@ func TestGetAllMetricNames(t *testing.T) { } } } - metricNames, err := tiered.GetAllMetricNames() + metricNames, err := tiered.GetAllValuesForLabel("name") if err != nil { t.Fatalf("%d. Error getting metric names: %s", i, err) } @@ -519,7 +519,7 @@ func TestGetAllMetricNames(t *testing.T) { } for j, expected := range scenario.out { - if expected != metricNames[j] { + if expected != string(metricNames[j]) { t.Fatalf("%d.%d. Expected metric %s, got %s", i, j, expected, metricNames[j]) } } diff --git a/web/api/query.go b/web/api/query.go index eb9e27dcc..b65f9bd9d 100644 --- a/web/api/query.go +++ b/web/api/query.go @@ -86,7 +86,7 @@ func (serv MetricsService) QueryRange(expr string, end int64, duration int64, st } func (serv MetricsService) Metrics() string { - metricNames, err := serv.appState.Storage.GetAllMetricNames() + metricNames, err := serv.appState.Storage.GetAllValuesForLabel("name") rb := serv.ResponseBuilder() rb.SetContentType(gorest.Application_Json) if err != nil { @@ -94,7 +94,7 @@ func (serv MetricsService) Metrics() string { rb.SetResponseCode(http.StatusInternalServerError) return err.Error() } - sort.Strings(metricNames) + sort.Sort(metricNames) resultBytes, err := json.Marshal(metricNames) if err != nil { log.Printf("Error marshalling metric names: %v", err) From e09689693223a8d27dba247f5fc9cf5daa1ac9ff Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Tue, 26 Mar 2013 14:46:02 +0100 Subject: [PATCH 4/4] PR comment fixups. --- storage/metric/interface.go | 2 ++ storage/metric/leveldb.go | 19 +++++++++++-------- storage/metric/memory.go | 9 ++++----- storage/metric/tiered.go | 14 ++++++++------ storage/metric/tiered_test.go | 2 ++ 5 files changed, 27 insertions(+), 19 deletions(-) diff --git a/storage/metric/interface.go b/storage/metric/interface.go index a1823f7e2..8188bac5e 100644 --- a/storage/metric/interface.go +++ b/storage/metric/interface.go @@ -44,6 +44,7 @@ type MetricPersistence interface { // name. GetFingerprintsForLabelName(model.LabelName) (model.Fingerprints, error) + // Get the metric associated with the provided fingerprint. GetMetricForFingerprint(model.Fingerprint) (*model.Metric, error) GetValueAtTime(model.Fingerprint, time.Time, StalenessPolicy) (*model.Sample, error) @@ -52,6 +53,7 @@ type MetricPersistence interface { ForEachSample(IteratorsForFingerprintBuilder) (err error) + // Get all label values that are associated with a given label name. GetAllValuesForLabel(model.LabelName) (model.LabelValues, error) // Requests the storage stack to build a materialized View of the values diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index be1841e0b..654ae978b 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -1221,13 +1221,16 @@ func (l *LevelDBMetricPersistence) GetRangeValues(fp model.Fingerprint, i model. type MetricKeyDecoder struct{} func (d *MetricKeyDecoder) DecodeKey(in interface{}) (out interface{}, err error) { - unmarshaled := &dto.LabelPair{} - err = proto.Unmarshal(in.([]byte), unmarshaled) + unmarshaled := dto.LabelPair{} + err = proto.Unmarshal(in.([]byte), &unmarshaled) if err != nil { return } - out = unmarshaled + out = model.LabelPair{ + Name: model.LabelName(*unmarshaled.Name), + Value: model.LabelValue(*unmarshaled.Value), + } return } @@ -1240,9 +1243,9 @@ type LabelNameFilter struct { labelName model.LabelName } -func (f *LabelNameFilter) Filter(key, value interface{}) (filterResult storage.FilterResult) { - unmarshaled, ok := key.(*dto.LabelPair) - if ok && model.LabelName(*unmarshaled.Name) == f.labelName { +func (f LabelNameFilter) Filter(key, value interface{}) (filterResult storage.FilterResult) { + labelPair, ok := key.(model.LabelPair) + if ok && labelPair.Name == f.labelName { return storage.ACCEPT } return storage.SKIP @@ -1253,8 +1256,8 @@ type CollectLabelValuesOp struct { } func (op *CollectLabelValuesOp) Operate(key, value interface{}) (err *storage.OperatorError) { - unmarshaled := key.(*dto.LabelPair) - op.labelValues = append(op.labelValues, model.LabelValue(*unmarshaled.Value)) + labelPair := key.(model.LabelPair) + op.labelValues = append(op.labelValues, model.LabelValue(labelPair.Value)) return } diff --git a/storage/metric/memory.go b/storage/metric/memory.go index b7372d672..7d4c2fb1c 100644 --- a/storage/metric/memory.go +++ b/storage/metric/memory.go @@ -352,13 +352,12 @@ func (s memorySeriesStorage) GetAllValuesForLabel(labelName model.LabelName) (va valueSet := map[model.LabelValue]bool{} for _, series := range s.fingerprintToSeries { if value, ok := series.metric[labelName]; ok { - valueSet[value] = true + if !valueSet[value] { + values = append(values, value) + valueSet[value] = true + } } } - for value := range valueSet { - values = append(values, value) - } - sort.Sort(values) return } diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index a5212aa57..311caef6d 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -63,9 +63,12 @@ type Storage interface { Flush() Close() - // MetricPersistence proxy methods. + // Get all label values that are associated with the provided label name. GetAllValuesForLabel(model.LabelName) (model.LabelValues, error) + // Get all of the metric fingerprints that are associated with the provided + // label set. GetFingerprintsForLabelSet(model.LabelSet) (model.Fingerprints, error) + // Get the metric associated with the provided fingerprint. GetMetricForFingerprint(model.Fingerprint) (m *model.Metric, err error) } @@ -530,12 +533,11 @@ func (t *tieredStorage) GetAllValuesForLabel(labelName model.LabelName) (values valueSet := map[model.LabelValue]bool{} for _, value := range append(diskValues, memoryValues...) { - valueSet[value] = true - } - for value := range valueSet { - values = append(values, value) + if !valueSet[value] { + values = append(values, value) + valueSet[value] = true + } } - sort.Sort(values) return } diff --git a/storage/metric/tiered_test.go b/storage/metric/tiered_test.go index ab01f7c71..a5e0b9bfa 100644 --- a/storage/metric/tiered_test.go +++ b/storage/metric/tiered_test.go @@ -19,6 +19,7 @@ import ( "github.com/prometheus/prometheus/utility/test" "io/ioutil" "os" + "sort" "testing" "time" ) @@ -518,6 +519,7 @@ func TestGetAllValuesForLabel(t *testing.T) { t.Fatalf("%d. Expected metric count %d, got %d", i, len(scenario.out), len(metricNames)) } + sort.Sort(metricNames) for j, expected := range scenario.out { if expected != string(metricNames[j]) { t.Fatalf("%d.%d. Expected metric %s, got %s", i, j, expected, metricNames[j])