diff --git a/storage/metric/interface.go b/storage/metric/interface.go index 40d1a1452..8188bac5e 100644 --- a/storage/metric/interface.go +++ b/storage/metric/interface.go @@ -44,6 +44,7 @@ type MetricPersistence interface { // name. GetFingerprintsForLabelName(model.LabelName) (model.Fingerprints, error) + // Get the metric associated with the provided fingerprint. GetMetricForFingerprint(model.Fingerprint) (*model.Metric, error) GetValueAtTime(model.Fingerprint, time.Time, StalenessPolicy) (*model.Sample, error) @@ -52,7 +53,8 @@ type MetricPersistence interface { ForEachSample(IteratorsForFingerprintBuilder) (err error) - GetAllMetricNames() ([]string, error) + // Get all label values that are associated with a given label name. + GetAllValuesForLabel(model.LabelName) (model.LabelValues, error) // Requests the storage stack to build a materialized View of the values // contained therein. diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index 5c52eacae..654ae978b 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -1221,13 +1221,16 @@ func (l *LevelDBMetricPersistence) GetRangeValues(fp model.Fingerprint, i model. type MetricKeyDecoder struct{} func (d *MetricKeyDecoder) DecodeKey(in interface{}) (out interface{}, err error) { - unmarshaled := &dto.LabelPair{} - err = proto.Unmarshal(in.([]byte), unmarshaled) + unmarshaled := dto.LabelPair{} + err = proto.Unmarshal(in.([]byte), &unmarshaled) if err != nil { return } - out = unmarshaled + out = model.LabelPair{ + Name: model.LabelName(*unmarshaled.Name), + Value: model.LabelValue(*unmarshaled.Value), + } return } @@ -1236,35 +1239,40 @@ func (d *MetricKeyDecoder) DecodeValue(in interface{}) (out interface{}, err err return } -type MetricNamesFilter struct{} +type LabelNameFilter struct { + labelName model.LabelName +} -func (f *MetricNamesFilter) Filter(key, value interface{}) (filterResult storage.FilterResult) { - unmarshaled, ok := key.(*dto.LabelPair) - if ok && *unmarshaled.Name == "name" { +func (f LabelNameFilter) Filter(key, value interface{}) (filterResult storage.FilterResult) { + labelPair, ok := key.(model.LabelPair) + if ok && labelPair.Name == f.labelName { return storage.ACCEPT } return storage.SKIP } -type CollectMetricNamesOp struct { - metricNames []string +type CollectLabelValuesOp struct { + labelValues []model.LabelValue } -func (op *CollectMetricNamesOp) Operate(key, value interface{}) (err *storage.OperatorError) { - unmarshaled := key.(*dto.LabelPair) - op.metricNames = append(op.metricNames, *unmarshaled.Value) +func (op *CollectLabelValuesOp) Operate(key, value interface{}) (err *storage.OperatorError) { + labelPair := key.(model.LabelPair) + op.labelValues = append(op.labelValues, model.LabelValue(labelPair.Value)) return } -func (l *LevelDBMetricPersistence) GetAllMetricNames() (metricNames []string, err error) { - metricNamesOp := &CollectMetricNamesOp{} +func (l *LevelDBMetricPersistence) GetAllValuesForLabel(labelName model.LabelName) (values model.LabelValues, err error) { + filter := &LabelNameFilter{ + labelName: labelName, + } + labelValuesOp := &CollectLabelValuesOp{} - _, err = l.labelSetToFingerprints.ForEach(&MetricKeyDecoder{}, &MetricNamesFilter{}, metricNamesOp) + _, err = l.labelSetToFingerprints.ForEach(&MetricKeyDecoder{}, filter, labelValuesOp) if err != nil { return } - metricNames = metricNamesOp.metricNames + values = labelValuesOp.labelValues return } diff --git a/storage/metric/memory.go b/storage/metric/memory.go index 92a8f55a2..7d4c2fb1c 100644 --- a/storage/metric/memory.go +++ b/storage/metric/memory.go @@ -348,8 +348,17 @@ func (s memorySeriesStorage) Close() (err error) { return } -func (s memorySeriesStorage) GetAllMetricNames() ([]string, error) { - panic("not implemented") +func (s memorySeriesStorage) GetAllValuesForLabel(labelName model.LabelName) (values model.LabelValues, err error) { + valueSet := map[model.LabelValue]bool{} + for _, series := range s.fingerprintToSeries { + if value, ok := series.metric[labelName]; ok { + if !valueSet[value] { + values = append(values, value) + valueSet[value] = true + } + } + } + return } func (s memorySeriesStorage) ForEachSample(builder IteratorsForFingerprintBuilder) (err error) { diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index 98e0fa263..311caef6d 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -63,9 +63,12 @@ type Storage interface { Flush() Close() - // MetricPersistence proxy methods. - GetAllMetricNames() ([]string, error) + // Get all label values that are associated with the provided label name. + GetAllValuesForLabel(model.LabelName) (model.LabelValues, error) + // Get all of the metric fingerprints that are associated with the provided + // label set. GetFingerprintsForLabelSet(model.LabelSet) (model.Fingerprints, error) + // Get the metric associated with the provided fingerprint. GetMetricForFingerprint(model.Fingerprint) (m *model.Metric, err error) } @@ -518,17 +521,54 @@ func (t *tieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier return } -func (t *tieredStorage) GetAllMetricNames() ([]string, error) { - // TODO: handle memory persistence as well. - return t.diskStorage.GetAllMetricNames() +func (t *tieredStorage) GetAllValuesForLabel(labelName model.LabelName) (values model.LabelValues, err error) { + diskValues, err := t.diskStorage.GetAllValuesForLabel(labelName) + if err != nil { + return + } + memoryValues, err := t.memoryArena.GetAllValuesForLabel(labelName) + if err != nil { + return + } + + valueSet := map[model.LabelValue]bool{} + for _, value := range append(diskValues, memoryValues...) { + if !valueSet[value] { + values = append(values, value) + valueSet[value] = true + } + } + + return } -func (t *tieredStorage) GetFingerprintsForLabelSet(labelSet model.LabelSet) (model.Fingerprints, error) { - // TODO: handle memory persistence as well. - return t.diskStorage.GetFingerprintsForLabelSet(labelSet) +func (t *tieredStorage) GetFingerprintsForLabelSet(labelSet model.LabelSet) (fingerprints model.Fingerprints, err error) { + memFingerprints, err := t.memoryArena.GetFingerprintsForLabelSet(labelSet) + if err != nil { + return + } + diskFingerprints, err := t.memoryArena.GetFingerprintsForLabelSet(labelSet) + if err != nil { + return + } + fingerprintSet := map[model.Fingerprint]bool{} + for _, fingerprint := range append(memFingerprints, diskFingerprints...) { + fingerprintSet[fingerprint] = true + } + for fingerprint := range fingerprintSet { + fingerprints = append(fingerprints, fingerprint) + } + + return } func (t *tieredStorage) GetMetricForFingerprint(f model.Fingerprint) (m *model.Metric, err error) { - // TODO: handle memory persistence as well. - return t.diskStorage.GetMetricForFingerprint(f) + m, err = t.memoryArena.GetMetricForFingerprint(f) + if err != nil { + return + } + if m == nil { + m, err = t.diskStorage.GetMetricForFingerprint(f) + } + return } diff --git a/storage/metric/tiered_test.go b/storage/metric/tiered_test.go index 5e6c7572f..a5e0b9bfa 100644 --- a/storage/metric/tiered_test.go +++ b/storage/metric/tiered_test.go @@ -19,10 +19,37 @@ import ( "github.com/prometheus/prometheus/utility/test" "io/ioutil" "os" + "sort" "testing" "time" ) +type testTieredStorageCloser struct { + storage Storage + dirName string +} + +func (t *testTieredStorageCloser) Close() { + t.storage.Close() + os.RemoveAll(t.dirName) +} + +func newTestTieredStorage(t test.Tester) (storage Storage, closer *testTieredStorageCloser) { + tempDir, _ := ioutil.TempDir("", "test_tiered_storage") + storage = NewTieredStorage(5000000, 2500, 1000, 5*time.Second, 15*time.Second, 0*time.Second, tempDir) + + if storage == nil { + t.Fatalf("%d. storage == nil") + } + + go storage.Serve() + closer = &testTieredStorageCloser{ + storage: storage, + dirName: tempDir, + } + return +} + func buildSamples(from, to time.Time, interval time.Duration, m model.Metric) (v []model.Sample) { i := model.SampleValue(0) @@ -340,21 +367,8 @@ func testMakeView(t test.Tester) { ) for i, scenario := range scenarios { - var ( - temporary, _ = ioutil.TempDir("", "test_make_view") - tiered = NewTieredStorage(5000000, 2500, 1000, 5*time.Second, 15*time.Second, 0*time.Second, temporary) - ) - - if tiered == nil { - t.Fatalf("%d. tiered == nil", i) - } - - go tiered.Serve() - defer tiered.Drain() - - defer func() { - os.RemoveAll(temporary) - }() + tiered, closer := newTestTieredStorage(t) + defer closer.Close() for j, datum := range scenario.data { err := tiered.AppendSample(datum) @@ -417,3 +431,99 @@ func BenchmarkMakeView(b *testing.B) { testMakeView(b) } } + +func TestGetAllValuesForLabel(t *testing.T) { + type in struct { + metricName string + appendToMemory bool + appendToDisk bool + } + + scenarios := []struct { + in []in + out []string + }{ + { + // Empty case. + }, { + in: []in{ + { + metricName: "request_count", + appendToMemory: false, + appendToDisk: true, + }, + }, + out: []string{ + "request_count", + }, + }, { + in: []in{ + { + metricName: "request_count", + appendToMemory: true, + appendToDisk: false, + }, + { + metricName: "start_time", + appendToMemory: false, + appendToDisk: true, + }, + }, + out: []string{ + "request_count", + "start_time", + }, + }, { + in: []in{ + { + metricName: "request_count", + appendToMemory: true, + appendToDisk: true, + }, + { + metricName: "start_time", + appendToMemory: true, + appendToDisk: true, + }, + }, + out: []string{ + "request_count", + "start_time", + }, + }, + } + + for i, scenario := range scenarios { + tiered, closer := newTestTieredStorage(t) + defer closer.Close() + for j, metric := range scenario.in { + sample := model.Sample{ + Metric: model.Metric{"name": model.LabelValue(metric.metricName)}, + } + if metric.appendToMemory { + if err := tiered.(*tieredStorage).memoryArena.AppendSample(sample); err != nil { + t.Fatalf("%d.%d. failed to add fixture data: %s", i, j, err) + } + } + if metric.appendToDisk { + if err := tiered.(*tieredStorage).diskStorage.AppendSample(sample); err != nil { + t.Fatalf("%d.%d. failed to add fixture data: %s", i, j, err) + } + } + } + metricNames, err := tiered.GetAllValuesForLabel("name") + if err != nil { + t.Fatalf("%d. Error getting metric names: %s", i, err) + } + if len(metricNames) != len(scenario.out) { + t.Fatalf("%d. Expected metric count %d, got %d", i, len(scenario.out), len(metricNames)) + } + + sort.Sort(metricNames) + for j, expected := range scenario.out { + if expected != string(metricNames[j]) { + t.Fatalf("%d.%d. Expected metric %s, got %s", i, j, expected, metricNames[j]) + } + } + } +} diff --git a/web/api/query.go b/web/api/query.go index eb9e27dcc..b65f9bd9d 100644 --- a/web/api/query.go +++ b/web/api/query.go @@ -86,7 +86,7 @@ func (serv MetricsService) QueryRange(expr string, end int64, duration int64, st } func (serv MetricsService) Metrics() string { - metricNames, err := serv.appState.Storage.GetAllMetricNames() + metricNames, err := serv.appState.Storage.GetAllValuesForLabel("name") rb := serv.ResponseBuilder() rb.SetContentType(gorest.Application_Json) if err != nil { @@ -94,7 +94,7 @@ func (serv MetricsService) Metrics() string { rb.SetResponseCode(http.StatusInternalServerError) return err.Error() } - sort.Strings(metricNames) + sort.Sort(metricNames) resultBytes, err := json.Marshal(metricNames) if err != nil { log.Printf("Error marshalling metric names: %v", err)