diff --git a/storage/interface.go b/storage/interface.go index 04e28e6c7..36dca39da 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -27,34 +27,34 @@ type FilterResult int const ( // Stop scanning the database. - STOP FilterResult = iota + Stop FilterResult = iota // Skip this record but continue scanning. - SKIP + Skip // Accept this record for the Operator. - ACCEPT + Accept ) func (f FilterResult) String() string { switch f { - case STOP: + case Stop: return "STOP" - case SKIP: + case Skip: return "SKIP" - case ACCEPT: + case Accept: return "ACCEPT" } panic("unknown") } -type OperatorErrorType int - +// OperatorError is used for storage operations upon errors that may or may not +// be continuable. type OperatorError struct { Error error Continuable bool } -// Filter is responsible for controlling the behavior of the database scan +// RecordFilter is responsible for controlling the behavior of the database scan // process and determines the disposition of various records. // // The protocol around it makes the assumption that the underlying diff --git a/storage/metric/compaction_regression_test.go b/storage/metric/compaction_regression_test.go index 8845a22dd..642d8f614 100644 --- a/storage/metric/compaction_regression_test.go +++ b/storage/metric/compaction_regression_test.go @@ -60,7 +60,7 @@ func (c *compactionChecker) Operate(key, value interface{}) *storage.OperatorErr if sampleKey.FirstTimestamp.After(sampleKey.LastTimestamp) { c.t.Fatalf("Chunk FirstTimestamp (%v) is after LastTimestamp (%v): %v", sampleKey.FirstTimestamp.Unix(), sampleKey.LastTimestamp.Unix(), sampleKey) } - fp := new(clientmodel.Fingerprint) + fp := &clientmodel.Fingerprint{} for _, sample := range value.(Values) { if sample.Timestamp.Before(sampleKey.FirstTimestamp) || sample.Timestamp.After(sampleKey.LastTimestamp) { c.t.Fatalf("Sample not within chunk boundaries: chunk FirstTimestamp (%v), chunk LastTimestamp (%v) vs. sample Timestamp (%v)", sampleKey.FirstTimestamp.Unix(), sampleKey.LastTimestamp.Unix(), sample.Timestamp) diff --git a/storage/metric/curator.go b/storage/metric/curator.go index 0660867e4..787695655 100644 --- a/storage/metric/curator.go +++ b/storage/metric/curator.go @@ -34,7 +34,7 @@ import ( const curationYieldPeriod = 250 * time.Millisecond -var errIllegalIterator = errors.New("Iterator invalid.") +var errIllegalIterator = errors.New("iterator invalid") // CurationStateUpdater receives updates about the curation state. type CurationStateUpdater interface { @@ -50,6 +50,7 @@ type CurationState struct { Fingerprint *clientmodel.Fingerprint } +// CuratorOptions bundles the parameters needed to create a Curator. type CuratorOptions struct { // Stop functions as a channel that when empty allows the curator to operate. // The moment a value is ingested inside of it, the curator goes into drain @@ -59,7 +60,7 @@ type CuratorOptions struct { ViewQueue chan viewJob } -// curator is responsible for effectuating a given curation policy across the +// Curator is responsible for effectuating a given curation policy across the // stored samples on-disk. This is useful to compact sparse sample values into // single sample entities to reduce keyspace load on the datastore. type Curator struct { @@ -71,6 +72,7 @@ type Curator struct { sampleKeys *sampleKeyList } +// NewCurator returns an initialized Curator. func NewCurator(o *CuratorOptions) *Curator { return &Curator{ stop: o.Stop, @@ -122,7 +124,7 @@ type watermarkScanner struct { sampleKeys *sampleKeyList } -// run facilitates the curation lifecycle. +// Run facilitates the curation lifecycle. // // recencyThreshold represents the most recent time up to which values will be // curated. @@ -214,7 +216,7 @@ func (c *Curator) Run(ignoreYoungerThan time.Duration, instant clientmodel.Times return } -// drain instructs the curator to stop at the next convenient moment as to not +// Drain instructs the curator to stop at the next convenient moment as to not // introduce data inconsistencies. func (c *Curator) Drain() { if len(c.stop) == 0 { @@ -222,34 +224,35 @@ func (c *Curator) Drain() { } } +// Close needs to be called to cleanly dispose of a curator. func (c *Curator) Close() { c.dtoSampleKeys.Close() c.sampleKeys.Close() } func (w *watermarkScanner) DecodeKey(in interface{}) (interface{}, error) { - key := new(dto.Fingerprint) + key := &dto.Fingerprint{} bytes := in.([]byte) if err := proto.Unmarshal(bytes, key); err != nil { return nil, err } - fingerprint := new(clientmodel.Fingerprint) + fingerprint := &clientmodel.Fingerprint{} loadFingerprint(fingerprint, key) return fingerprint, nil } func (w *watermarkScanner) DecodeValue(in interface{}) (interface{}, error) { - value := new(dto.MetricHighWatermark) + value := &dto.MetricHighWatermark{} bytes := in.([]byte) if err := proto.Unmarshal(bytes, value); err != nil { return nil, err } - watermark := new(watermarks) + watermark := &watermarks{} watermark.load(value) return watermark, nil @@ -280,7 +283,7 @@ func (w *watermarkScanner) Filter(key, value interface{}) (r storage.FilterResul }() if w.shouldStop() { - return storage.STOP + return storage.Stop } k := &curationKey{ @@ -295,24 +298,24 @@ func (w *watermarkScanner) Filter(key, value interface{}) (r storage.FilterResul return } if !present { - return storage.ACCEPT + return storage.Accept } if !curationRemark.Before(w.stopAt) { - return storage.SKIP + return storage.Skip } watermark := value.(*watermarks) if !curationRemark.Before(watermark.High) { - return storage.SKIP + return storage.Skip } curationConsistent, err := w.curationConsistent(fingerprint, watermark) if err != nil { return } if curationConsistent { - return storage.SKIP + return storage.Skip } - return storage.ACCEPT + return storage.Accept } // curationConsistent determines whether the given metric is in a dirty state diff --git a/storage/metric/end_to_end_test.go b/storage/metric/end_to_end_test.go index 7349fabc7..f8f34bdff 100644 --- a/storage/metric/end_to_end_test.go +++ b/storage/metric/end_to_end_test.go @@ -272,8 +272,9 @@ func AppendRepeatingValuesTests(p MetricPersistence, t test.Tester) { } } - if true { - // XXX: Purely a benchmark. + v, ok := p.(View) + if !ok { + // It's purely a benchmark for a MetricPersistence that is not viewable. return } @@ -294,7 +295,7 @@ func AppendRepeatingValuesTests(p MetricPersistence, t test.Tester) { } time := clientmodel.Timestamp(0).Add(time.Duration(i) * time.Hour).Add(time.Duration(j) * time.Second) - samples := p.GetValueAtTime(fingerprints[0], time) + samples := v.GetValueAtTime(fingerprints[0], time) if len(samples) == 0 { t.Fatal("expected at least one sample.") } @@ -303,7 +304,7 @@ func AppendRepeatingValuesTests(p MetricPersistence, t test.Tester) { for _, sample := range samples { if sample.Value != expected { - t.Fatalf("expected %d value, got %d", expected, sample.Value) + t.Fatalf("expected %v value, got %v", expected, sample.Value) } } } @@ -334,8 +335,9 @@ func AppendsRepeatingValuesTests(p MetricPersistence, t test.Tester) { p.AppendSamples(s) - if true { - // XXX: Purely a benchmark. + v, ok := p.(View) + if !ok { + // It's purely a benchmark for a MetricPersistance that is not viewable. return } @@ -356,7 +358,7 @@ func AppendsRepeatingValuesTests(p MetricPersistence, t test.Tester) { } time := clientmodel.Timestamp(0).Add(time.Duration(i) * time.Hour).Add(time.Duration(j) * time.Second) - samples := p.GetValueAtTime(fingerprints[0], time) + samples := v.GetValueAtTime(fingerprints[0], time) if len(samples) == 0 { t.Fatal("expected at least one sample.") } @@ -365,7 +367,7 @@ func AppendsRepeatingValuesTests(p MetricPersistence, t test.Tester) { for _, sample := range samples { if sample.Value != expected { - t.Fatalf("expected %d value, got %d", expected, sample.Value) + t.Fatalf("expected %v value, got %v", expected, sample.Value) } } } diff --git a/storage/metric/freelist.go b/storage/metric/freelist.go index 0ad318364..06bc9c2c6 100644 --- a/storage/metric/freelist.go +++ b/storage/metric/freelist.go @@ -34,7 +34,7 @@ func (l *dtoSampleKeyList) Get() (*dto.SampleKey, bool) { return v.(*dto.SampleKey), ok } - return new(dto.SampleKey), false + return &dto.SampleKey{}, false } func (l *dtoSampleKeyList) Give(v *dto.SampleKey) bool { @@ -51,7 +51,7 @@ type sampleKeyList struct { l utility.FreeList } -var defaultSampleKey = new(SampleKey) +var defaultSampleKey = &SampleKey{} func newSampleKeyList(cap int) *sampleKeyList { return &sampleKeyList{ @@ -64,7 +64,7 @@ func (l *sampleKeyList) Get() (*SampleKey, bool) { return v.(*SampleKey), ok } - return new(SampleKey), false + return &SampleKey{}, false } func (l *sampleKeyList) Give(v *SampleKey) bool { @@ -86,10 +86,10 @@ func (l *valueAtTimeList) Get() (*getValuesAtTimeOp, bool) { return v.(*getValuesAtTimeOp), ok } - return new(getValuesAtTimeOp), false + return &getValuesAtTimeOp{}, false } -var pGetValuesAtTimeOp = new(getValuesAtTimeOp) +var pGetValuesAtTimeOp = &getValuesAtTimeOp{} func (l *valueAtTimeList) Give(v *getValuesAtTimeOp) bool { *v = *pGetValuesAtTimeOp @@ -112,10 +112,10 @@ func (l *valueAtIntervalList) Get() (*getValuesAtIntervalOp, bool) { return v.(*getValuesAtIntervalOp), ok } - return new(getValuesAtIntervalOp), false + return &getValuesAtIntervalOp{}, false } -var pGetValuesAtIntervalOp = new(getValuesAtIntervalOp) +var pGetValuesAtIntervalOp = &getValuesAtIntervalOp{} func (l *valueAtIntervalList) Give(v *getValuesAtIntervalOp) bool { *v = *pGetValuesAtIntervalOp @@ -138,10 +138,10 @@ func (l *valueAlongRangeList) Get() (*getValuesAlongRangeOp, bool) { return v.(*getValuesAlongRangeOp), ok } - return new(getValuesAlongRangeOp), false + return &getValuesAlongRangeOp{}, false } -var pGetValuesAlongRangeOp = new(getValuesAlongRangeOp) +var pGetValuesAlongRangeOp = &getValuesAlongRangeOp{} func (l *valueAlongRangeList) Give(v *getValuesAlongRangeOp) bool { *v = *pGetValuesAlongRangeOp @@ -164,10 +164,10 @@ func (l *valueAtIntervalAlongRangeList) Get() (*getValueRangeAtIntervalOp, bool) return v.(*getValueRangeAtIntervalOp), ok } - return new(getValueRangeAtIntervalOp), false + return &getValueRangeAtIntervalOp{}, false } -var pGetValueRangeAtIntervalOp = new(getValueRangeAtIntervalOp) +var pGetValueRangeAtIntervalOp = &getValueRangeAtIntervalOp{} func (l *valueAtIntervalAlongRangeList) Give(v *getValueRangeAtIntervalOp) bool { *v = *pGetValueRangeAtIntervalOp diff --git a/storage/metric/index.go b/storage/metric/index.go index 986dbde36..4defae1e1 100644 --- a/storage/metric/index.go +++ b/storage/metric/index.go @@ -21,8 +21,6 @@ import ( "code.google.com/p/goprotobuf/proto" clientmodel "github.com/prometheus/client_golang/model" - - "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/raw" "github.com/prometheus/prometheus/storage/raw/leveldb" "github.com/prometheus/prometheus/utility" @@ -30,59 +28,52 @@ import ( dto "github.com/prometheus/prometheus/model/generated" ) +// FingerprintMetricMapping is an in-memory map of Fingerprints to Metrics. type FingerprintMetricMapping map[clientmodel.Fingerprint]clientmodel.Metric +// FingerprintMetricIndex models a database mapping Fingerprints to Metrics. type FingerprintMetricIndex interface { + raw.Database raw.Pruner IndexBatch(FingerprintMetricMapping) error Lookup(*clientmodel.Fingerprint) (m clientmodel.Metric, ok bool, err error) - State() *raw.DatabaseState - Size() (s uint64, present bool, err error) } +// LevelDBFingerprintMetricIndex implements FingerprintMetricIndex using +// leveldb. type LevelDBFingerprintMetricIndex struct { - p *leveldb.LevelDBPersistence + *leveldb.LevelDBPersistence } +// LevelDBFingerprintMetricIndexOptions just wraps leveldb.LevelDBOptions. type LevelDBFingerprintMetricIndexOptions struct { leveldb.LevelDBOptions } -func (i *LevelDBFingerprintMetricIndex) Close() { - i.p.Close() -} - -func (i *LevelDBFingerprintMetricIndex) State() *raw.DatabaseState { - return i.p.State() -} - -func (i *LevelDBFingerprintMetricIndex) Size() (uint64, bool, error) { - s, err := i.p.Size() - return s, true, err -} - +// IndexBatch implements FingerprintMetricIndex. func (i *LevelDBFingerprintMetricIndex) IndexBatch(mapping FingerprintMetricMapping) error { b := leveldb.NewBatch() defer b.Close() for f, m := range mapping { - k := new(dto.Fingerprint) + k := &dto.Fingerprint{} dumpFingerprint(k, &f) - v := new(dto.Metric) + v := &dto.Metric{} dumpMetric(v, m) b.Put(k, v) } - return i.p.Commit(b) + return i.LevelDBPersistence.Commit(b) } +// Lookup implements FingerprintMetricIndex. func (i *LevelDBFingerprintMetricIndex) Lookup(f *clientmodel.Fingerprint) (m clientmodel.Metric, ok bool, err error) { - k := new(dto.Fingerprint) + k := &dto.Fingerprint{} dumpFingerprint(k, f) - v := new(dto.Metric) - if ok, err := i.p.Get(k, v); !ok { + v := &dto.Metric{} + if ok, err := i.LevelDBPersistence.Get(k, v); !ok { return nil, false, nil } else if err != nil { return nil, false, err @@ -97,12 +88,8 @@ func (i *LevelDBFingerprintMetricIndex) Lookup(f *clientmodel.Fingerprint) (m cl return m, true, nil } -func (i *LevelDBFingerprintMetricIndex) Prune() (bool, error) { - i.p.Prune() - - return false, nil -} - +// NewLevelDBFingerprintMetricIndex returns a LevelDBFingerprintMetricIndex +// object ready to use. func NewLevelDBFingerprintMetricIndex(o LevelDBFingerprintMetricIndexOptions) (*LevelDBFingerprintMetricIndex, error) { s, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions) if err != nil { @@ -110,26 +97,32 @@ func NewLevelDBFingerprintMetricIndex(o LevelDBFingerprintMetricIndexOptions) (* } return &LevelDBFingerprintMetricIndex{ - p: s, + LevelDBPersistence: s, }, nil } +// LabelNameFingerprintMapping is an in-memory map of LabelNames to +// Fingerprints. type LabelNameFingerprintMapping map[clientmodel.LabelName]clientmodel.Fingerprints +// LabelNameFingerprintIndex models a database mapping LabelNames to +// Fingerprints. type LabelNameFingerprintIndex interface { + raw.Database raw.Pruner IndexBatch(LabelNameFingerprintMapping) error Lookup(clientmodel.LabelName) (fps clientmodel.Fingerprints, ok bool, err error) Has(clientmodel.LabelName) (ok bool, err error) - State() *raw.DatabaseState - Size() (s uint64, present bool, err error) } +// LevelDBLabelNameFingerprintIndex implements LabelNameFingerprintIndex using +// leveldb. type LevelDBLabelNameFingerprintIndex struct { - p *leveldb.LevelDBPersistence + *leveldb.LevelDBPersistence } +// IndexBatch implements LabelNameFingerprintIndex. func (i *LevelDBLabelNameFingerprintIndex) IndexBatch(b LabelNameFingerprintMapping) error { batch := leveldb.NewBatch() defer batch.Close() @@ -140,9 +133,9 @@ func (i *LevelDBLabelNameFingerprintIndex) IndexBatch(b LabelNameFingerprintMapp key := &dto.LabelName{ Name: proto.String(string(labelName)), } - value := new(dto.FingerprintCollection) + value := &dto.FingerprintCollection{} for _, fingerprint := range fingerprints { - f := new(dto.Fingerprint) + f := &dto.Fingerprint{} dumpFingerprint(f, fingerprint) value.Member = append(value.Member, f) } @@ -150,14 +143,15 @@ func (i *LevelDBLabelNameFingerprintIndex) IndexBatch(b LabelNameFingerprintMapp batch.Put(key, value) } - return i.p.Commit(batch) + return i.LevelDBPersistence.Commit(batch) } +// Lookup implements LabelNameFingerprintIndex. func (i *LevelDBLabelNameFingerprintIndex) Lookup(l clientmodel.LabelName) (fps clientmodel.Fingerprints, ok bool, err error) { - k := new(dto.LabelName) + k := &dto.LabelName{} dumpLabelName(k, l) - v := new(dto.FingerprintCollection) - ok, err = i.p.Get(k, v) + v := &dto.FingerprintCollection{} + ok, err = i.LevelDBPersistence.Get(k, v) if err != nil { return nil, false, err } @@ -166,7 +160,7 @@ func (i *LevelDBLabelNameFingerprintIndex) Lookup(l clientmodel.LabelName) (fps } for _, m := range v.Member { - fp := new(clientmodel.Fingerprint) + fp := &clientmodel.Fingerprint{} loadFingerprint(fp, m) fps = append(fps, fp) } @@ -174,35 +168,20 @@ func (i *LevelDBLabelNameFingerprintIndex) Lookup(l clientmodel.LabelName) (fps return fps, true, nil } +// Has implements LabelNameFingerprintIndex. func (i *LevelDBLabelNameFingerprintIndex) Has(l clientmodel.LabelName) (ok bool, err error) { - return i.p.Has(&dto.LabelName{ + return i.LevelDBPersistence.Has(&dto.LabelName{ Name: proto.String(string(l)), }) } -func (i *LevelDBLabelNameFingerprintIndex) Prune() (bool, error) { - i.p.Prune() - - return false, nil -} - -func (i *LevelDBLabelNameFingerprintIndex) Close() { - i.p.Close() -} - -func (i *LevelDBLabelNameFingerprintIndex) Size() (uint64, bool, error) { - s, err := i.p.Size() - return s, true, err -} - -func (i *LevelDBLabelNameFingerprintIndex) State() *raw.DatabaseState { - return i.p.State() -} - +// LevelDBLabelNameFingerprintIndexOptions just wraps leveldb.LevelDBOptions. type LevelDBLabelNameFingerprintIndexOptions struct { leveldb.LevelDBOptions } +// NewLevelLabelNameFingerprintIndex returns a LevelDBLabelNameFingerprintIndex +// ready to use. func NewLevelLabelNameFingerprintIndex(o LevelDBLabelNameFingerprintIndexOptions) (*LevelDBLabelNameFingerprintIndex, error) { s, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions) if err != nil { @@ -210,31 +189,38 @@ func NewLevelLabelNameFingerprintIndex(o LevelDBLabelNameFingerprintIndexOptions } return &LevelDBLabelNameFingerprintIndex{ - p: s, + LevelDBPersistence: s, }, nil } +// LabelPairFingerprintMapping is an in-memory map of LabelPairs to +// Fingerprints. type LabelPairFingerprintMapping map[LabelPair]clientmodel.Fingerprints +// LabelPairFingerprintIndex models a database mapping LabelPairs to +// Fingerprints. type LabelPairFingerprintIndex interface { + raw.Database raw.ForEacher raw.Pruner IndexBatch(LabelPairFingerprintMapping) error Lookup(*LabelPair) (m clientmodel.Fingerprints, ok bool, err error) Has(*LabelPair) (ok bool, err error) - State() *raw.DatabaseState - Size() (s uint64, present bool, err error) } +// LevelDBLabelPairFingerprintIndex implements LabelPairFingerprintIndex using +// leveldb. type LevelDBLabelPairFingerprintIndex struct { - p *leveldb.LevelDBPersistence + *leveldb.LevelDBPersistence } +// LevelDBLabelSetFingerprintIndexOptions just wraps leveldb.LevelDBOptions. type LevelDBLabelSetFingerprintIndexOptions struct { leveldb.LevelDBOptions } +// IndexBatch implements LabelPairFingerprintMapping. func (i *LevelDBLabelPairFingerprintIndex) IndexBatch(m LabelPairFingerprintMapping) error { batch := leveldb.NewBatch() defer batch.Close() @@ -246,9 +232,9 @@ func (i *LevelDBLabelPairFingerprintIndex) IndexBatch(m LabelPairFingerprintMapp Name: proto.String(string(pair.Name)), Value: proto.String(string(pair.Value)), } - value := new(dto.FingerprintCollection) + value := &dto.FingerprintCollection{} for _, fp := range fps { - f := new(dto.Fingerprint) + f := &dto.Fingerprint{} dumpFingerprint(f, fp) value.Member = append(value.Member, f) } @@ -256,17 +242,18 @@ func (i *LevelDBLabelPairFingerprintIndex) IndexBatch(m LabelPairFingerprintMapp batch.Put(key, value) } - return i.p.Commit(batch) + return i.LevelDBPersistence.Commit(batch) } +// Lookup implements LabelPairFingerprintMapping. func (i *LevelDBLabelPairFingerprintIndex) Lookup(p *LabelPair) (m clientmodel.Fingerprints, ok bool, err error) { k := &dto.LabelPair{ Name: proto.String(string(p.Name)), Value: proto.String(string(p.Value)), } - v := new(dto.FingerprintCollection) + v := &dto.FingerprintCollection{} - ok, err = i.p.Get(k, v) + ok, err = i.LevelDBPersistence.Get(k, v) if !ok { return nil, false, nil @@ -276,7 +263,7 @@ func (i *LevelDBLabelPairFingerprintIndex) Lookup(p *LabelPair) (m clientmodel.F } for _, pair := range v.Member { - fp := new(clientmodel.Fingerprint) + fp := &clientmodel.Fingerprint{} loadFingerprint(fp, pair) m = append(m, fp) } @@ -284,37 +271,18 @@ func (i *LevelDBLabelPairFingerprintIndex) Lookup(p *LabelPair) (m clientmodel.F return m, true, nil } +// Has implements LabelPairFingerprintMapping. func (i *LevelDBLabelPairFingerprintIndex) Has(p *LabelPair) (ok bool, err error) { k := &dto.LabelPair{ Name: proto.String(string(p.Name)), Value: proto.String(string(p.Value)), } - return i.p.Has(k) -} - -func (i *LevelDBLabelPairFingerprintIndex) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (bool, error) { - return i.p.ForEach(d, f, o) -} - -func (i *LevelDBLabelPairFingerprintIndex) Prune() (bool, error) { - i.p.Prune() - return false, nil -} - -func (i *LevelDBLabelPairFingerprintIndex) Close() { - i.p.Close() -} - -func (i *LevelDBLabelPairFingerprintIndex) Size() (uint64, bool, error) { - s, err := i.p.Size() - return s, true, err -} - -func (i *LevelDBLabelPairFingerprintIndex) State() *raw.DatabaseState { - return i.p.State() + return i.LevelDBPersistence.Has(k) } +// NewLevelDBLabelSetFingerprintIndex returns a LevelDBLabelPairFingerprintIndex +// object ready to use. func NewLevelDBLabelSetFingerprintIndex(o LevelDBLabelSetFingerprintIndexOptions) (*LevelDBLabelPairFingerprintIndex, error) { s, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions) if err != nil { @@ -322,68 +290,55 @@ func NewLevelDBLabelSetFingerprintIndex(o LevelDBLabelSetFingerprintIndexOptions } return &LevelDBLabelPairFingerprintIndex{ - p: s, + LevelDBPersistence: s, }, nil } +// MetricMembershipIndex models a database tracking the existence of Metrics. type MetricMembershipIndex interface { + raw.Database raw.Pruner IndexBatch(FingerprintMetricMapping) error Has(clientmodel.Metric) (ok bool, err error) - State() *raw.DatabaseState - Size() (s uint64, present bool, err error) } +// LevelDBMetricMembershipIndex implements MetricMembershipIndex using leveldb. type LevelDBMetricMembershipIndex struct { - p *leveldb.LevelDBPersistence + *leveldb.LevelDBPersistence } -var existenceIdentity = new(dto.MembershipIndexValue) +var existenceIdentity = &dto.MembershipIndexValue{} +// IndexBatch implements MetricMembershipIndex. func (i *LevelDBMetricMembershipIndex) IndexBatch(b FingerprintMetricMapping) error { batch := leveldb.NewBatch() defer batch.Close() for _, m := range b { - k := new(dto.Metric) + k := &dto.Metric{} dumpMetric(k, m) batch.Put(k, existenceIdentity) } - return i.p.Commit(batch) + return i.LevelDBPersistence.Commit(batch) } +// Has implements MetricMembershipIndex. func (i *LevelDBMetricMembershipIndex) Has(m clientmodel.Metric) (ok bool, err error) { - k := new(dto.Metric) + k := &dto.Metric{} dumpMetric(k, m) - return i.p.Has(k) -} - -func (i *LevelDBMetricMembershipIndex) Close() { - i.p.Close() -} - -func (i *LevelDBMetricMembershipIndex) Size() (uint64, bool, error) { - s, err := i.p.Size() - return s, true, err -} - -func (i *LevelDBMetricMembershipIndex) State() *raw.DatabaseState { - return i.p.State() -} - -func (i *LevelDBMetricMembershipIndex) Prune() (bool, error) { - i.p.Prune() - - return false, nil + return i.LevelDBPersistence.Has(k) } +// LevelDBMetricMembershipIndexOptions just wraps leveldb.LevelDBOptions type LevelDBMetricMembershipIndexOptions struct { leveldb.LevelDBOptions } +// NewLevelDBMetricMembershipIndex returns a LevelDBMetricMembershipIndex object +// ready to use. func NewLevelDBMetricMembershipIndex(o LevelDBMetricMembershipIndexOptions) (*LevelDBMetricMembershipIndex, error) { s, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions) if err != nil { @@ -391,7 +346,7 @@ func NewLevelDBMetricMembershipIndex(o LevelDBMetricMembershipIndexOptions) (*Le } return &LevelDBMetricMembershipIndex{ - p: s, + LevelDBPersistence: s, }, nil } @@ -402,7 +357,7 @@ type MetricIndexer interface { IndexMetrics(FingerprintMetricMapping) error } -// IndexObserver listens and receives changes to a given +// IndexerObserver listens and receives changes to a given // FingerprintMetricMapping. type IndexerObserver interface { Observe(FingerprintMetricMapping) error @@ -422,6 +377,8 @@ type IndexerProxy struct { observers []IndexerObserver } +// IndexMetrics proxies the given FingerprintMetricMapping to the underlying +// MetricIndexer and calls all registered observers with it. func (p *IndexerProxy) IndexMetrics(b FingerprintMetricMapping) error { if p.err != nil { return p.err @@ -451,7 +408,7 @@ func (p *IndexerProxy) Close() error { return nil } -// Close flushes the underlying index requests before closing. +// Flush flushes the underlying index requests before closing. func (p *IndexerProxy) Flush() error { if p.err != nil { return p.err @@ -477,6 +434,8 @@ type SynchronizedIndexer struct { i MetricIndexer } +// IndexMetrics calls IndexMetrics of the wrapped MetricIndexer after acquiring +// a lock. func (i *SynchronizedIndexer) IndexMetrics(b FingerprintMetricMapping) error { i.mu.Lock() defer i.mu.Unlock() @@ -488,6 +447,8 @@ type flusher interface { Flush() error } +// Flush calls Flush of the wrapped MetricIndexer after acquiring a lock. If the +// wrapped MetricIndexer has no Flush method, this is a no-op. func (i *SynchronizedIndexer) Flush() error { if flusher, ok := i.i.(flusher); ok { i.mu.Lock() @@ -499,6 +460,8 @@ func (i *SynchronizedIndexer) Flush() error { return nil } +// Close calls Close of the wrapped MetricIndexer after acquiring a lock. If the +// wrapped MetricIndexer has no Close method, this is a no-op. func (i *SynchronizedIndexer) Close() error { if closer, ok := i.i.(io.Closer); ok { i.mu.Lock() @@ -510,7 +473,8 @@ func (i *SynchronizedIndexer) Close() error { return nil } -// NewSynchronizedIndexer builds a new MetricIndexer. +// NewSynchronizedIndexer returns a SynchronizedIndexer wrapping the given +// MetricIndexer. func NewSynchronizedIndexer(i MetricIndexer) *SynchronizedIndexer { return &SynchronizedIndexer{ i: i, @@ -531,6 +495,8 @@ type BufferedIndexer struct { err error } +// IndexMetrics writes the entries in the given FingerprintMetricMapping to the +// index. func (i *BufferedIndexer) IndexMetrics(b FingerprintMetricMapping) error { if i.err != nil { return i.err @@ -542,8 +508,6 @@ func (i *BufferedIndexer) IndexMetrics(b FingerprintMetricMapping) error { return nil } - i.buf = append(i.buf) - i.err = i.Flush() return i.err @@ -590,8 +554,8 @@ func (i *BufferedIndexer) Close() error { return nil } +// NewBufferedIndexer returns a BufferedIndexer ready to use. func NewBufferedIndexer(i MetricIndexer, limit int) *BufferedIndexer { - return &BufferedIndexer{ i: i, limit: limit, @@ -715,6 +679,8 @@ func extendLabelPairIndex(i LabelPairFingerprintIndex, b FingerprintMetricMappin return batch, nil } +// IndexMetrics adds the facets of all unindexed metrics found in the given +// FingerprintMetricMapping to the corresponding indices. func (i *TotalIndexer) IndexMetrics(b FingerprintMetricMapping) error { unindexed, err := findUnindexed(i.MetricMembership, b) if err != nil { diff --git a/storage/metric/interface.go b/storage/metric/interface.go index 9adb07dcd..d28a5e954 100644 --- a/storage/metric/interface.go +++ b/storage/metric/interface.go @@ -13,11 +13,7 @@ package metric -import ( - clientmodel "github.com/prometheus/client_golang/model" - - "github.com/prometheus/prometheus/storage" -) +import clientmodel "github.com/prometheus/client_golang/model" // AppendBatch models a batch of samples to be stored. type AppendBatch map[clientmodel.Fingerprint]SampleSet @@ -32,24 +28,17 @@ type MetricPersistence interface { // Record a group of new samples in the storage layer. AppendSamples(clientmodel.Samples) error - // Get all of the metric fingerprints that are associated with the provided - // label set. + // Get all of the metric fingerprints that are associated with the + // provided label set. GetFingerprintsForLabelSet(clientmodel.LabelSet) (clientmodel.Fingerprints, error) - // Get all of the metric fingerprints that are associated for a given label - // name. + // Get all of the metric fingerprints that are associated for a given + // label name. GetFingerprintsForLabelName(clientmodel.LabelName) (clientmodel.Fingerprints, error) // Get the metric associated with the provided fingerprint. GetMetricForFingerprint(*clientmodel.Fingerprint) (clientmodel.Metric, error) - // Get the two metric values that are immediately adjacent to a given time. - GetValueAtTime(*clientmodel.Fingerprint, clientmodel.Timestamp) Values - // Get the boundary values of an interval: the first value older than the - // interval start, and the first value younger than the interval end. - GetBoundaryValues(*clientmodel.Fingerprint, Interval) Values - // Get all values contained within a provided interval. - GetRangeValues(*clientmodel.Fingerprint, Interval) Values // Get all label values that are associated with a given label name. GetAllValuesForLabel(clientmodel.LabelName) (clientmodel.LabelValues, error) } @@ -57,19 +46,19 @@ type MetricPersistence interface { // View provides a view of the values in the datastore subject to the request // of a preloading operation. type View interface { + // Get the two values that are immediately adjacent to a given time. GetValueAtTime(*clientmodel.Fingerprint, clientmodel.Timestamp) Values + // Get the boundary values of an interval: the first value older than + // the interval start, and the first value younger than the interval + // end. GetBoundaryValues(*clientmodel.Fingerprint, Interval) Values + // Get all values contained within a provided interval. GetRangeValues(*clientmodel.Fingerprint, Interval) Values - - // Destroy this view. - Close() } -type Series interface { - Fingerprint() *clientmodel.Fingerprint - Metric() clientmodel.Metric -} - -type IteratorsForFingerprintBuilder interface { - ForStream(stream *stream) (storage.RecordDecoder, storage.RecordFilter, storage.RecordOperator) +// ViewableMetricPersistence is a MetricPersistence that is able to present the +// samples it has stored as a View. +type ViewableMetricPersistence interface { + MetricPersistence + View } diff --git a/storage/metric/labelpair.go b/storage/metric/labelpair.go index 02bc928ee..afecb42a4 100644 --- a/storage/metric/labelpair.go +++ b/storage/metric/labelpair.go @@ -17,11 +17,14 @@ import ( clientmodel "github.com/prometheus/client_golang/model" ) +// LabelPair pairs a name with a value. type LabelPair struct { Name clientmodel.LabelName Value clientmodel.LabelValue } +// Equal returns true iff both the Name and the Value of this LabelPair and o +// are equal. func (l *LabelPair) Equal(o *LabelPair) bool { switch { case l.Name != o.Name: @@ -33,6 +36,8 @@ func (l *LabelPair) Equal(o *LabelPair) bool { } } +// LabelPairs is a sortable slice of LabelPair pointers. It implements +// sort.Interface. type LabelPairs []*LabelPair func (l LabelPairs) Len() int { diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index 24490079c..0c09f27ac 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -35,11 +35,12 @@ import ( const sortConcurrency = 2 +// LevelDBMetricPersistence is a leveldb-backed persistence layer for metrics. type LevelDBMetricPersistence struct { CurationRemarks CurationRemarker FingerprintToMetrics FingerprintMetricIndex LabelNameToFingerprints LabelNameFingerprintIndex - LabelSetToFingerprints LabelPairFingerprintIndex + LabelPairToFingerprints LabelPairFingerprintIndex MetricHighWatermarks HighWatermarker MetricMembershipIndex MetricMembershipIndex @@ -63,8 +64,8 @@ type LevelDBMetricPersistence struct { var ( leveldbChunkSize = flag.Int("leveldbChunkSize", 200, "Maximum number of samples stored under one key.") - // These flag values are back of the envelope, though they seem sensible. - // Please re-evaluate based on your own needs. + // These flag values are back of the envelope, though they seem + // sensible. Please re-evaluate based on your own needs. curationRemarksCacheSize = flag.Int("curationRemarksCacheSize", 5*1024*1024, "The size for the curation remarks cache (bytes).") fingerprintsToLabelPairCacheSize = flag.Int("fingerprintsToLabelPairCacheSizeBytes", 25*1024*1024, "The size for the fingerprint to label pair index (bytes).") highWatermarkCacheSize = flag.Int("highWatermarksByFingerprintSizeBytes", 5*1024*1024, "The size for the metric high watermarks (bytes).") @@ -75,19 +76,15 @@ var ( ) type leveldbOpener func() -type errorCloser interface { - Close() error -} -type closer interface { - Close() -} +// Close closes all the underlying persistence layers. It implements the +// MetricPersistence interface. func (l *LevelDBMetricPersistence) Close() { - var persistences = []interface{}{ + var persistences = []raw.Database{ l.CurationRemarks, l.FingerprintToMetrics, l.LabelNameToFingerprints, - l.LabelSetToFingerprints, + l.LabelPairToFingerprints, l.MetricHighWatermarks, l.MetricMembershipIndex, l.MetricSamples, @@ -97,15 +94,10 @@ func (l *LevelDBMetricPersistence) Close() { for _, c := range persistences { closerGroup.Add(1) - go func(c interface{}) { + go func(c raw.Database) { if c != nil { - switch closer := c.(type) { - case closer: - closer.Close() - case errorCloser: - if err := closer.Close(); err != nil { - glog.Error("Error closing persistence: ", err) - } + if err := c.Close(); err != nil { + glog.Error("Error closing persistence: ", err) } } closerGroup.Done() @@ -115,10 +107,12 @@ func (l *LevelDBMetricPersistence) Close() { closerGroup.Wait() } +// NewLevelDBMetricPersistence returns a LevelDBMetricPersistence object ready +// to use. func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistence, error) { workers := utility.NewUncertaintyGroup(7) - emission := new(LevelDBMetricPersistence) + emission := &LevelDBMetricPersistence{} var subsystemOpeners = []struct { name string @@ -185,7 +179,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc "Fingerprints by Label Name and Value Pair", func() { var err error - emission.LabelSetToFingerprints, err = NewLevelDBLabelSetFingerprintIndex(LevelDBLabelSetFingerprintIndexOptions{ + emission.LabelPairToFingerprints, err = NewLevelDBLabelSetFingerprintIndex(LevelDBLabelSetFingerprintIndexOptions{ LevelDBOptions: leveldb.LevelDBOptions{ Name: "Fingerprints by Label Pair", Purpose: "Index", @@ -239,19 +233,20 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc glog.Error("Could not open storage: ", err) } - return nil, fmt.Errorf("Unable to open metric persistence.") + return nil, fmt.Errorf("unable to open metric persistence") } emission.Indexer = &TotalIndexer{ FingerprintToMetric: emission.FingerprintToMetrics, LabelNameToFingerprint: emission.LabelNameToFingerprints, - LabelPairToFingerprint: emission.LabelSetToFingerprints, + LabelPairToFingerprint: emission.LabelPairToFingerprints, MetricMembership: emission.MetricMembershipIndex, } return emission, nil } +// AppendSample implements the MetricPersistence interface. func (l *LevelDBMetricPersistence) AppendSample(sample *clientmodel.Sample) (err error) { defer func(begin time.Time) { duration := time.Since(begin) @@ -317,6 +312,7 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[clientmodel. return l.MetricHighWatermarks.UpdateBatch(b) } +// AppendSamples appends the given Samples to the database and indexes them. func (l *LevelDBMetricPersistence) AppendSamples(samples clientmodel.Samples) (err error) { defer func(begin time.Time) { duration := time.Since(begin) @@ -345,9 +341,9 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples clientmodel.Samples) (e samplesBatch := leveldb.NewBatch() defer samplesBatch.Close() - key := new(SampleKey) - keyDto := new(dto.SampleKey) - value := new(dto.SampleValueSeries) + key := &SampleKey{} + keyDto := &dto.SampleKey{} + value := &dto.SampleValueSeries{} for fingerprint, group := range fingerprintToSamples { for { @@ -434,6 +430,8 @@ func (l *LevelDBMetricPersistence) hasIndexMetric(m clientmodel.Metric) (value b return l.MetricMembershipIndex.Has(m) } +// HasLabelPair returns true if the given LabelPair is present in the underlying +// LabelPair index. func (l *LevelDBMetricPersistence) HasLabelPair(p *LabelPair) (value bool, err error) { defer func(begin time.Time) { duration := time.Since(begin) @@ -441,9 +439,11 @@ func (l *LevelDBMetricPersistence) HasLabelPair(p *LabelPair) (value bool, err e recordOutcome(duration, err, map[string]string{operation: hasLabelPair, result: success}, map[string]string{operation: hasLabelPair, result: failure}) }(time.Now()) - return l.LabelSetToFingerprints.Has(p) + return l.LabelPairToFingerprints.Has(p) } +// HasLabelName returns true if the given LabelName is present in the underlying +// LabelName index. func (l *LevelDBMetricPersistence) HasLabelName(n clientmodel.LabelName) (value bool, err error) { defer func(begin time.Time) { duration := time.Since(begin) @@ -456,6 +456,9 @@ func (l *LevelDBMetricPersistence) HasLabelName(n clientmodel.LabelName) (value return } +// GetFingerprintsForLabelSet returns the Fingerprints for the given LabelSet by +// querying the underlying LabelPairFingerprintIndex for each LabelPair +// contained in LabelSet. It implements the MetricPersistence interface. func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet clientmodel.LabelSet) (fps clientmodel.Fingerprints, err error) { defer func(begin time.Time) { duration := time.Since(begin) @@ -466,7 +469,7 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet clientmod sets := []utility.Set{} for name, value := range labelSet { - fps, _, err := l.LabelSetToFingerprints.Lookup(&LabelPair{ + fps, _, err := l.LabelPairToFingerprints.Lookup(&LabelPair{ Name: name, Value: value, }) @@ -500,6 +503,9 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet clientmod return fps, nil } +// GetFingerprintsForLabelName returns the Fingerprints for the given LabelName +// from the underlying LabelNameFingerprintIndex. It implements the +// MetricPersistence interface. func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName clientmodel.LabelName) (fps clientmodel.Fingerprints, err error) { defer func(begin time.Time) { duration := time.Since(begin) @@ -513,6 +519,9 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName clientm return fps, err } +// GetMetricForFingerprint returns the Metric for the given Fingerprint from the +// underlying FingerprintMetricIndex. It implements the MetricPersistence +// interface. func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f *clientmodel.Fingerprint) (m clientmodel.Metric, err error) { defer func(begin time.Time) { duration := time.Since(begin) @@ -526,68 +535,15 @@ func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f *clientmodel.Finger return m, nil } -func (l *LevelDBMetricPersistence) GetValueAtTime(f *clientmodel.Fingerprint, t clientmodel.Timestamp) Values { - panic("Not implemented") -} - -func (l *LevelDBMetricPersistence) GetBoundaryValues(f *clientmodel.Fingerprint, i Interval) Values { - panic("Not implemented") -} - -func (l *LevelDBMetricPersistence) GetRangeValues(f *clientmodel.Fingerprint, i Interval) Values { - panic("Not implemented") -} - -type MetricKeyDecoder struct{} - -func (d *MetricKeyDecoder) DecodeKey(in interface{}) (out interface{}, err error) { - unmarshaled := dto.LabelPair{} - err = proto.Unmarshal(in.([]byte), &unmarshaled) - if err != nil { - return - } - - out = LabelPair{ - Name: clientmodel.LabelName(*unmarshaled.Name), - Value: clientmodel.LabelValue(*unmarshaled.Value), - } - - return -} - -func (d *MetricKeyDecoder) DecodeValue(in interface{}) (out interface{}, err error) { - return -} - -type LabelNameFilter struct { - labelName clientmodel.LabelName -} - -func (f LabelNameFilter) Filter(key, value interface{}) (filterResult storage.FilterResult) { - labelPair, ok := key.(LabelPair) - if ok && labelPair.Name == f.labelName { - return storage.ACCEPT - } - return storage.SKIP -} - -type CollectLabelValuesOp struct { - labelValues []clientmodel.LabelValue -} - -func (op *CollectLabelValuesOp) Operate(key, value interface{}) (err *storage.OperatorError) { - labelPair := key.(LabelPair) - op.labelValues = append(op.labelValues, clientmodel.LabelValue(labelPair.Value)) - return -} - +// GetAllValuesForLabel gets all label values that are associated with the +// provided label name. func (l *LevelDBMetricPersistence) GetAllValuesForLabel(labelName clientmodel.LabelName) (values clientmodel.LabelValues, err error) { filter := &LabelNameFilter{ labelName: labelName, } labelValuesOp := &CollectLabelValuesOp{} - _, err = l.LabelSetToFingerprints.ForEach(&MetricKeyDecoder{}, filter, labelValuesOp) + _, err = l.LabelPairToFingerprints.ForEach(&MetricKeyDecoder{}, filter, labelValuesOp) if err != nil { return } @@ -604,41 +560,42 @@ func (l *LevelDBMetricPersistence) Prune() { l.CurationRemarks.Prune() l.FingerprintToMetrics.Prune() l.LabelNameToFingerprints.Prune() - l.LabelSetToFingerprints.Prune() + l.LabelPairToFingerprints.Prune() l.MetricHighWatermarks.Prune() l.MetricMembershipIndex.Prune() l.MetricSamples.Prune() } +// Sizes returns the sum of all sizes of the underlying databases. func (l *LevelDBMetricPersistence) Sizes() (total uint64, err error) { size := uint64(0) - if size, _, err = l.CurationRemarks.Size(); err != nil { + if size, err = l.CurationRemarks.Size(); err != nil { return 0, err } total += size - if size, _, err = l.FingerprintToMetrics.Size(); err != nil { + if size, err = l.FingerprintToMetrics.Size(); err != nil { return 0, err } total += size - if size, _, err = l.LabelNameToFingerprints.Size(); err != nil { + if size, err = l.LabelNameToFingerprints.Size(); err != nil { return 0, err } total += size - if size, _, err = l.LabelSetToFingerprints.Size(); err != nil { + if size, err = l.LabelPairToFingerprints.Size(); err != nil { return 0, err } total += size - if size, _, err = l.MetricHighWatermarks.Size(); err != nil { + if size, err = l.MetricHighWatermarks.Size(); err != nil { return 0, err } total += size - if size, _, err = l.MetricMembershipIndex.Size(); err != nil { + if size, err = l.MetricMembershipIndex.Size(); err != nil { return 0, err } total += size @@ -651,20 +608,64 @@ func (l *LevelDBMetricPersistence) Sizes() (total uint64, err error) { return total, nil } +// States returns the DatabaseStates of all underlying databases. func (l *LevelDBMetricPersistence) States() raw.DatabaseStates { return raw.DatabaseStates{ l.CurationRemarks.State(), l.FingerprintToMetrics.State(), l.LabelNameToFingerprints.State(), - l.LabelSetToFingerprints.State(), + l.LabelPairToFingerprints.State(), l.MetricHighWatermarks.State(), l.MetricMembershipIndex.State(), l.MetricSamples.State(), } } +// CollectLabelValuesOp implements storage.RecordOperator. It collects the +// encountered LabelValues in a slice. +type CollectLabelValuesOp struct { + labelValues []clientmodel.LabelValue +} + +// Operate implements storage.RecordOperator. 'key' is required to be a +// LabelPair. Its Value is appended to a slice of collected LabelValues. +func (op *CollectLabelValuesOp) Operate(key, value interface{}) (err *storage.OperatorError) { + labelPair := key.(LabelPair) + op.labelValues = append(op.labelValues, labelPair.Value) + return +} + +// MetricKeyDecoder implements storage.RecordDecoder for LabelPairs. +type MetricKeyDecoder struct{} + +// DecodeKey implements storage.RecordDecoder. It requires 'in' to be a +// LabelPair protobuf. 'out' is a metric.LabelPair. +func (d *MetricKeyDecoder) DecodeKey(in interface{}) (out interface{}, err error) { + unmarshaled := dto.LabelPair{} + err = proto.Unmarshal(in.([]byte), &unmarshaled) + if err != nil { + return + } + + out = LabelPair{ + Name: clientmodel.LabelName(*unmarshaled.Name), + Value: clientmodel.LabelValue(*unmarshaled.Value), + } + + return +} + +// DecodeValue implements storage.RecordDecoder. It is a no-op and always +// returns (nil, nil). +func (d *MetricKeyDecoder) DecodeValue(in interface{}) (out interface{}, err error) { + return +} + +// MetricSamplesDecoder implements storage.RecordDecoder for SampleKeys. type MetricSamplesDecoder struct{} +// DecodeKey implements storage.RecordDecoder. It requires 'in' to be a +// SampleKey protobuf. 'out' is a metric.SampleKey. func (d *MetricSamplesDecoder) DecodeKey(in interface{}) (interface{}, error) { key := &dto.SampleKey{} err := proto.Unmarshal(in.([]byte), key) @@ -678,6 +679,8 @@ func (d *MetricSamplesDecoder) DecodeKey(in interface{}) (interface{}, error) { return sampleKey, nil } +// DecodeValue implements storage.RecordDecoder. It requires 'in' to be a +// SampleValueSeries protobuf. 'out' is of type metric.Values. func (d *MetricSamplesDecoder) DecodeValue(in interface{}) (interface{}, error) { values := &dto.SampleValueSeries{} err := proto.Unmarshal(in.([]byte), values) @@ -688,8 +691,27 @@ func (d *MetricSamplesDecoder) DecodeValue(in interface{}) (interface{}, error) return NewValuesFromDTO(values), nil } +// AcceptAllFilter implements storage.RecordFilter and accepts all records. type AcceptAllFilter struct{} +// Filter implements storage.RecordFilter. It always returns ACCEPT. func (d *AcceptAllFilter) Filter(_, _ interface{}) storage.FilterResult { - return storage.ACCEPT + return storage.Accept +} + +// LabelNameFilter implements storage.RecordFilter and filters records matching +// a LabelName. +type LabelNameFilter struct { + labelName clientmodel.LabelName +} + +// Filter implements storage.RecordFilter. 'key' is expected to be a +// LabelPair. The result is ACCEPT if the Name of the LabelPair matches the +// LabelName of this LabelNameFilter. +func (f LabelNameFilter) Filter(key, value interface{}) (filterResult storage.FilterResult) { + labelPair, ok := key.(LabelPair) + if ok && labelPair.Name == f.labelName { + return storage.Accept + } + return storage.Skip } diff --git a/storage/metric/memory.go b/storage/metric/memory.go index 1ea929dcf..fee10078d 100644 --- a/storage/metric/memory.go +++ b/storage/metric/memory.go @@ -22,24 +22,10 @@ import ( "github.com/prometheus/prometheus/utility" ) -// Assuming sample rate of 1 / 15Hz, this allows for one hour's worth of -// storage per metric without any major reallocations. +// An initialSeriesArenaSize of 4*60 allows for one hour's worth of storage per +// metric without any major reallocations - assuming a sample rate of 1 / 15Hz. const initialSeriesArenaSize = 4 * 60 -// Models a given sample entry stored in the in-memory arena. -type value interface { - // Gets the given value. - get() clientmodel.SampleValue -} - -// Models a single sample value. It presumes that there is either no subsequent -// value seen or that any subsequent values are of a different value. -type singletonValue clientmodel.SampleValue - -func (v singletonValue) get() clientmodel.SampleValue { - return clientmodel.SampleValue(v) -} - type stream interface { add(...*SamplePair) @@ -194,9 +180,11 @@ type memorySeriesStorage struct { labelNameToFingerprints map[clientmodel.LabelName]clientmodel.Fingerprints } +// MemorySeriesOptions bundles options used by NewMemorySeriesStorage to create +// a memory series storage. type MemorySeriesOptions struct { - // If provided, this WatermarkCache will be updated for any samples that are - // appended to the memorySeriesStorage. + // If provided, this WatermarkCache will be updated for any samples that + // are appended to the memorySeriesStorage. WatermarkCache *watermarkCache } @@ -485,6 +473,7 @@ func (s *memorySeriesStorage) GetAllValuesForLabel(labelName clientmodel.LabelNa return } +// NewMemorySeriesStorage returns a memory series storage ready to use. func NewMemorySeriesStorage(o MemorySeriesOptions) *memorySeriesStorage { return &memorySeriesStorage{ fingerprintToSeries: make(map[clientmodel.Fingerprint]stream), diff --git a/storage/metric/operation.go b/storage/metric/operation.go index 3e09336ce..ef341d8f9 100644 --- a/storage/metric/operation.go +++ b/storage/metric/operation.go @@ -22,7 +22,7 @@ import ( clientmodel "github.com/prometheus/client_golang/model" ) -// Encapsulates a primitive query operation. +// op encapsulates a primitive query operation. type op interface { // The time at which this operation starts. StartsAt() clientmodel.Timestamp @@ -30,14 +30,14 @@ type op interface { ExtractSamples(Values) Values // Return whether the operator has consumed all data it needs. Consumed() bool - // Get current operation time or nil if no subsequent work associated with - // this operator remains. + // Get current operation time or nil if no subsequent work associated + // with this operator remains. CurrentTime() clientmodel.Timestamp // GreedierThan indicates whether this present operation should take // precedence over the other operation due to greediness. // - // A critical assumption is that this operator and the other occur at the - // same time: this.StartsAt().Equal(op.StartsAt()). + // A critical assumption is that this operator and the other occur at + // the same time: this.StartsAt().Equal(op.StartsAt()). GreedierThan(op) bool } @@ -48,8 +48,8 @@ func (o ops) Len() int { return len(o) } -// startsAtSort implements the sorting protocol and allows operator to be sorted -// in chronological order by when they start. +// startsAtSort implements sort.Interface and allows operator to be sorted in +// chronological order by when they start. type startsAtSort struct { ops } @@ -62,7 +62,8 @@ func (o ops) Swap(i, j int) { o[i], o[j] = o[j], o[i] } -// Encapsulates getting values at or adjacent to a specific time. +// getValuesAtTimeOp encapsulates getting values at or adjacent to a specific +// time. type getValuesAtTimeOp struct { time clientmodel.Timestamp consumed bool @@ -112,15 +113,17 @@ func extractValuesAroundTime(t clientmodel.Timestamp, in Values) (out Values) { out = in[len(in)-1:] } else { if in[i].Timestamp.Equal(t) && len(in) > i+1 { - // We hit exactly the current sample time. Very unlikely in practice. - // Return only the current sample. + // We hit exactly the current sample time. Very unlikely + // in practice. Return only the current sample. out = in[i : i+1] } else { if i == 0 { - // We hit before the first sample time. Return only the first sample. + // We hit before the first sample time. Return + // only the first sample. out = in[0:1] } else { - // We hit between two samples. Return both surrounding samples. + // We hit between two samples. Return both + // surrounding samples. out = in[i-1 : i+1] } } @@ -136,15 +139,16 @@ func (g getValuesAtTimeOp) Consumed() bool { return g.consumed } -// Encapsulates getting values at a given interval over a duration. +// getValuesAtIntervalOp encapsulates getting values at a given interval over a +// duration. type getValuesAtIntervalOp struct { from clientmodel.Timestamp through clientmodel.Timestamp interval time.Duration } -func (o *getValuesAtIntervalOp) String() string { - return fmt.Sprintf("getValuesAtIntervalOp from %s each %s through %s", o.from, o.interval, o.through) +func (g *getValuesAtIntervalOp) String() string { + return fmt.Sprintf("getValuesAtIntervalOp from %s each %s through %s", g.from, g.interval, g.through) } func (g *getValuesAtIntervalOp) StartsAt() clientmodel.Timestamp { @@ -199,14 +203,14 @@ func (g *getValuesAtIntervalOp) GreedierThan(op op) (superior bool) { return } -// Encapsulates getting all values in a given range. +// getValuesAlongRangeOp encapsulates getting all values in a given range. type getValuesAlongRangeOp struct { from clientmodel.Timestamp through clientmodel.Timestamp } -func (o *getValuesAlongRangeOp) String() string { - return fmt.Sprintf("getValuesAlongRangeOp from %s through %s", o.from, o.through) +func (g *getValuesAlongRangeOp) String() string { + return fmt.Sprintf("getValuesAlongRangeOp from %s through %s", g.from, g.through) } func (g *getValuesAlongRangeOp) StartsAt() clientmodel.Timestamp { @@ -271,7 +275,8 @@ func (g *getValuesAlongRangeOp) GreedierThan(op op) (superior bool) { return } -// Encapsulates getting all values from ranges along intervals. +// getValueRangeAtIntervalOp encapsulates getting all values from ranges along +// intervals. // // Works just like getValuesAlongRangeOp, but when from > through, through is // incremented by interval and from is reset to through-rangeDuration. Returns @@ -284,8 +289,8 @@ type getValueRangeAtIntervalOp struct { through clientmodel.Timestamp } -func (o *getValueRangeAtIntervalOp) String() string { - return fmt.Sprintf("getValueRangeAtIntervalOp range %s from %s each %s through %s", o.rangeDuration, o.rangeFrom, o.interval, o.through) +func (g *getValueRangeAtIntervalOp) String() string { + return fmt.Sprintf("getValueRangeAtIntervalOp range %s from %s each %s through %s", g.rangeDuration, g.rangeFrom, g.interval, g.through) } func (g *getValueRangeAtIntervalOp) StartsAt() clientmodel.Timestamp { diff --git a/storage/metric/processor.go b/storage/metric/processor.go index fd022d7ce..c18f7e30a 100644 --- a/storage/metric/processor.go +++ b/storage/metric/processor.go @@ -26,43 +26,51 @@ import ( dto "github.com/prometheus/prometheus/model/generated" ) -// processor models a post-processing agent that performs work given a sample +// Processor models a post-processing agent that performs work given a sample // corpus. type Processor interface { - // Name emits the name of this processor's signature encoder. It must be - // fully-qualified in the sense that it could be used via a Protocol Buffer - // registry to extract the descriptor to reassemble this message. + // Name emits the name of this processor's signature encoder. It must + // be fully-qualified in the sense that it could be used via a Protocol + // Buffer registry to extract the descriptor to reassemble this message. Name() string // Signature emits a byte signature for this process for the purpose of // remarking how far along it has been applied to the database. Signature() []byte - // Apply runs this processor against the sample set. sampleIterator expects - // to be pre-seeked to the initial starting position. The processor will - // run until up until stopAt has been reached. It is imperative that the - // provided stopAt is within the interval of the series frontier. + // Apply runs this processor against the sample set. sampleIterator + // expects to be pre-seeked to the initial starting position. The + // processor will run until up until stopAt has been reached. It is + // imperative that the provided stopAt is within the interval of the + // series frontier. // - // Upon completion or error, the last time at which the processor finished - // shall be emitted in addition to any errors. + // Upon completion or error, the last time at which the processor + // finished shall be emitted in addition to any errors. Apply(sampleIterator leveldb.Iterator, samplesPersistence raw.Persistence, stopAt clientmodel.Timestamp, fingerprint *clientmodel.Fingerprint) (lastCurated clientmodel.Timestamp, err error) + // Close reaps all of the underlying system resources associated with + // this processor. + Close() } -// CompactionProcessor combines sparse values in the database together such -// that at least MinimumGroupSize-sized chunks are grouped together. +// CompactionProcessor combines sparse values in the database together such that +// at least MinimumGroupSize-sized chunks are grouped together. It implements +// the Processor interface. type CompactionProcessor struct { maximumMutationPoolBatch int minimumGroupSize int - // signature is the byte representation of the CompactionProcessor's settings, - // used for purely memoization purposes across an instance. + // signature is the byte representation of the CompactionProcessor's + // settings, used for purely memoization purposes across an instance. signature []byte dtoSampleKeys *dtoSampleKeyList sampleKeys *sampleKeyList } +// Name implements the Processor interface. It returns +// "io.prometheus.CompactionProcessorDefinition". func (p *CompactionProcessor) Name() string { return "io.prometheus.CompactionProcessorDefinition" } +// Signature implements the Processor interface. func (p *CompactionProcessor) Signature() []byte { if len(p.signature) == 0 { out, err := proto.Marshal(&dto.CompactionProcessorDefinition{ @@ -82,8 +90,9 @@ func (p *CompactionProcessor) String() string { return fmt.Sprintf("compactionProcessor for minimum group size %d", p.minimumGroupSize) } +// Apply implements the Processor interface. func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersistence raw.Persistence, stopAt clientmodel.Timestamp, fingerprint *clientmodel.Fingerprint) (lastCurated clientmodel.Timestamp, err error) { - var pendingBatch raw.Batch = nil + var pendingBatch raw.Batch defer func() { if pendingBatch != nil { @@ -125,7 +134,7 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers // block would prevent us from going into unsafe territory. case len(unactedSamples) == 0: if !sampleIterator.Next() { - return lastCurated, fmt.Errorf("Illegal Condition: Invalid Iterator on Continuation") + return lastCurated, fmt.Errorf("illegal condition: invalid iterator on continuation") } keyDropped = false @@ -163,7 +172,7 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers case len(pendingSamples)+len(unactedSamples) < p.minimumGroupSize: if !keyDropped { - k := new(dto.SampleKey) + k := &dto.SampleKey{} sampleKey.Dump(k) pendingBatch.Drop(k) @@ -176,10 +185,10 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers // If the number of pending writes equals the target group size case len(pendingSamples) == p.minimumGroupSize: - k := new(dto.SampleKey) + k := &dto.SampleKey{} newSampleKey := pendingSamples.ToSampleKey(fingerprint) newSampleKey.Dump(k) - b := new(dto.SampleValueSeries) + b := &dto.SampleValueSeries{} pendingSamples.dump(b) pendingBatch.Put(k, b) @@ -205,7 +214,7 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers case len(pendingSamples)+len(unactedSamples) >= p.minimumGroupSize: if !keyDropped { - k := new(dto.SampleKey) + k := &dto.SampleKey{} sampleKey.Dump(k) pendingBatch.Drop(k) keyDropped = true @@ -220,16 +229,16 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers } pendingMutations++ default: - err = fmt.Errorf("Unhandled processing case.") + err = fmt.Errorf("unhandled processing case") } } if len(unactedSamples) > 0 || len(pendingSamples) > 0 { pendingSamples = append(pendingSamples, unactedSamples...) - k := new(dto.SampleKey) + k := &dto.SampleKey{} newSampleKey := pendingSamples.ToSampleKey(fingerprint) newSampleKey.Dump(k) - b := new(dto.SampleValueSeries) + b := &dto.SampleValueSeries{} pendingSamples.dump(b) pendingBatch.Put(k, b) pendingSamples = Values{} @@ -249,11 +258,14 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers return } +// Close implements the Processor interface. func (p *CompactionProcessor) Close() { p.dtoSampleKeys.Close() p.sampleKeys.Close() } +// CompactionProcessorOptions are used for connstruction of a +// CompactionProcessor. type CompactionProcessorOptions struct { // MaximumMutationPoolBatch represents approximately the largest pending // batch of mutation operations for the database before pausing to @@ -266,6 +278,7 @@ type CompactionProcessorOptions struct { MinimumGroupSize int } +// NewCompactionProcessor returns a CompactionProcessor ready to use. func NewCompactionProcessor(o *CompactionProcessorOptions) *CompactionProcessor { return &CompactionProcessor{ maximumMutationPoolBatch: o.MaximumMutationPoolBatch, @@ -276,7 +289,8 @@ func NewCompactionProcessor(o *CompactionProcessorOptions) *CompactionProcessor } } -// DeletionProcessor deletes sample blocks older than a defined value. +// DeletionProcessor deletes sample blocks older than a defined value. It +// implements the Processor interface. type DeletionProcessor struct { maximumMutationPoolBatch int // signature is the byte representation of the DeletionProcessor's settings, @@ -287,10 +301,13 @@ type DeletionProcessor struct { sampleKeys *sampleKeyList } +// Name implements the Processor interface. It returns +// "io.prometheus.DeletionProcessorDefinition". func (p *DeletionProcessor) Name() string { return "io.prometheus.DeletionProcessorDefinition" } +// Signature implements the Processor interface. func (p *DeletionProcessor) Signature() []byte { if len(p.signature) == 0 { out, err := proto.Marshal(&dto.DeletionProcessorDefinition{}) @@ -309,8 +326,9 @@ func (p *DeletionProcessor) String() string { return "deletionProcessor" } +// Apply implements the Processor interface. func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersistence raw.Persistence, stopAt clientmodel.Timestamp, fingerprint *clientmodel.Fingerprint) (lastCurated clientmodel.Timestamp, err error) { - var pendingBatch raw.Batch = nil + var pendingBatch raw.Batch defer func() { if pendingBatch != nil { @@ -342,12 +360,13 @@ func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersis case pendingBatch == nil: pendingBatch = leveldb.NewBatch() - // If there are no sample values to extract from the datastore, let's - // continue extracting more values to use. We know that the time.Before() - // block would prevent us from going into unsafe territory. + // If there are no sample values to extract from the datastore, + // let's continue extracting more values to use. We know that + // the time.Before() block would prevent us from going into + // unsafe territory. case len(sampleValues) == 0: if !sampleIterator.Next() { - return lastCurated, fmt.Errorf("Illegal Condition: Invalid Iterator on Continuation") + return lastCurated, fmt.Errorf("illegal condition: invalid iterator on continuation") } if err = sampleIterator.Key(sampleKeyDto); err != nil { @@ -360,9 +379,9 @@ func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersis return } - // If the number of pending mutations exceeds the allowed batch amount, - // commit to disk and delete the batch. A new one will be recreated if - // necessary. + // If the number of pending mutations exceeds the allowed batch + // amount, commit to disk and delete the batch. A new one will + // be recreated if necessary. case pendingMutations >= p.maximumMutationPoolBatch: err = samplesPersistence.Commit(pendingBatch) if err != nil { @@ -403,7 +422,7 @@ func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersis } default: - err = fmt.Errorf("Unhandled processing case.") + err = fmt.Errorf("unhandled processing case") } } @@ -419,11 +438,13 @@ func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersis return } +// Close implements the Processor interface. func (p *DeletionProcessor) Close() { p.dtoSampleKeys.Close() p.sampleKeys.Close() } +// DeletionProcessorOptions are used for connstruction of a DeletionProcessor. type DeletionProcessorOptions struct { // MaximumMutationPoolBatch represents approximately the largest pending // batch of mutation operations for the database before pausing to @@ -431,6 +452,7 @@ type DeletionProcessorOptions struct { MaximumMutationPoolBatch int } +// NewDeletionProcessor returns a DeletionProcessor ready to use. func NewDeletionProcessor(o *DeletionProcessorOptions) *DeletionProcessor { return &DeletionProcessor{ maximumMutationPoolBatch: o.MaximumMutationPoolBatch, diff --git a/storage/metric/processor_test.go b/storage/metric/processor_test.go index 8fc014873..70338051f 100644 --- a/storage/metric/processor_test.go +++ b/storage/metric/processor_test.go @@ -112,7 +112,7 @@ func (s sampleGroup) Get() (key, value proto.Message) { return k, v } -type noopUpdater bool +type noopUpdater struct{} func (noopUpdater) UpdateCurationState(*CurationState) {} @@ -876,7 +876,7 @@ func TestCuratorCompactionProcessor(t *testing.T) { } defer samples.Close() - updates := new(noopUpdater) + updates := &noopUpdater{} stop := make(chan bool) defer close(stop) @@ -891,7 +891,7 @@ func TestCuratorCompactionProcessor(t *testing.T) { t.Fatal(err) } - iterator, err := curatorStates.p.NewIterator(true) + iterator, err := curatorStates.LevelDBPersistence.NewIterator(true) if err != nil { t.Fatal(err) } @@ -909,12 +909,12 @@ func TestCuratorCompactionProcessor(t *testing.T) { } } - curationKeyDto := new(dto.CurationKey) + curationKeyDto := &dto.CurationKey{} err = iterator.Key(curationKeyDto) if err != nil { t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err) } - actualKey := new(curationKey) + actualKey := &curationKey{} actualKey.load(curationKeyDto) actualValue, present, err := curatorStates.Get(actualKey) @@ -988,7 +988,7 @@ func TestCuratorCompactionProcessor(t *testing.T) { for k, actualValue := range sampleValues { if expected.values[k].Value != actualValue.Value { - t.Fatalf("%d.%d.%d. expected %d, got %d", i, j, k, expected.values[k].Value, actualValue.Value) + t.Fatalf("%d.%d.%d. expected %v, got %v", i, j, k, expected.values[k].Value, actualValue.Value) } if !expected.values[k].Timestamp.Equal(actualValue.Timestamp) { t.Fatalf("%d.%d.%d. expected %s, got %s", i, j, k, expected.values[k].Timestamp, actualValue.Timestamp) @@ -1405,7 +1405,7 @@ func TestCuratorDeletionProcessor(t *testing.T) { } defer samples.Close() - updates := new(noopUpdater) + updates := &noopUpdater{} stop := make(chan bool) defer close(stop) @@ -1420,7 +1420,7 @@ func TestCuratorDeletionProcessor(t *testing.T) { t.Fatal(err) } - iterator, err := curatorStates.p.NewIterator(true) + iterator, err := curatorStates.LevelDBPersistence.NewIterator(true) if err != nil { t.Fatal(err) } @@ -1438,12 +1438,12 @@ func TestCuratorDeletionProcessor(t *testing.T) { } } - curationKeyDto := new(dto.CurationKey) + curationKeyDto := &dto.CurationKey{} if err := iterator.Key(curationKeyDto); err != nil { t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err) } - actualKey := new(curationKey) + actualKey := &curationKey{} actualKey.load(curationKeyDto) signature := expected.processor.Signature() @@ -1518,7 +1518,7 @@ func TestCuratorDeletionProcessor(t *testing.T) { for k, actualValue := range sampleValues { if expected.values[k].Value != actualValue.Value { - t.Fatalf("%d.%d.%d. expected %d, got %d", i, j, k, expected.values[k].Value, actualValue.Value) + t.Fatalf("%d.%d.%d. expected %v, got %v", i, j, k, expected.values[k].Value, actualValue.Value) } if !expected.values[k].Timestamp.Equal(actualValue.Timestamp) { t.Fatalf("%d.%d.%d. expected %s, got %s", i, j, k, expected.values[k].Timestamp, actualValue.Timestamp) diff --git a/storage/metric/rule_integration_test.go b/storage/metric/rule_integration_test.go index 70bf3a5d4..e6799c38b 100644 --- a/storage/metric/rule_integration_test.go +++ b/storage/metric/rule_integration_test.go @@ -22,7 +22,7 @@ import ( "github.com/prometheus/prometheus/utility/test" ) -func GetValueAtTimeTests(persistenceMaker func() (MetricPersistence, test.Closer), t test.Tester) { +func GetValueAtTimeTests(persistenceMaker func() (ViewableMetricPersistence, test.Closer), t test.Tester) { type value struct { year int month time.Month @@ -355,7 +355,7 @@ func GetValueAtTimeTests(persistenceMaker func() (MetricPersistence, test.Closer } } -func GetRangeValuesTests(persistenceMaker func() (MetricPersistence, test.Closer), onlyBoundaries bool, t test.Tester) { +func GetRangeValuesTests(persistenceMaker func() (ViewableMetricPersistence, test.Closer), onlyBoundaries bool, t test.Tester) { type value struct { year int month time.Month @@ -854,7 +854,7 @@ func GetRangeValuesTests(persistenceMaker func() (MetricPersistence, test.Closer } if actualValues == nil && len(expectedValues) != 0 { - t.Fatalf("%d.%d(%s). Expected %s but got: %s\n", i, j, behavior.name, expectedValues, actualValues) + t.Fatalf("%d.%d(%s). Expected %v but got: %v\n", i, j, behavior.name, expectedValues, actualValues) } if expectedValues == nil { @@ -899,7 +899,7 @@ func GetRangeValuesTests(persistenceMaker func() (MetricPersistence, test.Closer // Test Definitions Follow func testMemoryGetValueAtTime(t test.Tester) { - persistenceMaker := func() (MetricPersistence, test.Closer) { + persistenceMaker := func() (ViewableMetricPersistence, test.Closer) { return NewMemorySeriesStorage(MemorySeriesOptions{}), test.NilCloser } @@ -927,7 +927,7 @@ func BenchmarkMemoryGetBoundaryValues(b *testing.B) { } func testMemoryGetRangeValues(t test.Tester) { - persistenceMaker := func() (MetricPersistence, test.Closer) { + persistenceMaker := func() (ViewableMetricPersistence, test.Closer) { return NewMemorySeriesStorage(MemorySeriesOptions{}), test.NilCloser } @@ -935,7 +935,7 @@ func testMemoryGetRangeValues(t test.Tester) { } func testMemoryGetBoundaryValues(t test.Tester) { - persistenceMaker := func() (MetricPersistence, test.Closer) { + persistenceMaker := func() (ViewableMetricPersistence, test.Closer) { return NewMemorySeriesStorage(MemorySeriesOptions{}), test.NilCloser } diff --git a/storage/metric/sample.go b/storage/metric/sample.go index 49fae0a62..d0afd9e0e 100644 --- a/storage/metric/sample.go +++ b/storage/metric/sample.go @@ -25,15 +25,19 @@ import ( dto "github.com/prometheus/prometheus/model/generated" ) +// MarshalJSON implements json.Marshaler. func (s SamplePair) MarshalJSON() ([]byte, error) { return []byte(fmt.Sprintf("{\"Value\": \"%f\", \"Timestamp\": %d}", s.Value, s.Timestamp)), nil } +// SamplePair pairs a SampleValue with a Timestamp. type SamplePair struct { Value clientmodel.SampleValue Timestamp clientmodel.Timestamp } +// Equal returns true if this SamplePair and o have equal Values and equal +// Timestamps. func (s *SamplePair) Equal(o *SamplePair) bool { if s == o { return true @@ -54,20 +58,27 @@ func (s *SamplePair) String() string { return fmt.Sprintf("SamplePair at %s of %s", s.Timestamp, s.Value) } +// Values is a sortable slice of SamplePair pointers (as in: it implements +// sort.Interface). Sorting happens by Timestamp. type Values []*SamplePair +// Len implements sort.Interface. func (v Values) Len() int { return len(v) } +// Less implements sort.Interface. func (v Values) Less(i, j int) bool { return v[i].Timestamp.Before(v[j].Timestamp) } +// Swap implements sort.Interface. func (v Values) Swap(i, j int) { v[i], v[j] = v[j], v[i] } +// Equal returns true if these Values are of the same length as o, and each +// value is equal to the corresponding value in o (i.e. at the same index). func (v Values) Equal(o Values) bool { if len(v) != len(o) { return false @@ -132,6 +143,7 @@ func (v Values) dump(d *dto.SampleValueSeries) { } } +// ToSampleKey returns the SampleKey for these Values. func (v Values) ToSampleKey(f *clientmodel.Fingerprint) *SampleKey { return &SampleKey{ Fingerprint: f, @@ -156,9 +168,10 @@ func (v Values) String() string { return buffer.String() } +// NewValuesFromDTO deserializes Values from a DTO. func NewValuesFromDTO(d *dto.SampleValueSeries) Values { - // BUG(matt): Incogruent from the other load/dump API types, but much more - // performant. + // BUG(matt): Incogruent from the other load/dump API types, but much + // more performant. v := make(Values, 0, len(d.Value)) for _, value := range d.Value { @@ -171,11 +184,13 @@ func NewValuesFromDTO(d *dto.SampleValueSeries) Values { return v } +// SampleSet is Values with a Metric attached. type SampleSet struct { Metric clientmodel.Metric Values Values } +// Interval describes the inclusive interval between two Timestamps. type Interval struct { OldestInclusive clientmodel.Timestamp NewestInclusive clientmodel.Timestamp diff --git a/storage/metric/samplekey.go b/storage/metric/samplekey.go index 92aa88b3e..dd381b437 100644 --- a/storage/metric/samplekey.go +++ b/storage/metric/samplekey.go @@ -49,6 +49,8 @@ func (s *SampleKey) Constrain(first, last *SampleKey) bool { } } +// Equal returns true if this SampleKey and o have equal fingerprints, +// timestamps, and sample counts. func (s *SampleKey) Equal(o *SampleKey) bool { if s == o { return true @@ -81,6 +83,11 @@ func (s *SampleKey) MayContain(t clientmodel.Timestamp) bool { } } +// Before returns true if the Fingerprint of this SampleKey is less than fp and +// false if it is greater. If both fingerprints are equal, the FirstTimestamp of +// this SampleKey is checked in the same way against t. If the timestamps are +// eqal, the LastTimestamp of this SampleKey is checked against t (and false is +// returned if they are equal again). func (s *SampleKey) Before(fp *clientmodel.Fingerprint, t clientmodel.Timestamp) bool { if s.Fingerprint.Less(fp) { return true @@ -96,7 +103,7 @@ func (s *SampleKey) Before(fp *clientmodel.Fingerprint, t clientmodel.Timestamp) return s.LastTimestamp.Before(t) } -// ToDTO converts this SampleKey into a DTO for use in serialization purposes. +// Dump converts this SampleKey into a DTO for use in serialization purposes. func (s *SampleKey) Dump(d *dto.SampleKey) { d.Reset() fp := &dto.Fingerprint{} @@ -112,6 +119,7 @@ func (s *SampleKey) String() string { return fmt.Sprintf("SampleKey for %s at %s to %s with %d values.", s.Fingerprint, s.FirstTimestamp, s.LastTimestamp, s.SampleCount) } +// Load deserializes this SampleKey from a DTO. func (s *SampleKey) Load(d *dto.SampleKey) { f := &clientmodel.Fingerprint{} loadFingerprint(f, d.GetFingerprint()) diff --git a/storage/metric/stochastic_test.go b/storage/metric/stochastic_test.go index 71b7e7c8d..d396190fe 100644 --- a/storage/metric/stochastic_test.go +++ b/storage/metric/stochastic_test.go @@ -422,8 +422,8 @@ func StochasticTests(persistenceMaker func() (MetricPersistence, test.Closer), t for i := 0; i < numberOfRangeScans; i++ { timestamps := metricTimestamps[metricIndex] - var first int64 = 0 - var second int64 = 0 + var first int64 + var second int64 for { firstCandidate := random.Int63n(int64(len(timestamps))) @@ -472,6 +472,11 @@ func StochasticTests(persistenceMaker func() (MetricPersistence, test.Closer), t fp := &clientmodel.Fingerprint{} fp.LoadFromMetric(metric) switch persistence := p.(type) { + case View: + samples = persistence.GetRangeValues(fp, interval) + if len(samples) < 2 { + t.Fatalf("expected sample count greater than %d, got %d", 2, len(samples)) + } case *LevelDBMetricPersistence: var err error samples, err = levelDBGetRangeValues(persistence, fp, interval) @@ -482,10 +487,7 @@ func StochasticTests(persistenceMaker func() (MetricPersistence, test.Closer), t t.Fatalf("expected sample count greater than %d, got %d", 2, len(samples)) } default: - samples = p.GetRangeValues(fp, interval) - if len(samples) < 2 { - t.Fatalf("expected sample count greater than %d, got %d", 2, len(samples)) - } + t.Error("Unexpected type of MetricPersistence.") } } } diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index 9be366044..44638b04d 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -114,10 +114,11 @@ const ( const watermarkCacheLimit = 1024 * 1024 +// NewTieredStorage returns a TieredStorage object ready to use. func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryInterval time.Duration, memoryTTL time.Duration, rootDirectory string) (*TieredStorage, error) { if isDir, _ := utility.IsDir(rootDirectory); !isDir { if err := os.MkdirAll(rootDirectory, 0755); err != nil { - return nil, fmt.Errorf("Could not find or create metrics directory %s: %s", rootDirectory, err) + return nil, fmt.Errorf("could not find or create metrics directory %s: %s", rootDirectory, err) } } @@ -160,12 +161,12 @@ func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryIn return s, nil } -// Enqueues Samples for storage. +// AppendSamples enqueues Samples for storage. func (t *TieredStorage) AppendSamples(samples clientmodel.Samples) (err error) { t.mu.RLock() defer t.mu.RUnlock() if t.state != tieredStorageServing { - return fmt.Errorf("Storage is not serving.") + return fmt.Errorf("storage is not serving") } t.memoryArena.AppendSamples(samples) @@ -174,7 +175,7 @@ func (t *TieredStorage) AppendSamples(samples clientmodel.Samples) (err error) { return } -// Stops the storage subsystem, flushing all pending operations. +// Drain stops the storage subsystem, flushing all pending operations. func (t *TieredStorage) Drain(drained chan<- bool) { t.mu.Lock() defer t.mu.Unlock() @@ -193,20 +194,22 @@ func (t *TieredStorage) drain(drained chan<- bool) { t.draining <- (drained) } -// Materializes a View according to a ViewRequestBuilder, subject to a timeout. +// MakeView materializes a View according to a ViewRequestBuilder, subject to a +// timeout. func (t *TieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Duration, queryStats *stats.TimerGroup) (View, error) { t.mu.RLock() defer t.mu.RUnlock() if t.state != tieredStorageServing { - return nil, fmt.Errorf("Storage is not serving") + return nil, fmt.Errorf("storage is not serving") } - // The result channel needs a one-element buffer in case we have timed out in - // MakeView, but the view rendering still completes afterwards and writes to - // the channel. + // The result channel needs a one-element buffer in case we have timed + // out in MakeView, but the view rendering still completes afterwards + // and writes to the channel. result := make(chan View, 1) - // The abort channel needs a one-element buffer in case the view rendering - // has already exited and doesn't consume from the channel anymore. + // The abort channel needs a one-element buffer in case the view + // rendering has already exited and doesn't consume from the channel + // anymore. abortChan := make(chan bool, 1) errChan := make(chan error) queryStats.GetTimer(stats.ViewQueueTime).Start() @@ -225,11 +228,11 @@ func (t *TieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Durat return nil, err case <-time.After(deadline): abortChan <- true - return nil, fmt.Errorf("MakeView timed out after %s.", deadline) + return nil, fmt.Errorf("fetching query data timed out after %s", deadline) } } -// Starts serving requests. +// Serve starts serving requests. func (t *TieredStorage) Serve(started chan<- bool) { t.mu.Lock() if t.state != tieredStorageStarting { @@ -284,6 +287,7 @@ func (t *TieredStorage) reportQueues() { queueSizes.Set(map[string]string{"queue": "view_generation", "facet": "capacity"}, float64(cap(t.ViewQueue))) } +// Flush flushes all samples to disk. func (t *TieredStorage) Flush() { t.flushSema <- true t.flushMemory(0) @@ -311,6 +315,7 @@ func (t *TieredStorage) flushMemory(ttl time.Duration) { glog.Info("Done flushing.") } +// Close stops serving, flushes all pending operations, and frees all resources. func (t *TieredStorage) Close() { t.mu.Lock() defer t.mu.Unlock() @@ -329,8 +334,8 @@ func (t *TieredStorage) close() { t.memoryArena.Close() t.DiskStorage.Close() - // BUG(matt): There is a probability that pending items may hang here and not - // get flushed. + // BUG(matt): There is a probability that pending items may hang here + // and not get flushed. close(t.appendToDiskQueue) close(t.ViewQueue) t.wmCache.Clear() @@ -639,7 +644,8 @@ func (t *TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, fingerpri panic("illegal state: violated sort invariant") } -// Get all label values that are associated with the provided label name. +// GetAllValuesForLabel gets all label values that are associated with the +// provided label name. func (t *TieredStorage) GetAllValuesForLabel(labelName clientmodel.LabelName) (clientmodel.LabelValues, error) { t.mu.RLock() defer t.mu.RUnlock() @@ -669,8 +675,8 @@ func (t *TieredStorage) GetAllValuesForLabel(labelName clientmodel.LabelName) (c return values, nil } -// Get all of the metric fingerprints that are associated with the provided -// label set. +// GetFingerprintsForLabelSet gets all of the metric fingerprints that are +// associated with the provided label set. func (t *TieredStorage) GetFingerprintsForLabelSet(labelSet clientmodel.LabelSet) (clientmodel.Fingerprints, error) { t.mu.RLock() defer t.mu.RUnlock() @@ -700,7 +706,8 @@ func (t *TieredStorage) GetFingerprintsForLabelSet(labelSet clientmodel.LabelSet return fingerprints, nil } -// Get the metric associated with the provided fingerprint. +// GetMetricForFingerprint gets the metric associated with the provided +// fingerprint. func (t *TieredStorage) GetMetricForFingerprint(f *clientmodel.Fingerprint) (clientmodel.Metric, error) { t.mu.RLock() defer t.mu.RUnlock() diff --git a/storage/metric/view.go b/storage/metric/view.go index f1f0654a8..6311991ed 100644 --- a/storage/metric/view.go +++ b/storage/metric/view.go @@ -27,8 +27,9 @@ var ( lastSupertime = []byte{127, 255, 255, 255, 255, 255, 255, 255} ) -// Represents the summation of all datastore queries that shall be performed to -// extract values. Each operation mutates the state of the builder. +// ViewRequestBuilder represents the summation of all datastore queries that +// shall be performed to extract values. Each operation mutates the state of +// the builder. type ViewRequestBuilder interface { GetMetricAtTime(fingerprint *clientmodel.Fingerprint, time clientmodel.Timestamp) GetMetricAtInterval(fingerprint *clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval time.Duration) @@ -36,12 +37,13 @@ type ViewRequestBuilder interface { ScanJobs() scanJobs } -// Contains the various unoptimized requests for data. +// viewRequestBuilder contains the various unoptimized requests for data. type viewRequestBuilder struct { operations map[clientmodel.Fingerprint]ops } -// Furnishes a ViewRequestBuilder for remarking what types of queries to perform. +// NewViewRequestBuilder furnishes a ViewRequestBuilder for remarking what types +// of queries to perform. func NewViewRequestBuilder() *viewRequestBuilder { return &viewRequestBuilder{ operations: make(map[clientmodel.Fingerprint]ops), @@ -50,8 +52,8 @@ func NewViewRequestBuilder() *viewRequestBuilder { var getValuesAtTimes = newValueAtTimeList(10 * 1024) -// Gets for the given Fingerprint either the value at that time if there is an -// match or the one or two values adjacent thereto. +// GetMetricAtTime gets for the given Fingerprint either the value at that time +// if there is an match or the one or two values adjacent thereto. func (v *viewRequestBuilder) GetMetricAtTime(fingerprint *clientmodel.Fingerprint, time clientmodel.Timestamp) { ops := v.operations[*fingerprint] op, _ := getValuesAtTimes.Get() @@ -62,9 +64,9 @@ func (v *viewRequestBuilder) GetMetricAtTime(fingerprint *clientmodel.Fingerprin var getValuesAtIntervals = newValueAtIntervalList(10 * 1024) -// Gets for the given Fingerprint either the value at that interval from From -// through Through if there is an match or the one or two values adjacent -// for each point. +// GetMetricAtInterval gets for the given Fingerprint either the value at that +// interval from From through Through if there is an match or the one or two +// values adjacent for each point. func (v *viewRequestBuilder) GetMetricAtInterval(fingerprint *clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval time.Duration) { ops := v.operations[*fingerprint] op, _ := getValuesAtIntervals.Get() @@ -77,8 +79,8 @@ func (v *viewRequestBuilder) GetMetricAtInterval(fingerprint *clientmodel.Finger var getValuesAlongRanges = newValueAlongRangeList(10 * 1024) -// Gets for the given Fingerprint the values that occur inclusively from From -// through Through. +// GetMetricRange gets for the given Fingerprint the values that occur +// inclusively from From through Through. func (v *viewRequestBuilder) GetMetricRange(fingerprint *clientmodel.Fingerprint, from, through clientmodel.Timestamp) { ops := v.operations[*fingerprint] op, _ := getValuesAlongRanges.Get() @@ -90,7 +92,8 @@ func (v *viewRequestBuilder) GetMetricRange(fingerprint *clientmodel.Fingerprint var getValuesAtIntervalAlongRanges = newValueAtIntervalAlongRangeList(10 * 1024) -// Gets value ranges at intervals for the given Fingerprint: +// GetMetricRangeAtInterval gets value ranges at intervals for the given +// Fingerprint: // // |----| |----| |----| |----| // ^ ^ ^ ^ ^ ^ @@ -108,7 +111,7 @@ func (v *viewRequestBuilder) GetMetricRangeAtInterval(fingerprint *clientmodel.F v.operations[*fingerprint] = ops } -// Emits the optimized scans that will occur in the data store. This +// ScanJobs emits the optimized scans that will occur in the data store. This // effectively resets the ViewRequestBuilder back to a pristine state. func (v *viewRequestBuilder) ScanJobs() (j scanJobs) { for fingerprint, operations := range v.operations { diff --git a/storage/metric/watermark.go b/storage/metric/watermark.go index 6724e046b..fd45b28dd 100644 --- a/storage/metric/watermark.go +++ b/storage/metric/watermark.go @@ -17,8 +17,6 @@ import ( "code.google.com/p/goprotobuf/proto" clientmodel "github.com/prometheus/client_golang/model" - - "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/raw" "github.com/prometheus/prometheus/storage/raw/leveldb" "github.com/prometheus/prometheus/utility" @@ -40,27 +38,32 @@ func (w *watermarks) dump(d *dto.MetricHighWatermark) { d.Timestamp = proto.Int64(w.High.Unix()) } +// A FingerprintHighWatermarkMapping is used for batch updates of many high +// watermarks in a database. type FingerprintHighWatermarkMapping map[clientmodel.Fingerprint]clientmodel.Timestamp +// HighWatermarker models a high-watermark database. type HighWatermarker interface { + raw.Database raw.ForEacher raw.Pruner UpdateBatch(FingerprintHighWatermarkMapping) error Get(*clientmodel.Fingerprint) (t clientmodel.Timestamp, ok bool, err error) - State() *raw.DatabaseState - Size() (uint64, bool, error) } +// LevelDBHighWatermarker is an implementation of HighWatermarker backed by +// leveldb. type LevelDBHighWatermarker struct { - p *leveldb.LevelDBPersistence + *leveldb.LevelDBPersistence } +// Get implements HighWatermarker. func (w *LevelDBHighWatermarker) Get(f *clientmodel.Fingerprint) (t clientmodel.Timestamp, ok bool, err error) { - k := new(dto.Fingerprint) + k := &dto.Fingerprint{} dumpFingerprint(k, f) - v := new(dto.MetricHighWatermark) - ok, err = w.p.Get(k, v) + v := &dto.MetricHighWatermark{} + ok, err = w.LevelDBPersistence.Get(k, v) if err != nil { return t, ok, err } @@ -71,6 +74,7 @@ func (w *LevelDBHighWatermarker) Get(f *clientmodel.Fingerprint) (t clientmodel. return t, true, nil } +// UpdateBatch implements HighWatermarker. func (w *LevelDBHighWatermarker) UpdateBatch(m FingerprintHighWatermarkMapping) error { batch := leveldb.NewBatch() defer batch.Close() @@ -80,9 +84,9 @@ func (w *LevelDBHighWatermarker) UpdateBatch(m FingerprintHighWatermarkMapping) if err != nil { return err } - k := new(dto.Fingerprint) + k := &dto.Fingerprint{} dumpFingerprint(k, &fp) - v := new(dto.MetricHighWatermark) + v := &dto.MetricHighWatermark{} if !present { v.Timestamp = proto.Int64(t.Unix()) batch.Put(k, v) @@ -97,36 +101,15 @@ func (w *LevelDBHighWatermarker) UpdateBatch(m FingerprintHighWatermarkMapping) } } - return w.p.Commit(batch) -} - -func (w *LevelDBHighWatermarker) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (bool, error) { - return w.p.ForEach(d, f, o) -} - -func (w *LevelDBHighWatermarker) Prune() (bool, error) { - w.p.Prune() - - return false, nil -} - -func (w *LevelDBHighWatermarker) Close() { - w.p.Close() -} - -func (w *LevelDBHighWatermarker) State() *raw.DatabaseState { - return w.p.State() -} - -func (w *LevelDBHighWatermarker) Size() (uint64, bool, error) { - s, err := w.p.Size() - return s, true, err + return w.LevelDBPersistence.Commit(batch) } +// LevelDBHighWatermarkerOptions just wraps leveldb.LevelDBOptions. type LevelDBHighWatermarkerOptions struct { leveldb.LevelDBOptions } +// NewLevelDBHighWatermarker returns a LevelDBHighWatermarker ready to use. func NewLevelDBHighWatermarker(o LevelDBHighWatermarkerOptions) (*LevelDBHighWatermarker, error) { s, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions) if err != nil { @@ -134,52 +117,37 @@ func NewLevelDBHighWatermarker(o LevelDBHighWatermarkerOptions) (*LevelDBHighWat } return &LevelDBHighWatermarker{ - p: s, + LevelDBPersistence: s, }, nil } +// CurationRemarker models a curation remarker database. type CurationRemarker interface { + raw.Database raw.Pruner Update(*curationKey, clientmodel.Timestamp) error Get(*curationKey) (t clientmodel.Timestamp, ok bool, err error) - State() *raw.DatabaseState - Size() (uint64, bool, error) } +// LevelDBCurationRemarker is an implementation of CurationRemarker backed by +// leveldb. type LevelDBCurationRemarker struct { - p *leveldb.LevelDBPersistence + *leveldb.LevelDBPersistence } +// LevelDBCurationRemarkerOptions just wraps leveldb.LevelDBOptions. type LevelDBCurationRemarkerOptions struct { leveldb.LevelDBOptions } -func (w *LevelDBCurationRemarker) State() *raw.DatabaseState { - return w.p.State() -} - -func (w *LevelDBCurationRemarker) Size() (uint64, bool, error) { - s, err := w.p.Size() - return s, true, err -} - -func (w *LevelDBCurationRemarker) Close() { - w.p.Close() -} - -func (w *LevelDBCurationRemarker) Prune() (bool, error) { - w.p.Prune() - - return false, nil -} - +// Get implements CurationRemarker. func (w *LevelDBCurationRemarker) Get(c *curationKey) (t clientmodel.Timestamp, ok bool, err error) { - k := new(dto.CurationKey) + k := &dto.CurationKey{} c.dump(k) - v := new(dto.CurationValue) + v := &dto.CurationValue{} - ok, err = w.p.Get(k, v) + ok, err = w.LevelDBPersistence.Get(k, v) if err != nil || !ok { return clientmodel.TimestampFromUnix(0), ok, err } @@ -187,15 +155,17 @@ func (w *LevelDBCurationRemarker) Get(c *curationKey) (t clientmodel.Timestamp, return clientmodel.TimestampFromUnix(v.GetLastCompletionTimestamp()), true, nil } +// Update implements CurationRemarker. func (w *LevelDBCurationRemarker) Update(pair *curationKey, t clientmodel.Timestamp) error { - k := new(dto.CurationKey) + k := &dto.CurationKey{} pair.dump(k) - return w.p.Put(k, &dto.CurationValue{ + return w.LevelDBPersistence.Put(k, &dto.CurationValue{ LastCompletionTimestamp: proto.Int64(t.Unix()), }) } +// NewLevelDBCurationRemarker returns a LevelDBCurationRemarker ready to use. func NewLevelDBCurationRemarker(o LevelDBCurationRemarkerOptions) (*LevelDBCurationRemarker, error) { s, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions) if err != nil { @@ -203,7 +173,7 @@ func NewLevelDBCurationRemarker(o LevelDBCurationRemarkerOptions) (*LevelDBCurat } return &LevelDBCurationRemarker{ - p: s, + LevelDBPersistence: s, }, nil } diff --git a/storage/raw/index/interface.go b/storage/raw/index/interface.go deleted file mode 100644 index 5a87fb264..000000000 --- a/storage/raw/index/interface.go +++ /dev/null @@ -1,23 +0,0 @@ -// Copyright 2013 Prometheus Team -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package index - -import "code.google.com/p/goprotobuf/proto" - -type MembershipIndex interface { - Has(key proto.Message) (bool, error) - Put(key proto.Message) error - Drop(key proto.Message) error - Close() -} diff --git a/storage/raw/index/leveldb/interface_test.go b/storage/raw/index/leveldb/interface_test.go deleted file mode 100644 index 99959d02d..000000000 --- a/storage/raw/index/leveldb/interface_test.go +++ /dev/null @@ -1,23 +0,0 @@ -// Copyright 2013 Prometheus Team -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package leveldb - -import ( - "github.com/prometheus/prometheus/storage/raw/index" - "testing" -) - -func TestInterfaceAdherence(t *testing.T) { - var _ index.MembershipIndex = &LevelDBMembershipIndex{} -} diff --git a/storage/raw/index/leveldb/leveldb.go b/storage/raw/index/leveldb/leveldb.go deleted file mode 100644 index 0c443909a..000000000 --- a/storage/raw/index/leveldb/leveldb.go +++ /dev/null @@ -1,80 +0,0 @@ -// Copyright 2013 Prometheus Team -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package leveldb - -import ( - "code.google.com/p/goprotobuf/proto" - - "github.com/prometheus/prometheus/storage/raw" - "github.com/prometheus/prometheus/storage/raw/leveldb" - - dto "github.com/prometheus/prometheus/model/generated" -) - -var existenceValue = new(dto.MembershipIndexValue) - -type LevelDBMembershipIndex struct { - persistence *leveldb.LevelDBPersistence -} - -func (l *LevelDBMembershipIndex) Close() { - l.persistence.Close() -} - -func (l *LevelDBMembershipIndex) Has(k proto.Message) (bool, error) { - return l.persistence.Has(k) -} - -func (l *LevelDBMembershipIndex) Drop(k proto.Message) error { - return l.persistence.Drop(k) -} - -func (l *LevelDBMembershipIndex) Put(k proto.Message) error { - return l.persistence.Put(k, existenceValue) -} - -type LevelDBIndexOptions struct { - leveldb.LevelDBOptions -} - -func NewLevelDBMembershipIndex(o LevelDBIndexOptions) (i *LevelDBMembershipIndex, err error) { - leveldbPersistence, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions) - if err != nil { - return nil, err - } - - return &LevelDBMembershipIndex{ - persistence: leveldbPersistence, - }, nil -} - -func (l *LevelDBMembershipIndex) Commit(batch raw.Batch) error { - return l.persistence.Commit(batch) -} - -// CompactKeyspace compacts the entire database's keyspace. -// -// Beware that it would probably be imprudent to run this on a live user-facing -// server due to latency implications. -func (l *LevelDBMembershipIndex) Prune() { - l.persistence.Prune() -} - -func (l *LevelDBMembershipIndex) Size() (uint64, error) { - return l.persistence.Size() -} - -func (l *LevelDBMembershipIndex) State() *raw.DatabaseState { - return l.persistence.State() -} diff --git a/storage/raw/interface.go b/storage/raw/interface.go index 1f8bbe952..30655d2cf 100644 --- a/storage/raw/interface.go +++ b/storage/raw/interface.go @@ -19,9 +19,25 @@ import ( "github.com/prometheus/prometheus/storage" ) +// Database provides a few very basic methods to manage a database and inquire +// its state. +type Database interface { + // Close reaps all of the underlying system resources associated with + // this database. For databases that don't need that kind of clean-up, + // it is implemented as a no-op (so that clients don't need to reason + // and always call Close 'just in case'). + Close() error + // State reports the state of the database as a DatabaseState object. + State() *DatabaseState + // Size returns the total size of the database in bytes. The number may + // be an approximation, depending on the underlying database type. + Size() (uint64, error) +} + +// ForEacher is implemented by databases that can be iterated through. type ForEacher interface { - // ForEach is responsible for iterating through all records in the database - // until one of the following conditions are met: + // ForEach is responsible for iterating through all records in the + // database until one of the following conditions are met: // // 1.) A system anomaly in the database scan. // 2.) The last record in the database is reached. @@ -31,18 +47,21 @@ type ForEacher interface { ForEach(storage.RecordDecoder, storage.RecordFilter, storage.RecordOperator) (scannedEntireCorpus bool, err error) } +// Pruner is implemented by a database that can be pruned in some way. +type Pruner interface { + Prune() +} + // Persistence models a key-value store for bytes that supports various // additional operations. type Persistence interface { + Database ForEacher - // Close reaps all of the underlying system resources associated with this - // persistence. - Close() error // Has informs the user whether a given key exists in the database. Has(key proto.Message) (bool, error) - // Get retrieves the key from the database if it exists or returns nil if - // it is absent. + // Get populates 'value' with the value of 'key', if present, in which + // case 'present' is returned as true. Get(key, value proto.Message) (present bool, err error) // Drop removes the key from the database. Drop(key proto.Message) error @@ -56,15 +75,11 @@ type Persistence interface { // en masse. The interface implies no protocol around the atomicity of // effectuation. type Batch interface { - // Close reaps all of the underlying system resources associated with this - // batch mutation. + // Close reaps all of the underlying system resources associated with + // this batch mutation. Close() // Put follows the same protocol as Persistence.Put. Put(key, value proto.Message) // Drop follows the same protocol as Persistence.Drop. Drop(key proto.Message) } - -type Pruner interface { - Prune() (noop bool, err error) -} diff --git a/storage/raw/leveldb/batch.go b/storage/raw/leveldb/batch.go index 87fe89f7f..d5dafa961 100644 --- a/storage/raw/leveldb/batch.go +++ b/storage/raw/leveldb/batch.go @@ -26,6 +26,7 @@ type batch struct { puts uint32 } +// NewBatch returns a fully allocated batch object. func NewBatch() *batch { return &batch{ batch: levigo.NewWriteBatch(), diff --git a/storage/raw/leveldb/interface_test.go b/storage/raw/leveldb/interface_test.go index 013101fbf..4e1f5a44f 100644 --- a/storage/raw/leveldb/interface_test.go +++ b/storage/raw/leveldb/interface_test.go @@ -20,5 +20,5 @@ import ( ) func TestInterfaceAdherence(t *testing.T) { - var _ raw.Persistence = new(LevelDBPersistence) + var _ raw.Persistence = &LevelDBPersistence{} } diff --git a/storage/raw/leveldb/iterator.go b/storage/raw/leveldb/iterator.go index 9c9b25a22..0992536c1 100644 --- a/storage/raw/leveldb/iterator.go +++ b/storage/raw/leveldb/iterator.go @@ -20,6 +20,7 @@ import ( // TODO: Evaluate whether to use coding.Encoder for the key and values instead // raw bytes for consistency reasons. +// Iterator provides method to iterate through a leveldb. type Iterator interface { Error() error Valid() bool diff --git a/storage/raw/leveldb/leveldb.go b/storage/raw/leveldb/leveldb.go index 941c6674f..95db1f9a4 100644 --- a/storage/raw/leveldb/leveldb.go +++ b/storage/raw/leveldb/leveldb.go @@ -24,7 +24,8 @@ import ( "github.com/prometheus/prometheus/storage/raw" ) -// LevelDBPersistence is a disk-backed sorted key-value store. +// LevelDBPersistence is a disk-backed sorted key-value store. It implements the +// interfaces raw.Database, raw.ForEacher, raw.Pruner, raw.Persistence. type LevelDBPersistence struct { path string name string @@ -43,23 +44,24 @@ type LevelDBPersistence struct { type levigoIterator struct { // iterator is the receiver of most proxied operation calls. iterator *levigo.Iterator - // readOptions is only set if the iterator is a snapshot of an underlying - // database. This signals that it needs to be explicitly reaped upon the - // end of this iterator's life. + // readOptions is only set if the iterator is a snapshot of an + // underlying database. This signals that it needs to be explicitly + // reaped upon the end of this iterator's life. readOptions *levigo.ReadOptions // snapshot is only set if the iterator is a snapshot of an underlying - // database. This signals that it needs to be explicitly reaped upon the - // end of this this iterator's life. + // database. This signals that it needs to be explicitly reaped upon + // the end of this this iterator's life. snapshot *levigo.Snapshot // storage is only set if the iterator is a snapshot of an underlying - // database. This signals that it needs to be explicitly reaped upon the - // end of this this iterator's life. The snapshot must be freed in the - // context of an actual database. + // database. This signals that it needs to be explicitly reaped upon + // the end of this this iterator's life. The snapshot must be freed in + // the context of an actual database. storage *levigo.DB // closed indicates whether the iterator has been closed before. closed bool - // valid indicates whether the iterator may be used. If a LevelDB iterator - // ever becomes invalid, it must be disposed of and cannot be reused. + // valid indicates whether the iterator may be used. If a LevelDB + // iterator ever becomes invalid, it must be disposed of and cannot be + // reused. valid bool // creationTime provides the time at which the iterator was made. creationTime time.Time @@ -191,13 +193,16 @@ func (i *levigoIterator) Valid() bool { return i.valid } +// Compression defines the compression mode. type Compression uint +// Possible compression modes. const ( Snappy Compression = iota Uncompressed ) +// LevelDBOptions bundles options needed to create a LevelDBPersistence object. type LevelDBOptions struct { Path string Name string @@ -212,6 +217,8 @@ type LevelDBOptions struct { Compression Compression } +// NewLevelDBPersistence returns an initialized LevelDBPersistence object, +// created with the given options. func NewLevelDBPersistence(o LevelDBOptions) (*LevelDBPersistence, error) { options := levigo.NewOptions() options.SetCreateIfMissing(true) @@ -257,9 +264,10 @@ func NewLevelDBPersistence(o LevelDBOptions) (*LevelDBPersistence, error) { }, nil } +// Close implements raw.Persistence (and raw.Database). func (l *LevelDBPersistence) Close() error { - // These are deferred to take advantage of forced closing in case of stack - // unwinding due to anomalies. + // These are deferred to take advantage of forced closing in case of + // stack unwinding due to anomalies. defer func() { if l.filterPolicy != nil { l.filterPolicy.Close() @@ -299,6 +307,7 @@ func (l *LevelDBPersistence) Close() error { return nil } +// Get implements raw.Persistence. func (l *LevelDBPersistence) Get(k, v proto.Message) (bool, error) { buf, _ := buffers.Get() defer buffers.Give(buf) @@ -328,10 +337,12 @@ func (l *LevelDBPersistence) Get(k, v proto.Message) (bool, error) { return true, nil } +// Has implements raw.Persistence. func (l *LevelDBPersistence) Has(k proto.Message) (has bool, err error) { return l.Get(k, nil) } +// Drop implements raw.Persistence. func (l *LevelDBPersistence) Drop(k proto.Message) error { buf, _ := buffers.Get() defer buffers.Give(buf) @@ -343,29 +354,31 @@ func (l *LevelDBPersistence) Drop(k proto.Message) error { return l.storage.Delete(l.writeOptions, buf.Bytes()) } -func (l *LevelDBPersistence) Put(key, value proto.Message) error { +// Put implements raw.Persistence. +func (l *LevelDBPersistence) Put(k, v proto.Message) error { keyBuf, _ := buffers.Get() defer buffers.Give(keyBuf) - if err := keyBuf.Marshal(key); err != nil { + if err := keyBuf.Marshal(k); err != nil { panic(err) } valBuf, _ := buffers.Get() defer buffers.Give(valBuf) - if err := valBuf.Marshal(value); err != nil { + if err := valBuf.Marshal(v); err != nil { panic(err) } return l.storage.Put(l.writeOptions, keyBuf.Bytes(), valBuf.Bytes()) } +// Commit implements raw.Persistence. func (l *LevelDBPersistence) Commit(b raw.Batch) (err error) { - // XXX: This is a wart to clean up later. Ideally, after doing extensive - // tests, we could create a Batch struct that journals pending - // operations which the given Persistence implementation could convert - // to its specific commit requirements. + // XXX: This is a wart to clean up later. Ideally, after doing + // extensive tests, we could create a Batch struct that journals pending + // operations which the given Persistence implementation could convert + // to its specific commit requirements. batch, ok := b.(*batch) if !ok { panic("leveldb.batch expected") @@ -374,7 +387,7 @@ func (l *LevelDBPersistence) Commit(b raw.Batch) (err error) { return l.storage.Write(l.writeOptions, batch.batch) } -// CompactKeyspace compacts the entire database's keyspace. +// Prune implements raw.Pruner. It compacts the entire keyspace of the database. // // Beware that it would probably be imprudent to run this on a live user-facing // server due to latency implications. @@ -389,6 +402,8 @@ func (l *LevelDBPersistence) Prune() { l.storage.CompactRange(keyspace) } +// Size returns the approximate size the entire database takes on disk (in +// bytes). It implements the raw.Database interface. func (l *LevelDBPersistence) Size() (uint64, error) { iterator, err := l.NewIterator(false) if err != nil { @@ -459,6 +474,7 @@ func (l *LevelDBPersistence) NewIterator(snapshotted bool) (Iterator, error) { }, nil } +// ForEach implements raw.ForEacher. func (l *LevelDBPersistence) ForEach(decoder storage.RecordDecoder, filter storage.RecordFilter, operator storage.RecordOperator) (scannedEntireCorpus bool, err error) { iterator, err := l.NewIterator(true) if err != nil { @@ -482,11 +498,11 @@ func (l *LevelDBPersistence) ForEach(decoder storage.RecordDecoder, filter stora } switch filter.Filter(decodedKey, decodedValue) { - case storage.STOP: + case storage.Stop: return - case storage.SKIP: + case storage.Skip: continue - case storage.ACCEPT: + case storage.Accept: opErr := operator.Operate(decodedKey, decodedValue) if opErr != nil { if opErr.Continuable { diff --git a/storage/raw/leveldb/state.go b/storage/raw/leveldb/state.go index 3e78830ef..13befe720 100644 --- a/storage/raw/leveldb/state.go +++ b/storage/raw/leveldb/state.go @@ -23,6 +23,11 @@ const ( sstablesKey = "leveldb.sstables" ) +// State returns the DatabaseState. It implements the raw.Database interface and +// sets the following Supplemental entries: +// "Low Level": leveldb property value for "leveldb.stats" +// "SSTable": leveldb property value for "leveldb.sstables" +// "Errors": only set if an error has occurred determining the size func (l *LevelDBPersistence) State() *raw.DatabaseState { databaseState := &raw.DatabaseState{ Location: l.path, diff --git a/storage/raw/leveldb/test/fixtures.go b/storage/raw/leveldb/test/fixtures.go index cee367511..e680050e3 100644 --- a/storage/raw/leveldb/test/fixtures.go +++ b/storage/raw/leveldb/test/fixtures.go @@ -23,8 +23,8 @@ import ( const cacheCapacity = 0 type ( - // Pair models a prospective (key, value) double that will be committed to - // a database. + // Pair models a prospective (key, value) double that will be committed + // to a database. Pair interface { Get() (key, value proto.Message) } @@ -32,17 +32,18 @@ type ( // Pairs models a list of Pair for disk committing. Pairs []Pair - // Preparer readies a LevelDB store for a given raw state given the fixtures - // definitions passed into it. + // Preparer readies a LevelDB store for a given raw state given the + // fixtures definitions passed into it. Preparer interface { - // Prepare furnishes the database and returns its path along with any - // encountered anomalies. + // Prepare furnishes the database and returns its path along + // with any encountered anomalies. Prepare(namespace string, f FixtureFactory) test.TemporaryDirectory } + // FixtureFactory is an iterator emitting fixture data. FixtureFactory interface { - // HasNext indicates whether the FixtureFactory has more pending fixture - // data to build. + // HasNext indicates whether the FixtureFactory has more pending + // fixture data to build. HasNext() (has bool) // Next emits the next (key, value) double for storage. Next() (key, value proto.Message) @@ -85,10 +86,12 @@ func (p preparer) Prepare(n string, f FixtureFactory) (t test.TemporaryDirectory return } +// HasNext implements FixtureFactory. func (f cassetteFactory) HasNext() bool { return f.index < f.count } +// Next implements FixtureFactory. func (f *cassetteFactory) Next() (key, value proto.Message) { key, value = f.pairs[f.index].Get() diff --git a/storage/raw/state.go b/storage/raw/state.go index b5cd8698c..ff5d0885a 100644 --- a/storage/raw/state.go +++ b/storage/raw/state.go @@ -17,6 +17,7 @@ import ( "github.com/prometheus/prometheus/utility" ) +// DatabaseState contains some fundamental attributes of a database. type DatabaseState struct { Name string @@ -28,12 +29,17 @@ type DatabaseState struct { Supplemental map[string]string } +// DatabaseStates is a sortable slice of DatabaseState pointers. It implements +// sort.Interface. type DatabaseStates []*DatabaseState +// Len implements sort.Interface. func (s DatabaseStates) Len() int { return len(s) } +// Less implements sort.Interface. The primary sorting criterion is the Name, +// the secondary criterion is the Size. func (s DatabaseStates) Less(i, j int) bool { l := s[i] r := s[j] @@ -48,6 +54,7 @@ func (s DatabaseStates) Less(i, j int) bool { return l.Size < r.Size } +// Swap implements sort.Interface. func (s DatabaseStates) Swap(i, j int) { s[i], s[j] = s[j], s[i] } diff --git a/storage/remote/opentsdb/client.go b/storage/remote/opentsdb/client.go index 04894c678..d22aa5099 100644 --- a/storage/remote/opentsdb/client.go +++ b/storage/remote/opentsdb/client.go @@ -17,7 +17,7 @@ import ( const ( putEndpoint = "/api/put" - contentTypeJson = "application/json" + contentTypeJSON = "application/json" ) var ( @@ -30,7 +30,7 @@ type Client struct { httpClient *http.Client } -// Create a new Client. +// NewClient creates a new Client. func NewClient(url string, timeout time.Duration) *Client { return &Client{ url: url, @@ -47,12 +47,13 @@ type StoreSamplesRequest struct { Tags map[string]string `json:"tags"` } -// Escape Prometheus label values to valid tag values for OpenTSDB. +// escapeTagValue escapes Prometheus label values to valid tag values for +// OpenTSDB. func escapeTagValue(l clientmodel.LabelValue) string { return illegalCharsRE.ReplaceAllString(string(l), "_") } -// Translate Prometheus metric into OpenTSDB tags. +// tagsFromMetric translates Prometheus metric into OpenTSDB tags. func tagsFromMetric(m clientmodel.Metric) map[string]string { tags := make(map[string]string, len(m)-1) for l, v := range m { @@ -64,7 +65,7 @@ func tagsFromMetric(m clientmodel.Metric) map[string]string { return tags } -// Send a batch of samples to OpenTSDB via its HTTP API. +// Store sends a batch of samples to OpenTSDB via its HTTP API. func (c *Client) Store(samples clientmodel.Samples) error { reqs := make([]StoreSamplesRequest, 0, len(samples)) for _, s := range samples { @@ -91,7 +92,7 @@ func (c *Client) Store(samples clientmodel.Samples) error { resp, err := c.httpClient.Post( u.String(), - contentTypeJson, + contentTypeJSON, bytes.NewBuffer(buf), ) if err != nil { @@ -116,5 +117,5 @@ func (c *Client) Store(samples clientmodel.Samples) error { if err := json.Unmarshal(buf, &r); err != nil { return err } - return fmt.Errorf("Failed to write %d samples to OpenTSDB, %d succeeded", r["failed"], r["success"]) + return fmt.Errorf("failed to write %d samples to OpenTSDB, %d succeeded", r["failed"], r["success"]) } diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 0bd2e0c72..32e8b5a48 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -47,7 +47,7 @@ type TSDBQueueManager struct { drained chan bool } -// Build a new TSDBQueueManager. +// NewTSDBQueueManager builds a new TSDBQueueManager. func NewTSDBQueueManager(tsdb TSDBClient, queueCapacity int) *TSDBQueueManager { return &TSDBQueueManager{ tsdb: tsdb, @@ -57,8 +57,8 @@ func NewTSDBQueueManager(tsdb TSDBClient, queueCapacity int) *TSDBQueueManager { } } -// Queue a sample batch to be sent to the TSDB. This drops the most recently -// queued samples on the floor if the queue is full. +// Queue queues a sample batch to be sent to the TSDB. It drops the most +// recently queued samples on the floor if the queue is full. func (t *TSDBQueueManager) Queue(s clientmodel.Samples) { select { case t.queue <- s: @@ -85,13 +85,13 @@ func (t *TSDBQueueManager) sendSamples(s clientmodel.Samples) { } } -// Report notification queue occupancy and capacity. +// reportQueues reports notification queue occupancy and capacity. func (t *TSDBQueueManager) reportQueues() { queueSize.Set(map[string]string{facet: occupancy}, float64(len(t.queue))) queueSize.Set(map[string]string{facet: capacity}, float64(cap(t.queue))) } -// Continuously send samples to the TSDB. +// Run continuously sends samples to the TSDB. func (t *TSDBQueueManager) Run() { defer func() { close(t.drained) @@ -129,7 +129,7 @@ func (t *TSDBQueueManager) Run() { } } -// Flush remaining queued samples. +// Flush flushes remaining queued samples. func (t *TSDBQueueManager) flush() { if len(t.pendingSamples) > 0 { go t.sendSamples(t.pendingSamples) @@ -137,7 +137,8 @@ func (t *TSDBQueueManager) flush() { t.pendingSamples = t.pendingSamples[:0] } -// Stop sending samples to the TSDB and wait for pending sends to complete. +// Close stops sending samples to the TSDB and waits for pending sends to +// complete. func (t *TSDBQueueManager) Close() { glog.Infof("TSDB queue manager shutting down...") close(t.queue)