From c3e3460ca6e23ef8f9041150591c51b883fcbacd Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Fri, 5 Apr 2013 13:07:13 +0200 Subject: [PATCH] Spin up curator run in the tests. After this commit, we'll need to add validations that it does the desired work, which we presently know that it doesn't. Given the changes I made with a plethora of renamings, I want to commit this now before it gets even larger. --- coding/protocol_buffer.go | 13 +++-- storage/metric/.gitignore | 1 + storage/metric/curator.go | 63 ++++++++++++---------- storage/metric/curator_test.go | 79 ++++++++++++++++------------ storage/metric/frontier.go | 4 +- storage/metric/leveldb.go | 32 +++++------ storage/metric/tiered.go | 2 +- storage/raw/index/leveldb/leveldb.go | 2 +- storage/raw/leveldb/test/fixtures.go | 16 +----- 9 files changed, 112 insertions(+), 100 deletions(-) create mode 100644 storage/metric/.gitignore diff --git a/coding/protocol_buffer.go b/coding/protocol_buffer.go index 273395207..19c8bef15 100644 --- a/coding/protocol_buffer.go +++ b/coding/protocol_buffer.go @@ -15,13 +15,14 @@ package coding import ( "code.google.com/p/goprotobuf/proto" + "fmt" ) -type ProtocolBufferEncoder struct { +type ProtocolBuffer struct { message proto.Message } -func (p *ProtocolBufferEncoder) Encode() (raw []byte, err error) { +func (p ProtocolBuffer) Encode() (raw []byte, err error) { raw, err = proto.Marshal(p.message) // XXX: Adjust legacy users of this to not check for error. 
@@ -32,8 +33,12 @@ func (p *ProtocolBufferEncoder) Encode() (raw []byte, err error) { return } -func NewProtocolBufferEncoder(message proto.Message) *ProtocolBufferEncoder { - return &ProtocolBufferEncoder{ +func (p ProtocolBuffer) String() string { + return fmt.Sprintf("ProtocolBufferEncoder of %s", p.message) +} + +func NewProtocolBuffer(message proto.Message) *ProtocolBuffer { + return &ProtocolBuffer{ message: message, } } diff --git a/storage/metric/.gitignore b/storage/metric/.gitignore new file mode 100644 index 000000000..3460f0346 --- /dev/null +++ b/storage/metric/.gitignore @@ -0,0 +1 @@ +command-line-arguments.test diff --git a/storage/metric/curator.go b/storage/metric/curator.go index 62918dc43..592f4ae87 100644 --- a/storage/metric/curator.go +++ b/storage/metric/curator.go @@ -37,8 +37,9 @@ type curator struct { // watermarks is the on-disk store that is scanned for high watermarks for // given metrics. watermarks raw.Persistence - // cutOff represents the most recent time up to which values will be curated. - cutOff time.Time + // recencyThreshold represents the most recent time up to which values will be + // curated. + recencyThreshold time.Duration // groupingQuantity represents the number of samples below which encountered // samples will be dismembered and reaggregated into larger groups. groupingQuantity uint32 @@ -48,9 +49,9 @@ type curator struct { } // newCurator builds a new curator for the given LevelDB databases. 
-func newCurator(cutOff time.Time, groupingQuantity uint32, curationState, samples, watermarks raw.Persistence) curator { +func newCurator(recencyThreshold time.Duration, groupingQuantity uint32, curationState, samples, watermarks raw.Persistence) curator { return curator{ - cutOff: cutOff, + recencyThreshold: recencyThreshold, stop: make(chan bool), samples: samples, curationState: curationState, @@ -60,19 +61,19 @@ func newCurator(cutOff time.Time, groupingQuantity uint32, curationState, sample } // run facilitates the curation lifecycle. -func (c curator) run() (err error) { - var ( - decoder watermarkDecoder - filter = watermarkFilter{ - stop: c.stop, - curationState: c.curationState, - } - operator = watermarkOperator{ - olderThan: c.cutOff, - groupSize: c.groupingQuantity, - curationState: c.curationState, - } - ) +func (c curator) run(instant time.Time) (err error) { + decoder := watermarkDecoder{} + filter := watermarkFilter{ + stop: c.stop, + curationState: c.curationState, + groupSize: c.groupingQuantity, + recencyThreshold: c.recencyThreshold, + } + operator := watermarkOperator{ + olderThan: instant.Add(-1 * c.recencyThreshold), + groupSize: c.groupingQuantity, + curationState: c.curationState, + } _, err = c.watermarks.ForEach(decoder, filter, operator) @@ -126,24 +127,28 @@ func (w watermarkDecoder) DecodeValue(in interface{}) (out interface{}, err erro // watermarkFilter determines whether to include or exclude candidate // values from the curation process by virtue of how old the high watermark is. type watermarkFilter struct { - // curationState is the table of CurationKey to CurationValues that remark on + // curationState is the table of CurationKey to CurationValues that remark on how // far along the curation process has gone for a given metric fingerprint. curationState raw.Persistence // stop, when non-empty, instructs the filter to stop operation. stop chan bool + // groupSize refers to the target groupSize from the curator. 
+ groupSize uint32 + // recencyThreshold refers to the target recencyThreshold from the curator. + recencyThreshold time.Duration } func (w watermarkFilter) Filter(key, value interface{}) (result storage.FilterResult) { - var ( - fingerprint = key.(model.Fingerprint) - watermark = value.(model.Watermark) - curationKey = fingerprint.ToDTO() - rawCurationValue []byte - err error - curationValue = &dto.CurationValue{} - ) + fingerprint := key.(model.Fingerprint) + watermark := value.(model.Watermark) + curationKey := &dto.CurationKey{ + Fingerprint: fingerprint.ToDTO(), + MinimumGroupSize: proto.Uint32(w.groupSize), + OlderThan: proto.Int64(int64(w.recencyThreshold)), + } + curationValue := &dto.CurationValue{} - rawCurationValue, err = w.curationState.Get(coding.NewProtocolBufferEncoder(curationKey)) + rawCurationValue, err := w.curationState.Get(coding.NewProtocolBuffer(curationKey)) if err != nil { panic(err) } @@ -229,7 +234,7 @@ func (w watermarkOperator) hasBeenCurated(f model.Fingerprint) (curated bool, er MinimumGroupSize: proto.Uint32(w.groupSize), } - curated, err = w.curationState.Has(coding.NewProtocolBufferEncoder(curationKey)) + curated, err = w.curationState.Has(coding.NewProtocolBuffer(curationKey)) return } @@ -247,7 +252,7 @@ func (w watermarkOperator) curationConsistent(f model.Fingerprint, watermark mod } ) - rawValue, err = w.curationState.Get(coding.NewProtocolBufferEncoder(curationKey)) + rawValue, err = w.curationState.Get(coding.NewProtocolBuffer(curationKey)) if err != nil { return } diff --git a/storage/metric/curator_test.go b/storage/metric/curator_test.go index 7757ae4f5..c950ffb42 100644 --- a/storage/metric/curator_test.go +++ b/storage/metric/curator_test.go @@ -27,10 +27,10 @@ import ( type ( curationState struct { - fingerprint string - groupSize int - olderThan time.Duration - lastCurated time.Time + fingerprint string + groupSize int + recencyThreshold time.Duration + lastCurated time.Time } watermarkState struct { @@ -48,21 +48,23 
@@ type ( values []sample } - context struct { - curationStates fixture.Pairs - watermarkStates fixture.Pairs - sampleGroups fixture.Pairs + in struct { + curationStates fixture.Pairs + watermarkStates fixture.Pairs + sampleGroups fixture.Pairs + recencyThreshold time.Duration + groupSize uint32 } ) func (c curationState) Get() (key, value coding.Encoder) { - key = coding.NewProtocolBufferEncoder(&dto.CurationKey{ + key = coding.NewProtocolBuffer(&dto.CurationKey{ Fingerprint: model.NewFingerprintFromRowKey(c.fingerprint).ToDTO(), MinimumGroupSize: proto.Uint32(uint32(c.groupSize)), - OlderThan: proto.Int64(int64(c.olderThan)), + OlderThan: proto.Int64(int64(c.recencyThreshold)), }) - value = coding.NewProtocolBufferEncoder(&dto.CurationValue{ + value = coding.NewProtocolBuffer(&dto.CurationValue{ LastCompletionTimestamp: proto.Int64(c.lastCurated.Unix()), }) @@ -70,13 +72,13 @@ func (c curationState) Get() (key, value coding.Encoder) { } func (w watermarkState) Get() (key, value coding.Encoder) { - key = coding.NewProtocolBufferEncoder(model.NewFingerprintFromRowKey(w.fingerprint).ToDTO()) - value = coding.NewProtocolBufferEncoder(model.NewWatermarkFromTime(w.lastAppended).ToMetricHighWatermarkDTO()) + key = coding.NewProtocolBuffer(model.NewFingerprintFromRowKey(w.fingerprint).ToDTO()) + value = coding.NewProtocolBuffer(model.NewWatermarkFromTime(w.lastAppended).ToMetricHighWatermarkDTO()) return } func (s sampleGroup) Get() (key, value coding.Encoder) { - key = coding.NewProtocolBufferEncoder(&dto.SampleKey{ + key = coding.NewProtocolBuffer(&dto.SampleKey{ Fingerprint: model.NewFingerprintFromRowKey(s.fingerprint).ToDTO(), Timestamp: indexable.EncodeTime(s.values[0].time), LastTimestamp: proto.Int64(s.values[len(s.values)-1].time.Unix()), @@ -92,7 +94,7 @@ func (s sampleGroup) Get() (key, value coding.Encoder) { }) } - value = coding.NewProtocolBufferEncoder(series) + value = coding.NewProtocolBuffer(series) return } @@ -100,22 +102,31 @@ func (s sampleGroup) 
Get() (key, value coding.Encoder) { func TestCurator(t *testing.T) { var ( scenarios = []struct { - context context + in in }{ { - context: context{ + in: in{ + recencyThreshold: 1 * time.Hour, + groupSize: 5, curationStates: fixture.Pairs{ curationState{ - fingerprint: "0001-A-1-Z", - groupSize: 5, - olderThan: 1 * time.Hour, - lastCurated: testInstant.Add(-1 * 30 * time.Minute), + fingerprint: "0001-A-1-Z", + groupSize: 5, + recencyThreshold: 1 * time.Hour, + lastCurated: testInstant.Add(-1 * 30 * time.Minute), }, curationState{ - fingerprint: "0002-A-2-Z", - groupSize: 5, - olderThan: 1 * time.Hour, - lastCurated: testInstant.Add(-1 * 90 * time.Minute), + fingerprint: "0002-A-2-Z", + groupSize: 5, + recencyThreshold: 1 * time.Hour, + lastCurated: testInstant.Add(-1 * 90 * time.Minute), + }, + // This rule should effectively be ignored. + curationState{ + fingerprint: "0002-A-2-Z", + groupSize: 2, + recencyThreshold: 30 * time.Minute, + lastCurated: testInstant.Add(-1 * 90 * time.Minute), }, }, watermarkStates: fixture.Pairs{ @@ -124,7 +135,7 @@ func TestCurator(t *testing.T) { lastAppended: testInstant.Add(-1 * 15 * time.Minute), }, watermarkState{ - fingerprint: "0002-A-1-Z", + fingerprint: "0002-A-2-Z", lastAppended: testInstant.Add(-1 * 15 * time.Minute), }, }, @@ -479,26 +490,26 @@ func TestCurator(t *testing.T) { ) for _, scenario := range scenarios { - curatorDirectory := fixture.NewPreparer(t).Prepare("curator", fixture.NewCassetteFactory(scenario.context.curationStates)) + curatorDirectory := fixture.NewPreparer(t).Prepare("curator", fixture.NewCassetteFactory(scenario.in.curationStates)) defer curatorDirectory.Close() - watermarkDirectory := fixture.NewPreparer(t).Prepare("watermark", fixture.NewCassetteFactory(scenario.context.watermarkStates)) + watermarkDirectory := fixture.NewPreparer(t).Prepare("watermark", fixture.NewCassetteFactory(scenario.in.watermarkStates)) defer watermarkDirectory.Close() - sampleDirectory := 
fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.context.sampleGroups)) + sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.in.sampleGroups)) defer sampleDirectory.Close() - curatorState, err := leveldb.NewLevelDBPersistence(curatorDirectory.Path(), 0, 0) + curatorStates, err := leveldb.NewLevelDBPersistence(curatorDirectory.Path(), 0, 0) if err != nil { t.Fatal(err) } - defer curatorState.Close() + defer curatorStates.Close() - watermarkState, err := leveldb.NewLevelDBPersistence(watermarkDirectory.Path(), 0, 0) + watermarkStates, err := leveldb.NewLevelDBPersistence(watermarkDirectory.Path(), 0, 0) if err != nil { t.Fatal(err) } - defer watermarkState.Close() + defer watermarkStates.Close() samples, err := leveldb.NewLevelDBPersistence(sampleDirectory.Path(), 0, 0) if err != nil { @@ -506,5 +517,7 @@ func TestCurator(t *testing.T) { } defer samples.Close() + c := newCurator(scenario.in.recencyThreshold, scenario.in.groupSize, curatorStates, samples, watermarkStates) + c.run(testInstant) } } diff --git a/storage/metric/frontier.go b/storage/metric/frontier.go index 9c4183902..ecc5c9321 100644 --- a/storage/metric/frontier.go +++ b/storage/metric/frontier.go @@ -106,7 +106,7 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator) Timestamp: upperSeek, } - raw, err := coding.NewProtocolBufferEncoder(key).Encode() + raw, err := coding.NewProtocolBuffer(key).Encode() if err != nil { panic(err) } @@ -151,7 +151,7 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator) key.Timestamp = lowerSeek - raw, err = coding.NewProtocolBufferEncoder(key).Encode() + raw, err = coding.NewProtocolBuffer(key).Encode() if err != nil { panic(err) } diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index 654ae978b..d1d59f98d 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -339,7 +339,7 @@ func (l 
*LevelDBMetricPersistence) indexLabelNames(metrics map[model.Fingerprint value.Member = append(value.Member, fingerprint.ToDTO()) } - batch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value)) + batch.Put(coding.NewProtocolBuffer(key), coding.NewProtocolBuffer(value)) } err = l.labelNameToFingerprints.Commit(batch) @@ -414,7 +414,7 @@ func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[model.Fingerprint value.Member = append(value.Member, fingerprint.ToDTO()) } - batch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value)) + batch.Put(coding.NewProtocolBuffer(key), coding.NewProtocolBuffer(value)) } err = l.labelSetToFingerprints.Commit(batch) @@ -442,8 +442,8 @@ func (l *LevelDBMetricPersistence) indexFingerprints(metrics map[model.Fingerpri defer batch.Close() for fingerprint, metric := range metrics { - key := coding.NewProtocolBufferEncoder(fingerprint.ToDTO()) - value := coding.NewProtocolBufferEncoder(model.MetricToDTO(metric)) + key := coding.NewProtocolBuffer(fingerprint.ToDTO()) + value := coding.NewProtocolBuffer(model.MetricToDTO(metric)) batch.Put(key, value) } @@ -528,7 +528,7 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerpri // WART: We should probably encode simple fingerprints. 
for _, metric := range absentMetrics { - key := coding.NewProtocolBufferEncoder(model.MetricToDTO(metric)) + key := coding.NewProtocolBuffer(model.MetricToDTO(metric)) batch.Put(key, key) } @@ -563,7 +563,7 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Finger value = &dto.MetricHighWatermark{} raw []byte newestSampleTimestamp = samples[len(samples)-1].Timestamp - keyEncoded = coding.NewProtocolBufferEncoder(key) + keyEncoded = coding.NewProtocolBuffer(key) ) key.Signature = proto.String(fingerprint.ToRowKey()) @@ -585,7 +585,7 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Finger } } value.Timestamp = proto.Int64(newestSampleTimestamp.Unix()) - batch.Put(keyEncoded, coding.NewProtocolBufferEncoder(value)) + batch.Put(keyEncoded, coding.NewProtocolBuffer(value)) mutationCount++ } @@ -661,7 +661,7 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err }) } - samplesBatch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value)) + samplesBatch.Put(coding.NewProtocolBuffer(key), coding.NewProtocolBuffer(value)) } } @@ -752,7 +752,7 @@ func (l *LevelDBMetricPersistence) hasIndexMetric(dto *dto.Metric) (value bool, recordOutcome(duration, err, map[string]string{operation: hasIndexMetric, result: success}, map[string]string{operation: hasIndexMetric, result: failure}) }() - dtoKey := coding.NewProtocolBufferEncoder(dto) + dtoKey := coding.NewProtocolBuffer(dto) value, err = l.metricMembershipIndex.Has(dtoKey) return @@ -767,7 +767,7 @@ func (l *LevelDBMetricPersistence) HasLabelPair(dto *dto.LabelPair) (value bool, recordOutcome(duration, err, map[string]string{operation: hasLabelPair, result: success}, map[string]string{operation: hasLabelPair, result: failure}) }() - dtoKey := coding.NewProtocolBufferEncoder(dto) + dtoKey := coding.NewProtocolBuffer(dto) value, err = l.labelSetToFingerprints.Has(dtoKey) return @@ -782,7 +782,7 @@ func (l 
*LevelDBMetricPersistence) HasLabelName(dto *dto.LabelName) (value bool, recordOutcome(duration, err, map[string]string{operation: hasLabelName, result: success}, map[string]string{operation: hasLabelName, result: failure}) }() - dtoKey := coding.NewProtocolBufferEncoder(dto) + dtoKey := coding.NewProtocolBuffer(dto) value, err = l.labelNameToFingerprints.Has(dtoKey) return @@ -800,7 +800,7 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet model.Lab sets := []utility.Set{} for _, labelSetDTO := range model.LabelSetToDTOs(&labelSet) { - f, err := l.labelSetToFingerprints.Get(coding.NewProtocolBufferEncoder(labelSetDTO)) + f, err := l.labelSetToFingerprints.Get(coding.NewProtocolBuffer(labelSetDTO)) if err != nil { return fps, err } @@ -847,7 +847,7 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName model.L recordOutcome(duration, err, map[string]string{operation: getFingerprintsForLabelName, result: success}, map[string]string{operation: getFingerprintsForLabelName, result: failure}) }() - raw, err := l.labelNameToFingerprints.Get(coding.NewProtocolBufferEncoder(model.LabelNameToDTO(&labelName))) + raw, err := l.labelNameToFingerprints.Get(coding.NewProtocolBuffer(model.LabelNameToDTO(&labelName))) if err != nil { return } @@ -876,7 +876,7 @@ func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f model.Fingerprint) recordOutcome(duration, err, map[string]string{operation: getMetricForFingerprint, result: success}, map[string]string{operation: getMetricForFingerprint, result: failure}) }() - raw, err := l.fingerprintToMetrics.Get(coding.NewProtocolBufferEncoder(model.FingerprintToDTO(f))) + raw, err := l.fingerprintToMetrics.Get(coding.NewProtocolBuffer(model.FingerprintToDTO(f))) if err != nil { return } @@ -958,7 +958,7 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(fp model.Fingerprint, t time.T Timestamp: indexable.EncodeTime(t), } - e, err := coding.NewProtocolBufferEncoder(k).Encode() + e, err := 
coding.NewProtocolBuffer(k).Encode() if err != nil { return } @@ -1161,7 +1161,7 @@ func (l *LevelDBMetricPersistence) GetRangeValues(fp model.Fingerprint, i model. Timestamp: indexable.EncodeTime(i.OldestInclusive), } - e, err := coding.NewProtocolBufferEncoder(k).Encode() + e, err := coding.NewProtocolBuffer(k).Encode() if err != nil { return } diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index c8dc40d1e..daad234a8 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -469,7 +469,7 @@ func (t *tieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier } // Try seeking to target key. - rawKey, _ := coding.NewProtocolBufferEncoder(targetKey).Encode() + rawKey, _ := coding.NewProtocolBuffer(targetKey).Encode() iterator.Seek(rawKey) foundKey, err := extractSampleKey(iterator) diff --git a/storage/raw/index/leveldb/leveldb.go b/storage/raw/index/leveldb/leveldb.go index e00152d3d..a877a6d87 100644 --- a/storage/raw/index/leveldb/leveldb.go +++ b/storage/raw/index/leveldb/leveldb.go @@ -21,7 +21,7 @@ import ( ) var ( - existenceValue = coding.NewProtocolBufferEncoder(&dto.MembershipIndexValue{}) + existenceValue = coding.NewProtocolBuffer(&dto.MembershipIndexValue{}) ) type LevelDBMembershipIndex struct { diff --git a/storage/raw/leveldb/test/fixtures.go b/storage/raw/leveldb/test/fixtures.go index 0499b8dac..2cb8d35d9 100644 --- a/storage/raw/leveldb/test/fixtures.go +++ b/storage/raw/leveldb/test/fixtures.go @@ -15,7 +15,6 @@ package test import ( "github.com/prometheus/prometheus/coding" - "github.com/prometheus/prometheus/storage/raw" "github.com/prometheus/prometheus/storage/raw/leveldb" "github.com/prometheus/prometheus/utility/test" ) @@ -64,13 +63,7 @@ type ( func (p preparer) Prepare(n string, f FixtureFactory) (t test.TemporaryDirectory) { t = test.NewTemporaryDirectory(n, p.tester) - - var ( - persistence raw.Persistence - err error - ) - - persistence, err = leveldb.NewLevelDBPersistence(t.Path(), 
cacheCapacity, bitsPerBloomFilterEncoded) + persistence, err := leveldb.NewLevelDBPersistence(t.Path(), cacheCapacity, bitsPerBloomFilterEncoded) if err != nil { defer t.Close() p.tester.Fatal(err) @@ -83,12 +76,7 @@ func (p preparer) Prepare(n string, f FixtureFactory) (t test.TemporaryDirectory }() for f.HasNext() { - var ( - key coding.Encoder - value coding.Encoder - ) - - key, value = f.Next() + key, value := f.Next() err = persistence.Put(key, value) if err != nil {