diff --git a/storage/metric/curator.go b/storage/metric/curator.go index 564cd89d4..5f458f50f 100644 --- a/storage/metric/curator.go +++ b/storage/metric/curator.go @@ -64,7 +64,7 @@ type Curator struct { // forward until the stop point or end of the series is reached. type watermarkScanner struct { // curationState is the data store for curation remarks. - curationState raw.Persistence + curationState CurationRemarker // diskFrontier models the available seekable ranges for the provided // sampleIterator. diskFrontier *diskFrontier @@ -92,7 +92,7 @@ type watermarkScanner struct { // curated. // curationState is the on-disk store where the curation remarks are made for // how much progress has been made. -func (c *Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, processor Processor, curationState, samples *leveldb.LevelDBPersistence, watermarks HighWatermarker, status chan CurationState) (err error) { +func (c *Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, processor Processor, curationState CurationRemarker, samples *leveldb.LevelDBPersistence, watermarks HighWatermarker, status chan CurationState) (err error) { defer func(t time.Time) { duration := float64(time.Since(t) / time.Millisecond) @@ -189,26 +189,6 @@ func (w *watermarkScanner) shouldStop() bool { return len(w.stop) != 0 } -func (w *watermarkScanner) getCurationRemark(k *curationKey) (r *curationRemark, found bool, err error) { - curationKey := new(dto.CurationKey) - curationValue := new(dto.CurationValue) - - k.dump(curationKey) - - present, err := w.curationState.Get(curationKey, curationValue) - if err != nil { - return nil, false, err - } - if !present { - return nil, false, nil - } - - remark := new(curationRemark) - remark.load(curationValue) - - return remark, true, nil -} - func (w *watermarkScanner) Filter(key, value interface{}) (r storage.FilterResult) { fingerprint := key.(*clientmodel.Fingerprint) @@ -244,18 +224,18 @@ func (w *watermarkScanner) Filter(key, value interface{}) (r storage.FilterResul IgnoreYoungerThan: w.ignoreYoungerThan, } - curationRemark, present, err := w.getCurationRemark(k) + curationRemark, present, err := w.curationState.Get(k) if err != nil { return } if !present { return storage.ACCEPT } - if !curationRemark.OlderThan(w.stopAt) { + if !curationRemark.Before(w.stopAt) { return storage.SKIP } watermark := value.(*watermarks) - if !curationRemark.OlderThan(watermark.High) { + if !curationRemark.Before(watermark.High) { return storage.SKIP } curationConsistent, err := w.curationConsistent(fingerprint, watermark) @@ -278,14 +258,14 @@ func (w *watermarkScanner) curationConsistent(f *clientmodel.Fingerprint, waterm ProcessorMessageTypeName: w.processor.Name(), IgnoreYoungerThan: w.ignoreYoungerThan, } - curationRemark, present, err := w.getCurationRemark(k) + curationRemark, present, err := w.curationState.Get(k) if err != nil { return false, err } if !present { return false, nil } - if !curationRemark.OlderThan(watermark.High) { + if !curationRemark.Before(watermark.High) { return true, nil } @@ -303,14 +283,13 @@ func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorEr return &storage.OperatorError{error: err, Continuable: false} } - k := &curationKey{ + curationState, present, err := w.curationState.Get(&curationKey{ Fingerprint: fingerprint, ProcessorMessageRaw: w.processor.Signature(), ProcessorMessageTypeName: w.processor.Name(), IgnoreYoungerThan: w.ignoreYoungerThan, - } + }) - curationState, _, err := w.getCurationRemark(k) if err != nil { // An anomaly with the curation remark is likely not fatal in the sense that // there was a decoding error with the entity and shouldn't be cause to stop @@ -318,10 +297,19 @@ func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorEr // work forward. With an idempotent processor, this is safe. return &storage.OperatorError{error: err, Continuable: true} } + var firstSeek time.Time + switch { + case !present, seriesFrontier.After(curationState): + firstSeek = seriesFrontier.firstSupertime + case !seriesFrontier.InSafeSeekRange(curationState): + firstSeek = seriesFrontier.lastSupertime + default: + firstSeek = curationState + } startKey := &SampleKey{ Fingerprint: fingerprint, - FirstTimestamp: seriesFrontier.optimalStartTime(curationState), + FirstTimestamp: firstSeek, } dto := new(dto.SampleKey) @@ -345,7 +333,13 @@ func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorEr return &storage.OperatorError{error: err, Continuable: false} } - err = w.refreshCurationRemark(fingerprint, lastTime) + err = w.curationState.Update(&curationKey{ + Fingerprint: fingerprint, + ProcessorMessageRaw: w.processor.Signature(), + ProcessorMessageTypeName: w.processor.Name(), + IgnoreYoungerThan: w.ignoreYoungerThan, + }, + lastTime) if err != nil { // Under the assumption that the processors are idempotent, they can be // re-run; thusly, the commitment of the curation remark is no cause @@ -353,56 +347,7 @@ func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorEr return &storage.OperatorError{error: err, Continuable: true} } - return -} - -func (w *watermarkScanner) refreshCurationRemark(f *clientmodel.Fingerprint, finished time.Time) error { - curationKey := curationKey{ - Fingerprint: f, - ProcessorMessageRaw: w.processor.Signature(), - ProcessorMessageTypeName: w.processor.Name(), - IgnoreYoungerThan: w.ignoreYoungerThan, - } - k := new(dto.CurationKey) - curationKey.dump(k) - curationValue := curationRemark{ - LastCompletionTimestamp: finished, - } - v := new(dto.CurationValue) - curationValue.dump(v) - - return w.curationState.Put(k, v) -} - -// curationRemark provides a representation of dto.CurationValue with associated -// business logic methods attached to it to enhance code readability. -type curationRemark struct { - LastCompletionTimestamp time.Time -} - -// OlderThan answers whether this curationRemark is older than the provided -// cutOff time. -func (c *curationRemark) OlderThan(t time.Time) bool { - return c.LastCompletionTimestamp.Before(t) -} - -// Equal answers whether the two curationRemarks are equivalent. -func (c *curationRemark) Equal(o curationRemark) bool { - return c.LastCompletionTimestamp.Equal(o.LastCompletionTimestamp) -} - -func (c *curationRemark) String() string { - return fmt.Sprintf("Last curated at %s", c.LastCompletionTimestamp) -} - -func (c *curationRemark) load(d *dto.CurationValue) { - c.LastCompletionTimestamp = time.Unix(d.GetLastCompletionTimestamp(), 0).UTC() -} - -func (c *curationRemark) dump(d *dto.CurationValue) { - d.Reset() - - d.LastCompletionTimestamp = proto.Int64(c.LastCompletionTimestamp.Unix()) + return nil } // curationKey provides a representation of dto.CurationKey with associated diff --git a/storage/metric/frontier.go b/storage/metric/frontier.go index 1145d4af3..c2194bbce 100644 --- a/storage/metric/frontier.go +++ b/storage/metric/frontier.go @@ -194,19 +194,3 @@ func (s *seriesFrontier) InSafeSeekRange(t time.Time) (safe bool) { func (s *seriesFrontier) After(t time.Time) bool { return s.firstSupertime.After(t) } - -// optimalStartTime indicates what the best start time for a curation operation -// should be given the curation remark. -func (s *seriesFrontier) optimalStartTime(remark *curationRemark) (t time.Time) { - switch { - case remark == nil: - t = s.firstSupertime - case s.After(remark.LastCompletionTimestamp): - t = s.firstSupertime - case !s.InSafeSeekRange(remark.LastCompletionTimestamp): - t = s.lastSupertime - default: - t = remark.LastCompletionTimestamp - } - return -} diff --git a/storage/metric/index.go b/storage/metric/index.go index 9ed8b8412..89f5d9734 100644 --- a/storage/metric/index.go +++ b/storage/metric/index.go @@ -14,7 +14,6 @@ package metric import ( - "io" "sort" "code.google.com/p/goprotobuf/proto" @@ -31,7 +30,6 @@ import ( type FingerprintMetricMapping map[clientmodel.Fingerprint]clientmodel.Metric type FingerprintMetricIndex interface { - io.Closer raw.Pruner IndexBatch(FingerprintMetricMapping) error @@ -40,7 +38,7 @@ type FingerprintMetricIndex interface { Size() (s uint64, present bool, err error) } -type LeveldbFingerprintMetricIndex struct { +type LevelDBFingerprintMetricIndex struct { p *leveldb.LevelDBPersistence } @@ -48,22 +46,20 @@ type LevelDBFingerprintMetricIndexOptions struct { leveldb.LevelDBOptions } -func (i *LeveldbFingerprintMetricIndex) Close() error { +func (i *LevelDBFingerprintMetricIndex) Close() { i.p.Close() - - return nil } -func (i *LeveldbFingerprintMetricIndex) State() *raw.DatabaseState { +func (i *LevelDBFingerprintMetricIndex) State() *raw.DatabaseState { return i.p.State() } -func (i *LeveldbFingerprintMetricIndex) Size() (uint64, bool, error) { - s, err := i.p.ApproximateSize() +func (i *LevelDBFingerprintMetricIndex) Size() (uint64, bool, error) { + s, err := i.p.Size() return s, true, err } -func (i *LeveldbFingerprintMetricIndex) IndexBatch(mapping FingerprintMetricMapping) error { +func (i *LevelDBFingerprintMetricIndex) IndexBatch(mapping FingerprintMetricMapping) error { b := leveldb.NewBatch() defer b.Close() @@ -79,7 +75,7 @@ func (i *LeveldbFingerprintMetricIndex) IndexBatch(mapping FingerprintMetricMapp return i.p.Commit(b) } -func (i *LeveldbFingerprintMetricIndex) Lookup(f *clientmodel.Fingerprint) (m clientmodel.Metric, ok bool, err error) { +func (i *LevelDBFingerprintMetricIndex) Lookup(f *clientmodel.Fingerprint) (m clientmodel.Metric, ok bool, err error) { k := new(dto.Fingerprint) dumpFingerprint(k, f) v := new(dto.Metric) @@ -98,19 +94,19 @@ func (i *LeveldbFingerprintMetricIndex) Lookup(f *clientmodel.Fingerprint) (m cl return m, true, nil } -func (i *LeveldbFingerprintMetricIndex) Prune() (bool, error) { +func (i *LevelDBFingerprintMetricIndex) Prune() (bool, error) { i.p.Prune() return false, nil } -func NewLevelDBFingerprintMetricIndex(o *LevelDBFingerprintMetricIndexOptions) (FingerprintMetricIndex, error) { +func NewLevelDBFingerprintMetricIndex(o *LevelDBFingerprintMetricIndexOptions) (*LevelDBFingerprintMetricIndex, error) { s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions) if err != nil { return nil, err } - return &LeveldbFingerprintMetricIndex{ + return &LevelDBFingerprintMetricIndex{ p: s, }, nil } @@ -118,7 +114,6 @@ func NewLevelDBFingerprintMetricIndex(o *LevelDBFingerprintMetricIndexOptions) ( type LabelNameFingerprintMapping map[clientmodel.LabelName]clientmodel.Fingerprints type LabelNameFingerprintIndex interface { - io.Closer raw.Pruner IndexBatch(LabelNameFingerprintMapping) error @@ -128,11 +123,11 @@ type LabelNameFingerprintIndex interface { Size() (s uint64, present bool, err error) } -type LeveldbLabelNameFingerprintIndex struct { +type LevelDBLabelNameFingerprintIndex struct { p *leveldb.LevelDBPersistence } -func (i *LeveldbLabelNameFingerprintIndex) IndexBatch(b LabelNameFingerprintMapping) error { +func (i *LevelDBLabelNameFingerprintIndex) IndexBatch(b LabelNameFingerprintMapping) error { batch := leveldb.NewBatch() defer batch.Close() @@ -155,7 +150,7 @@ func (i *LeveldbLabelNameFingerprintIndex) IndexBatch(b LabelNameFingerprintMapp return i.p.Commit(batch) } -func (i *LeveldbLabelNameFingerprintIndex) Lookup(l clientmodel.LabelName) (fps clientmodel.Fingerprints, ok bool, err error) { +func (i *LevelDBLabelNameFingerprintIndex) Lookup(l clientmodel.LabelName) (fps clientmodel.Fingerprints, ok bool, err error) { k := new(dto.LabelName) dumpLabelName(k, l) v := new(dto.FingerprintCollection) @@ -176,30 +171,28 @@ func (i *LeveldbLabelNameFingerprintIndex) Lookup(l clientmodel.LabelName) (fps return fps, true, nil } -func (i *LeveldbLabelNameFingerprintIndex) Has(l clientmodel.LabelName) (ok bool, err error) { +func (i *LevelDBLabelNameFingerprintIndex) Has(l clientmodel.LabelName) (ok bool, err error) { return i.p.Has(&dto.LabelName{ Name: proto.String(string(l)), }) } -func (i *LeveldbLabelNameFingerprintIndex) Prune() (bool, error) { +func (i *LevelDBLabelNameFingerprintIndex) Prune() (bool, error) { i.p.Prune() return false, nil } -func (i *LeveldbLabelNameFingerprintIndex) Close() error { +func (i *LevelDBLabelNameFingerprintIndex) Close() { i.p.Close() - - return nil } -func (i *LeveldbLabelNameFingerprintIndex) Size() (uint64, bool, error) { - s, err := i.p.ApproximateSize() +func (i *LevelDBLabelNameFingerprintIndex) Size() (uint64, bool, error) { + s, err := i.p.Size() return s, true, err } -func (i *LeveldbLabelNameFingerprintIndex) State() *raw.DatabaseState { +func (i *LevelDBLabelNameFingerprintIndex) State() *raw.DatabaseState { return i.p.State() } @@ -207,13 +200,13 @@ type LevelDBLabelNameFingerprintIndexOptions struct { leveldb.LevelDBOptions } -func NewLevelLabelNameFingerprintIndex(o *LevelDBLabelNameFingerprintIndexOptions) (LabelNameFingerprintIndex, error) { +func NewLevelLabelNameFingerprintIndex(o *LevelDBLabelNameFingerprintIndexOptions) (*LevelDBLabelNameFingerprintIndex, error) { s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions) if err != nil { return nil, err } - return &LeveldbLabelNameFingerprintIndex{ + return &LevelDBLabelNameFingerprintIndex{ p: s, }, nil } @@ -221,7 +214,6 @@ func NewLevelLabelNameFingerprintIndex(o *LevelDBLabelNameFingerprintIndexOption type LabelSetFingerprintMapping map[LabelPair]clientmodel.Fingerprints type LabelSetFingerprintIndex interface { - io.Closer raw.ForEacher raw.Pruner @@ -232,7 +224,7 @@ type LabelSetFingerprintIndex interface { Size() (s uint64, present bool, err error) } -type LeveldbLabelSetFingerprintIndex struct { +type LevelDBLabelSetFingerprintIndex struct { p *leveldb.LevelDBPersistence } @@ -240,7 +232,7 @@ type LevelDBLabelSetFingerprintIndexOptions struct { leveldb.LevelDBOptions } -func (i *LeveldbLabelSetFingerprintIndex) IndexBatch(m LabelSetFingerprintMapping) error { +func (i *LevelDBLabelSetFingerprintIndex) IndexBatch(m LabelSetFingerprintMapping) error { batch := leveldb.NewBatch() defer batch.Close() @@ -264,7 +256,7 @@ func (i *LeveldbLabelSetFingerprintIndex) IndexBatch(m LabelSetFingerprintMappin return i.p.Commit(batch) } -func (i *LeveldbLabelSetFingerprintIndex) Lookup(p *LabelPair) (m clientmodel.Fingerprints, ok bool, err error) { +func (i *LevelDBLabelSetFingerprintIndex) Lookup(p *LabelPair) (m clientmodel.Fingerprints, ok bool, err error) { k := &dto.LabelPair{ Name: proto.String(string(p.Name)), Value: proto.String(string(p.Value)), @@ -289,7 +281,7 @@ func (i *LeveldbLabelSetFingerprintIndex) Lookup(p *LabelPair) (m clientmodel.Fi return m, true, nil } -func (i *LeveldbLabelSetFingerprintIndex) Has(p *LabelPair) (ok bool, err error) { +func (i *LevelDBLabelSetFingerprintIndex) Has(p *LabelPair) (ok bool, err error) { k := &dto.LabelPair{ Name: proto.String(string(p.Name)), Value: proto.String(string(p.Value)), @@ -298,43 +290,40 @@ func (i *LeveldbLabelSetFingerprintIndex) Has(p *LabelPair) (ok bool, err error) return i.p.Has(k) } -func (i *LeveldbLabelSetFingerprintIndex) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (bool, error) { +func (i *LevelDBLabelSetFingerprintIndex) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (bool, error) { return i.p.ForEach(d, f, o) } -func (i *LeveldbLabelSetFingerprintIndex) Prune() (bool, error) { +func (i *LevelDBLabelSetFingerprintIndex) Prune() (bool, error) { i.p.Prune() return false, nil } -func (i *LeveldbLabelSetFingerprintIndex) Close() error { +func (i *LevelDBLabelSetFingerprintIndex) Close() { i.p.Close() - - return nil } -func (i *LeveldbLabelSetFingerprintIndex) Size() (uint64, bool, error) { - s, err := i.p.ApproximateSize() +func (i *LevelDBLabelSetFingerprintIndex) Size() (uint64, bool, error) { + s, err := i.p.Size() return s, true, err } -func (i *LeveldbLabelSetFingerprintIndex) State() *raw.DatabaseState { +func (i *LevelDBLabelSetFingerprintIndex) State() *raw.DatabaseState { return i.p.State() } -func NewLevelDBLabelSetFingerprintIndex(o *LevelDBLabelSetFingerprintIndexOptions) (LabelSetFingerprintIndex, error) { +func NewLevelDBLabelSetFingerprintIndex(o *LevelDBLabelSetFingerprintIndexOptions) (*LevelDBLabelSetFingerprintIndex, error) { s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions) if err != nil { return nil, err } - return &LeveldbLabelSetFingerprintIndex{ + return &LevelDBLabelSetFingerprintIndex{ p: s, }, nil } type MetricMembershipIndex interface { - io.Closer raw.Pruner IndexBatch([]clientmodel.Metric) error @@ -343,13 +332,13 @@ type MetricMembershipIndex interface { Size() (s uint64, present bool, err error) } -type LeveldbMetricMembershipIndex struct { +type LevelDBMetricMembershipIndex struct { p *leveldb.LevelDBPersistence } var existenceIdentity = new(dto.MembershipIndexValue) -func (i *LeveldbMetricMembershipIndex) IndexBatch(ms []clientmodel.Metric) error { +func (i *LevelDBMetricMembershipIndex) IndexBatch(ms []clientmodel.Metric) error { batch := leveldb.NewBatch() defer batch.Close() @@ -362,29 +351,27 @@ func (i *LeveldbMetricMembershipIndex) IndexBatch(ms []clientmodel.Metric) error return i.p.Commit(batch) } -func (i *LeveldbMetricMembershipIndex) Has(m clientmodel.Metric) (ok bool, err error) { +func (i *LevelDBMetricMembershipIndex) Has(m clientmodel.Metric) (ok bool, err error) { k := new(dto.Metric) dumpMetric(k, m) return i.p.Has(k) } -func (i *LeveldbMetricMembershipIndex) Close() error { +func (i *LevelDBMetricMembershipIndex) Close() { i.p.Close() - - return nil } -func (i *LeveldbMetricMembershipIndex) Size() (uint64, bool, error) { - s, err := i.p.ApproximateSize() +func (i *LevelDBMetricMembershipIndex) Size() (uint64, bool, error) { + s, err := i.p.Size() return s, true, err } -func (i *LeveldbMetricMembershipIndex) State() *raw.DatabaseState { +func (i *LevelDBMetricMembershipIndex) State() *raw.DatabaseState { return i.p.State() } -func (i *LeveldbMetricMembershipIndex) Prune() (bool, error) { +func (i *LevelDBMetricMembershipIndex) Prune() (bool, error) { i.p.Prune() return false, nil @@ -394,13 +381,13 @@ type LevelDBMetricMembershipIndexOptions struct { leveldb.LevelDBOptions } -func NewLevelDBMetricMembershipIndex(o *LevelDBMetricMembershipIndexOptions) (MetricMembershipIndex, error) { +func NewLevelDBMetricMembershipIndex(o *LevelDBMetricMembershipIndexOptions) (*LevelDBMetricMembershipIndex, error) { s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions) if err != nil { return nil, err } - return &LeveldbMetricMembershipIndex{ + return &LevelDBMetricMembershipIndex{ p: s, }, nil } diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index 6b251ba3b..6571f8aaf 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -36,7 +36,7 @@ import ( const sortConcurrency = 2 type LevelDBMetricPersistence struct { - CurationRemarks *leveldb.LevelDBPersistence + CurationRemarks CurationRemarker fingerprintToMetrics FingerprintMetricIndex labelNameToFingerprints LabelNameFingerprintIndex labelSetToFingerprints LabelSetFingerprintIndex @@ -202,13 +202,14 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc "Sample Curation Remarks", func() { var err error - o := &leveldb.LevelDBOptions{ - Name: "Sample Curation Remarks", - Purpose: "Ledger of Progress for Various Curators", - Path: baseDirectory + "/curation_remarks", - CacheSizeBytes: *curationRemarksCacheSize, - } - emission.CurationRemarks, err = leveldb.NewLevelDBPersistence(o) + emission.CurationRemarks, err = NewLevelDBCurationRemarker(&LevelDBCurationRemarkerOptions{ + LevelDBOptions: leveldb.LevelDBOptions{ + Name: "Sample Curation Remarks", + Purpose: "Ledger of Progress for Various Curators", + Path: baseDirectory + "/curation_remarks", + CacheSizeBytes: *curationRemarksCacheSize, + }, + }) workers.MayFail(err) }, }, @@ -764,7 +765,7 @@ func (l *LevelDBMetricPersistence) GetAllValuesForLabel(labelName clientmodel.La return } -// CompactKeyspace compacts each database's keyspace serially. +// Prune compacts each database's keyspace serially. // // Beware that it would probably be imprudent to run this on a live user-facing // server due to latency implications. @@ -778,10 +779,10 @@ func (l *LevelDBMetricPersistence) Prune() { l.MetricSamples.Prune() } -func (l *LevelDBMetricPersistence) ApproximateSizes() (total uint64, err error) { +func (l *LevelDBMetricPersistence) Sizes() (total uint64, err error) { size := uint64(0) - if size, err = l.CurationRemarks.ApproximateSize(); err != nil { + if size, _, err = l.CurationRemarks.Size(); err != nil { return 0, err } total += size @@ -811,7 +812,7 @@ func (l *LevelDBMetricPersistence) ApproximateSizes() (total uint64, err error) } total += size - if size, err = l.MetricSamples.ApproximateSize(); err != nil { + if size, err = l.MetricSamples.Size(); err != nil { return 0, err } total += size diff --git a/storage/metric/processor_test.go b/storage/metric/processor_test.go index 83ff8fc7e..1b5921a78 100644 --- a/storage/metric/processor_test.go +++ b/storage/metric/processor_test.go @@ -72,13 +72,10 @@ func (c curationState) Get() (key, value proto.Message) { k := &dto.CurationKey{} keyRaw.dump(k) - key = k - valueRaw := curationRemark{ - LastCompletionTimestamp: c.lastCurated, + v := &dto.CurationValue{ + LastCompletionTimestamp: proto.Int64(c.lastCurated.Unix()), } - v := &dto.CurationValue{} - valueRaw.dump(v) return k, v } @@ -848,8 +845,10 @@ func TestCuratorCompactionProcessor(t *testing.T) { sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.in.sampleGroups)) defer sampleDirectory.Close() - curatorStates, err := leveldb.NewLevelDBPersistence(&leveldb.LevelDBOptions{ - Path: curatorDirectory.Path(), + curatorStates, err := NewLevelDBCurationRemarker(&LevelDBCurationRemarkerOptions{ + LevelDBOptions: leveldb.LevelDBOptions{ + Path: curatorDirectory.Path(), + }, }) if err != nil { t.Fatal(err) @@ -888,7 +887,7 @@ func TestCuratorCompactionProcessor(t *testing.T) { t.Fatal(err) } - iterator := curatorStates.NewIterator(true) + iterator := curatorStates.p.NewIterator(true) defer iterator.Close() for j, expected := range scenario.out.curationStates { @@ -904,38 +903,35 @@ func TestCuratorCompactionProcessor(t *testing.T) { } curationKeyDto := &dto.CurationKey{} - curationValueDto := &dto.CurationValue{} err = proto.Unmarshal(iterator.Key(), curationKeyDto) if err != nil { t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err) } - err = proto.Unmarshal(iterator.Value(), curationValueDto) - if err != nil { - t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err) - } - actualKey := &curationKey{} + actualKey := new(curationKey) actualKey.load(curationKeyDto) - actualCurationRemark := &curationRemark{} - actualCurationRemark.load(curationValueDto) - signature := expected.processor.Signature() + + actualValue, present, err := curatorStates.Get(actualKey) + if !present { + t.Fatalf("%d.%d. could not get key-value pair %s", i, j, actualKey) + } + if err != nil { + t.Fatalf("%d.%d. could not get key-value pair %s", i, j, err) + } expectedFingerprint := &clientmodel.Fingerprint{} expectedFingerprint.LoadFromString(expected.fingerprint) expectedKey := &curationKey{ Fingerprint: expectedFingerprint, IgnoreYoungerThan: expected.ignoreYoungerThan, - ProcessorMessageRaw: signature, + ProcessorMessageRaw: expected.processor.Signature(), ProcessorMessageTypeName: expected.processor.Name(), } if !actualKey.Equal(expectedKey) { t.Fatalf("%d.%d. expected %s, got %s", i, j, expectedKey, actualKey) } - expectedCurationRemark := curationRemark{ - LastCompletionTimestamp: expected.lastCurated, - } - if !actualCurationRemark.Equal(expectedCurationRemark) { - t.Fatalf("%d.%d. expected %s, got %s", i, j, expectedCurationRemark, actualCurationRemark) + if !actualValue.Equal(expected.lastCurated) { + t.Fatalf("%d.%d. expected %s, got %s", i, j, expected.lastCurated, actualValue) } } @@ -1374,8 +1370,11 @@ func TestCuratorDeletionProcessor(t *testing.T) { sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.in.sampleGroups)) defer sampleDirectory.Close() - curatorStates, err := leveldb.NewLevelDBPersistence(&leveldb.LevelDBOptions{ - Path: curatorDirectory.Path()}) + curatorStates, err := NewLevelDBCurationRemarker(&LevelDBCurationRemarkerOptions{ + LevelDBOptions: leveldb.LevelDBOptions{ + Path: curatorDirectory.Path(), + }, + }) if err != nil { t.Fatal(err) } @@ -1412,7 +1411,7 @@ func TestCuratorDeletionProcessor(t *testing.T) { t.Fatal(err) } - iterator := curatorStates.NewIterator(true) + iterator := curatorStates.p.NewIterator(true) defer iterator.Close() for j, expected := range scenario.out.curationStates { @@ -1427,24 +1426,25 @@ func TestCuratorDeletionProcessor(t *testing.T) { } } - curationKeyDto := &dto.CurationKey{} - curationValueDto := &dto.CurationValue{} + curationKeyDto := new(dto.CurationKey) err = proto.Unmarshal(iterator.Key(), curationKeyDto) if err != nil { t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err) } - err = proto.Unmarshal(iterator.Value(), curationValueDto) - if err != nil { - t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err) - } - actualKey := &curationKey{} + actualKey := new(curationKey) actualKey.load(curationKeyDto) - actualCurationRemark := &curationRemark{} - actualCurationRemark.load(curationValueDto) signature := expected.processor.Signature() + actualValue, present, err := curatorStates.Get(actualKey) + if !present { + t.Fatalf("%d.%d. could not get key-value pair %s", i, j, actualKey) + } + if err != nil { + t.Fatalf("%d.%d. could not get key-value pair %s", i, j, err) + } + expectedFingerprint := &clientmodel.Fingerprint{} expectedFingerprint.LoadFromString(expected.fingerprint) expectedKey := &curationKey{ @@ -1456,11 +1456,8 @@ func TestCuratorDeletionProcessor(t *testing.T) { if !actualKey.Equal(expectedKey) { t.Fatalf("%d.%d. expected %s, got %s", i, j, expectedKey, actualKey) } - expectedCurationRemark := curationRemark{ - LastCompletionTimestamp: expected.lastCurated, - } - if !actualCurationRemark.Equal(expectedCurationRemark) { - t.Fatalf("%d.%d. expected %s, got %s", i, j, expectedCurationRemark, actualCurationRemark) + if !actualValue.Equal(expected.lastCurated) { + t.Fatalf("%d.%d. expected %s, got %s", i, j, expected.lastCurated, actualValue) } } diff --git a/storage/metric/watermark.go b/storage/metric/watermark.go index e6b0ab3e3..b5e7a5748 100644 --- a/storage/metric/watermark.go +++ b/storage/metric/watermark.go @@ -15,7 +15,6 @@ package metric import ( "container/list" - "io" "sync" "time" @@ -171,7 +170,6 @@ func (lru *WatermarkCache) checkCapacity() { type FingerprintHighWatermarkMapping map[clientmodel.Fingerprint]time.Time type HighWatermarker interface { - io.Closer raw.ForEacher raw.Pruner @@ -181,11 +179,11 @@ type HighWatermarker interface { Size() (uint64, bool, error) } -type LeveldbHighWatermarker struct { +type LevelDBHighWatermarker struct { p *leveldb.LevelDBPersistence } -func (w *LeveldbHighWatermarker) Get(f *clientmodel.Fingerprint) (t time.Time, ok bool, err error) { +func (w *LevelDBHighWatermarker) Get(f *clientmodel.Fingerprint) (t time.Time, ok bool, err error) { k := new(dto.Fingerprint) dumpFingerprint(k, f) v := new(dto.MetricHighWatermark) @@ -200,7 +198,7 @@ func (w *LeveldbHighWatermarker) Get(f *clientmodel.Fingerprint) (t time.Time, o return t, true, nil } -func (w *LeveldbHighWatermarker) UpdateBatch(m FingerprintHighWatermarkMapping) error { +func (w *LevelDBHighWatermarker) UpdateBatch(m FingerprintHighWatermarkMapping) error { batch := leveldb.NewBatch() defer batch.Close() @@ -229,28 +227,26 @@ func (w *LeveldbHighWatermarker) UpdateBatch(m FingerprintHighWatermarkMapping) return w.p.Commit(batch) } -func (i *LeveldbHighWatermarker) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (bool, error) { +func (i *LevelDBHighWatermarker) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (bool, error) { return i.p.ForEach(d, f, o) } -func (i *LeveldbHighWatermarker) Prune() (bool, error) { +func (i *LevelDBHighWatermarker) Prune() (bool, error) { i.p.Prune() return false, nil } -func (i *LeveldbHighWatermarker) Close() error { +func (i *LevelDBHighWatermarker) Close() { i.p.Close() - - return nil } -func (i *LeveldbHighWatermarker) State() *raw.DatabaseState { +func (i *LevelDBHighWatermarker) State() *raw.DatabaseState { return i.p.State() } -func (i *LeveldbHighWatermarker) Size() (uint64, bool, error) { - s, err := i.p.ApproximateSize() +func (i *LevelDBHighWatermarker) Size() (uint64, bool, error) { + s, err := i.p.Size() return s, true, err } @@ -258,13 +254,82 @@ type LevelDBHighWatermarkerOptions struct { leveldb.LevelDBOptions } -func NewLevelDBHighWatermarker(o *LevelDBHighWatermarkerOptions) (HighWatermarker, error) { +func NewLevelDBHighWatermarker(o *LevelDBHighWatermarkerOptions) (*LevelDBHighWatermarker, error) { s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions) if err != nil { return nil, err } - return &LeveldbHighWatermarker{ + return &LevelDBHighWatermarker{ + p: s, + }, nil +} + +type CurationRemarker interface { + raw.Pruner + + Update(*curationKey, time.Time) error + Get(*curationKey) (t time.Time, ok bool, err error) + State() *raw.DatabaseState + Size() (uint64, bool, error) +} + +type LevelDBCurationRemarker struct { + p *leveldb.LevelDBPersistence +} + +type LevelDBCurationRemarkerOptions struct { + leveldb.LevelDBOptions +} + +func (i *LevelDBCurationRemarker) State() *raw.DatabaseState { + return i.p.State() +} + +func (i *LevelDBCurationRemarker) Size() (uint64, bool, error) { + s, err := i.p.Size() + return s, true, err +} + +func (i *LevelDBCurationRemarker) Close() { + i.p.Close() +} + +func (i *LevelDBCurationRemarker) Prune() (bool, error) { + i.p.Prune() + + return false, nil +} + +func (i *LevelDBCurationRemarker) Get(c *curationKey) (t time.Time, ok bool, err error) { + k := new(dto.CurationKey) + c.dump(k) + v := new(dto.CurationValue) + + ok, err = i.p.Get(k, v) + if err != nil || !ok { + return t, ok, err + } + + return time.Unix(v.GetLastCompletionTimestamp(), 0).UTC(), true, nil +} + +func (i *LevelDBCurationRemarker) Update(pair *curationKey, t time.Time) error { + k := new(dto.CurationKey) + pair.dump(k) + + return i.p.Put(k, &dto.CurationValue{ + LastCompletionTimestamp: proto.Int64(t.Unix()), + }) +} + +func NewLevelDBCurationRemarker(o *LevelDBCurationRemarkerOptions) (*LevelDBCurationRemarker, error) { + s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions) + if err != nil { + return nil, err + } + + return &LevelDBCurationRemarker{ p: s, }, nil } diff --git a/storage/raw/index/leveldb/leveldb.go b/storage/raw/index/leveldb/leveldb.go index 16a8edd22..43f8ce444 100644 --- a/storage/raw/index/leveldb/leveldb.go +++ b/storage/raw/index/leveldb/leveldb.go @@ -71,8 +71,8 @@ func (l *LevelDBMembershipIndex) Prune() { l.persistence.Prune() } -func (l *LevelDBMembershipIndex) ApproximateSize() (uint64, error) { - return l.persistence.ApproximateSize() +func (l *LevelDBMembershipIndex) Size() (uint64, error) { + return l.persistence.Size() } func (l *LevelDBMembershipIndex) State() *raw.DatabaseState { diff --git a/storage/raw/leveldb/leveldb.go b/storage/raw/leveldb/leveldb.go index 22f23f097..8709c4882 100644 --- a/storage/raw/leveldb/leveldb.go +++ b/storage/raw/leveldb/leveldb.go @@ -332,7 +332,7 @@ func (l *LevelDBPersistence) Prune() { l.storage.CompactRange(keyspace) } -func (l *LevelDBPersistence) ApproximateSize() (uint64, error) { +func (l *LevelDBPersistence) Size() (uint64, error) { iterator := l.NewIterator(false) defer iterator.Close() diff --git a/storage/raw/leveldb/state.go b/storage/raw/leveldb/state.go index 3de6dc457..3e78830ef 100644 --- a/storage/raw/leveldb/state.go +++ b/storage/raw/leveldb/state.go @@ -31,7 +31,7 @@ func (l *LevelDBPersistence) State() *raw.DatabaseState { Supplemental: map[string]string{}, } - if size, err := l.ApproximateSize(); err != nil { + if size, err := l.Size(); err != nil { databaseState.Supplemental["Errors"] = err.Error() } else { databaseState.Size = utility.ByteSize(size) diff --git a/tools/pruner/main.go b/tools/pruner/main.go index adb8924e8..684eec584 100644 --- a/tools/pruner/main.go +++ b/tools/pruner/main.go @@ -42,10 +42,10 @@ func main() { start := time.Now() log.Printf("Starting compaction...") - size, _ := persistences.ApproximateSizes() + size, _ := persistences.Sizes() log.Printf("Original Size: %d", size) persistences.Prune() log.Printf("Finished in %s", time.Since(start)) - size, _ = persistences.ApproximateSizes() + size, _ = persistences.Sizes() log.Printf("New Size: %d", size) }