diff --git a/storage/metric/leveldb/reading.go b/storage/metric/leveldb/reading.go index 08aecb0f3..30b7465cc 100644 --- a/storage/metric/leveldb/reading.go +++ b/storage/metric/leveldb/reading.go @@ -14,7 +14,6 @@ package leveldb import ( - "bytes" "code.google.com/p/goprotobuf/proto" registry "github.com/matttproud/golang_instrumentation" "github.com/matttproud/golang_instrumentation/metrics" @@ -305,111 +304,6 @@ type iterator interface { Value() []byte } -func isKeyInsideRecordedInterval(k *dto.SampleKey, i iterator) (b bool, err error) { - byteKey, err := coding.NewProtocolBufferEncoder(k).Encode() - if err != nil { - return - } - - i.Seek(byteKey) - if !i.Valid() { - return - } - - var ( - retrievedKey *dto.SampleKey - ) - - retrievedKey, err = extractSampleKey(i) - if err != nil { - return - } - - if !fingerprintsEqual(retrievedKey.Fingerprint, k.Fingerprint) { - return - } - - if bytes.Equal(retrievedKey.Timestamp, k.Timestamp) { - return true, nil - } - - i.Prev() - if !i.Valid() { - return - } - - retrievedKey, err = extractSampleKey(i) - if err != nil { - return - } - - b = fingerprintsEqual(retrievedKey.Fingerprint, k.Fingerprint) - - return -} - -func doesKeyHavePrecursor(k *dto.SampleKey, i iterator) (b bool, err error) { - byteKey, err := coding.NewProtocolBufferEncoder(k).Encode() - if err != nil { - return - } - - i.Seek(byteKey) - - if !i.Valid() { - i.SeekToFirst() - } - - var ( - retrievedKey *dto.SampleKey - ) - - retrievedKey, err = extractSampleKey(i) - if err != nil { - return - } - - if !fingerprintsEqual(retrievedKey.Fingerprint, k.Fingerprint) { - return - } - - keyTime := indexable.DecodeTime(k.Timestamp) - retrievedTime := indexable.DecodeTime(retrievedKey.Timestamp) - - return retrievedTime.Before(keyTime), nil -} - -func doesKeyHaveSuccessor(k *dto.SampleKey, i iterator) (b bool, err error) { - byteKey, err := coding.NewProtocolBufferEncoder(k).Encode() - if err != nil { - return - } - - i.Seek(byteKey) - - if !i.Valid() { - i.SeekToLast() - } - - var ( - retrievedKey *dto.SampleKey - ) - - retrievedKey, err = extractSampleKey(i) - if err != nil { - return - } - - if !fingerprintsEqual(retrievedKey.Fingerprint, k.Fingerprint) { - return - } - - keyTime := indexable.DecodeTime(k.Timestamp) - retrievedTime := indexable.DecodeTime(retrievedKey.Timestamp) - - return retrievedTime.After(keyTime), nil -} - func (l *LevelDBMetricPersistence) GetValueAtTime(m *model.Metric, t *time.Time, s *metric.StalenessPolicy) (sample *model.Sample, err error) { d := model.MetricToDTO(m) @@ -433,13 +327,33 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(m *model.Metric, t *time.Time, if err != nil { return } + defer closer.Close() iterator.Seek(e) - - within, err := isKeyInsideRecordedInterval(k, iterator) - if err != nil || !within { - return + if !iterator.Valid() { + /* + * Two cases for this: + * 1.) Corruption in LevelDB. + * 2.) Key seek after AND outside known range. + * + * Once a LevelDB iterator goes invalid, it cannot be recovered; thusly, + * we need to create a new in order to check if the last value in the + * database is sufficient for our purposes. This is, in all reality, a + * corner case but one that could bring down the system. + */ + iterator, closer, err = l.metricSamples.GetIterator() + if err != nil { + return + } + defer closer.Close() + iterator.SeekToLast() + if !iterator.Valid() { + /* + * For whatever reason, the LevelDB cannot be recovered. + */ + return + } } var ( @@ -452,60 +366,126 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(m *model.Metric, t *time.Time, return } - if fingerprintsEqual(firstKey.Fingerprint, k.Fingerprint) { - firstValue, err = extractSampleValue(iterator) - - if err != nil { - return nil, err - } - - foundTimestamp := indexable.DecodeTime(firstKey.Timestamp) - targetTimestamp := indexable.DecodeTime(k.Timestamp) - - if foundTimestamp.Equal(targetTimestamp) { - return model.SampleFromDTO(m, t, firstValue), nil - } - } else { + if !fingerprintsEqual(firstKey.Fingerprint, k.Fingerprint) { return } - var ( - secondKey *dto.SampleKey - secondValue *dto.SampleValue - ) + firstTime := indexable.DecodeTime(firstKey.Timestamp) + if t.Before(firstTime) { + iterator.Prev() + if !iterator.Valid() { + /* + * Two cases for this: + * 1.) Corruption in LevelDB. + * 2.) Key seek before AND outside known range. + * + * This is an explicit validation to ensure that if no previous values for + * the series are found, the query aborts. + */ + return + } + + var ( + alternativeKey *dto.SampleKey + alternativeValue *dto.SampleValue + ) + + alternativeKey, err = extractSampleKey(iterator) + if err != nil { + return + } + + if fingerprintsEqual(alternativeKey.Fingerprint, k.Fingerprint) { + /* + * At this point, we found a previous value in the same series in the + * database. LevelDB originally seeked to the subsequent element given + * the key, but we need to consider this adjacency instead. + */ + alternativeTime := indexable.DecodeTime(alternativeKey.Timestamp) + + firstKey = alternativeKey + firstValue = alternativeValue + firstTime = alternativeTime + } + } + + firstDelta := firstTime.Sub(*t) + if firstDelta < 0 { + firstDelta *= -1 + } + if firstDelta > s.DeltaAllowance { + return + } + + firstValue, err = extractSampleValue(iterator) + if err != nil { + return + } + + sample = model.SampleFromDTO(m, t, firstValue) + + if firstDelta == time.Duration(0) { + return + } iterator.Next() if !iterator.Valid() { + /* + * Two cases for this: + * 1.) Corruption in LevelDB. + * 2.) Key seek after AND outside known range. + * + * This means that there are no more values left in the storage; and if this + * point is reached, we know that the one that has been found is within the + * allowed staleness limits. + */ return } + var secondKey *dto.SampleKey + secondKey, err = extractSampleKey(iterator) if err != nil { return } - if fingerprintsEqual(secondKey.Fingerprint, k.Fingerprint) { - secondValue, err = extractSampleValue(iterator) - if err != nil { - return - } + if !fingerprintsEqual(secondKey.Fingerprint, k.Fingerprint) { + return } else { + /* + * At this point, current entry in the database has the same key as the + * previous. For this reason, the validation logic will expect that the + * distance between the two points shall not exceed the staleness policy + * allowed limit to reduce interpolation errors. + * + * For this reason, the sample is reset in case of other subsequent + * validation behaviors. + */ + sample = nil + } + + secondTime := indexable.DecodeTime(secondKey.Timestamp) + + totalDelta := secondTime.Sub(firstTime) + if totalDelta > s.DeltaAllowance { return } - firstTime := indexable.DecodeTime(firstKey.Timestamp) - secondTime := indexable.DecodeTime(secondKey.Timestamp) - currentDelta := secondTime.Sub(firstTime) + var secondValue *dto.SampleValue - if currentDelta <= s.DeltaAllowance { - interpolated := interpolate(firstTime, secondTime, *firstValue.Value, *secondValue.Value, *t) - emission := &dto.SampleValue{ - Value: &interpolated, - } - - return model.SampleFromDTO(m, t, emission), nil + secondValue, err = extractSampleValue(iterator) + if err != nil { + return } + interpolated := interpolate(firstTime, secondTime, *firstValue.Value, *secondValue.Value, *t) + + sampleValue := &dto.SampleValue{ + Value: &interpolated, + } + + sample = model.SampleFromDTO(m, t, sampleValue) + return } diff --git a/storage/metric/leveldb/rule_integration_test.go b/storage/metric/leveldb/rule_integration_test.go index 9e6faed6c..a8db95fb5 100644 --- a/storage/metric/leveldb/rule_integration_test.go +++ b/storage/metric/leveldb/rule_integration_test.go @@ -14,6 +14,7 @@ package leveldb import ( + "fmt" "github.com/matttproud/prometheus/model" "github.com/matttproud/prometheus/storage/metric" "github.com/matttproud/prometheus/utility/test" @@ -120,7 +121,7 @@ var testGetValueAtTime = func(t test.Tester) { }, }, { - name: "before with staleness policy", + name: "before within staleness policy", input: input{ year: 1984, month: 3, @@ -129,6 +130,16 @@ var testGetValueAtTime = func(t test.Tester) { staleness: time.Duration(365*24) * time.Hour, }, }, + { + name: "before outside staleness policy", + input: input{ + year: 1984, + month: 3, + day: 29, + hour: 0, + staleness: time.Duration(1) * time.Hour, + }, + }, { name: "after without staleness policy", input: input{ @@ -140,7 +151,7 @@ var testGetValueAtTime = func(t test.Tester) { }, }, { - name: "after with staleness policy", + name: "after within staleness policy", input: input{ year: 1984, month: 3, @@ -148,6 +159,19 @@ var testGetValueAtTime = func(t test.Tester) { hour: 0, staleness: time.Duration(365*24) * time.Hour, }, + output: &output{ + value: 0, + }, + }, + { + name: "after outside staleness policy", + input: input{ + year: 1984, + month: 4, + day: 7, + hour: 0, + staleness: time.Duration(7*24) * time.Hour, + }, }, }, }, @@ -251,6 +275,19 @@ var testGetValueAtTime = func(t test.Tester) { hour: 12, staleness: time.Duration(365*24) * time.Hour, }, + output: &output{ + value: 1, + }, + }, + { + name: "after second without staleness policy", + input: input{ + year: 1985, + month: 9, + day: 28, + hour: 12, + staleness: time.Duration(0), + }, }, { name: "middle without staleness policy", @@ -412,7 +449,7 @@ var testGetValueAtTime = func(t test.Tester) { }, }, { - name: "after third with staleness policy", + name: "after third within staleness policy", input: input{ year: 1986, month: 9, @@ -420,6 +457,29 @@ var testGetValueAtTime = func(t test.Tester) { hour: 12, staleness: time.Duration(365*24) * time.Hour, }, + output: &output{ + value: 2, + }, + }, + { + name: "after third outside staleness policy", + input: input{ + year: 1986, + month: 9, + day: 28, + hour: 12, + staleness: time.Duration(1*24) * time.Hour, + }, + }, + { + name: "after third without staleness policy", + input: input{ + year: 1986, + month: 9, + day: 28, + hour: 12, + staleness: time.Duration(0), + }, }, { name: "first middle without staleness policy", @@ -494,7 +554,8 @@ var testGetValueAtTime = func(t test.Tester) { for i, context := range contexts { // Wrapping in function to enable garbage collection of resources. func() { - temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test") + name := fmt.Sprintf("test_get_value_at_time_%d", i) + temporaryDirectory, _ := ioutil.TempDir("", name) defer func() { if err := os.RemoveAll(temporaryDirectory); err != nil { @@ -663,19 +724,61 @@ var testGetBoundaryValues = func(t test.Tester) { }, }, { - name: "non-existent interval with staleness policy", + name: "non-existent interval after within staleness policy", input: input{ openYear: 1984, openMonth: 3, - openDay: 30, + openDay: 31, openHour: 0, endYear: 1985, endMonth: 3, endDay: 30, endHour: 0, + staleness: time.Duration(4380) * time.Hour, + }, + }, + { + name: "non-existent interval after without staleness policy", + input: input{ + openYear: 1984, + openMonth: 3, + openDay: 31, + openHour: 0, + endYear: 1985, + endMonth: 3, + endDay: 30, + endHour: 0, + staleness: time.Duration(0), + }, + }, + { + name: "non-existent interval before with staleness policy", + input: input{ + openYear: 1983, + openMonth: 3, + openDay: 30, + openHour: 0, + endYear: 1984, + endMonth: 3, + endDay: 29, + endHour: 0, staleness: time.Duration(365*24) * time.Hour, }, }, + { + name: "non-existent interval before without staleness policy", + input: input{ + openYear: 1983, + openMonth: 3, + openDay: 30, + openHour: 0, + endYear: 1984, + endMonth: 3, + endDay: 29, + endHour: 0, + staleness: time.Duration(0), + }, + }, { name: "on end but not start without staleness policy", input: input{ @@ -856,7 +959,7 @@ var testGetBoundaryValues = func(t test.Tester) { endMonth: 6, endDay: 29, endHour: 6, - staleness: time.Duration(178*24) * time.Hour, + staleness: time.Duration(2190) * time.Hour, }, }, { @@ -884,7 +987,7 @@ var testGetBoundaryValues = func(t test.Tester) { endMonth: 6, endDay: 29, endHour: 6, - staleness: time.Duration(178*24) * time.Hour, + staleness: time.Duration(1) * time.Hour, }, }, { @@ -900,6 +1003,10 @@ var testGetBoundaryValues = func(t test.Tester) { endHour: 6, staleness: time.Duration(356*24) * time.Hour, }, + output: &output{ + open: 0, + end: 1, + }, }, }, }, @@ -908,7 +1015,8 @@ var testGetBoundaryValues = func(t test.Tester) { for i, context := range contexts { // Wrapping in function to enable garbage collection of resources. func() { - temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test") + name := fmt.Sprintf("test_get_boundary_values_%d", i) + temporaryDirectory, _ := ioutil.TempDir("", name) defer func() { if err := os.RemoveAll(temporaryDirectory); err != nil { @@ -942,7 +1050,7 @@ var testGetBoundaryValues = func(t test.Tester) { input := behavior.input open := time.Date(input.openYear, input.openMonth, input.openDay, input.openHour, 0, 0, 0, time.UTC) end := time.Date(input.endYear, input.endMonth, input.endDay, input.endHour, 0, 0, 0, time.UTC) - i := model.Interval{ + interval := model.Interval{ OldestInclusive: open, NewestInclusive: end, } @@ -950,30 +1058,30 @@ var testGetBoundaryValues = func(t test.Tester) { DeltaAllowance: input.staleness, } - openValue, endValue, err := persistence.GetBoundaryValues(&m, &i, &p) + openValue, endValue, err := persistence.GetBoundaryValues(&m, &interval, &p) if err != nil { t.Errorf("%d.%d(%s). Could not query for value: %q\n", i, j, behavior.name, err) } if behavior.output == nil { if openValue != nil { - t.Errorf("%d.%d(%s). Expected nil but got: %q\n", i, j, behavior.name, openValue) + t.Errorf("%d.%d(%s). Expected open to be nil but got: %q\n", i, j, behavior.name, openValue) } if endValue != nil { - t.Errorf("%d.%d(%s). Expected nil but got: %q\n", i, j, behavior.name, endValue) + t.Errorf("%d.%d(%s). Expected end to be nil but got: %q\n", i, j, behavior.name, endValue) } } else { if openValue == nil { - t.Errorf("%d.%d(%s). Expected %s but got nil\n", i, j, behavior.name, behavior.output) + t.Errorf("%d.%d(%s). Expected open to be %s but got nil\n", i, j, behavior.name, behavior.output) } if endValue == nil { - t.Errorf("%d.%d(%s). Expected %s but got nil\n", i, j, behavior.name, behavior.output) + t.Errorf("%d.%d(%s). Expected end to be %s but got nil\n", i, j, behavior.name, behavior.output) } if openValue.Value != behavior.output.open { - t.Errorf("%d.%d(%s). Expected %s but got %s\n", i, j, behavior.name, behavior.output.open, openValue.Value) + t.Errorf("%d.%d(%s). Expected open to be %s but got %s\n", i, j, behavior.name, behavior.output.open, openValue.Value) } if endValue.Value != behavior.output.end { - t.Errorf("%d.%d(%s). Expected %s but got %s\n", i, j, behavior.name, behavior.output.end, endValue.Value) + t.Errorf("%d.%d(%s). Expected end to be %s but got %s\n", i, j, behavior.name, behavior.output.end, endValue.Value) } } } @@ -1501,7 +1609,8 @@ var testGetRangeValues = func(t test.Tester) { for i, context := range contexts { // Wrapping in function to enable garbage collection of resources. func() { - temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test") + name := fmt.Sprintf("test_get_range_values_%d", i) + temporaryDirectory, _ := ioutil.TempDir("", name) defer func() { if err := os.RemoveAll(temporaryDirectory); err != nil {