mirror of https://github.com/prometheus/prometheus
949 lines
26 KiB
Go
949 lines
26 KiB
Go
// Copyright 2013 Prometheus Team
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package metric
|
|
|
|
import (
|
|
"flag"
|
|
"fmt"
|
|
"log"
|
|
"sort"
|
|
"sync"
|
|
"time"
|
|
|
|
"code.google.com/p/goprotobuf/proto"
|
|
|
|
clientmodel "github.com/prometheus/client_golang/model"
|
|
|
|
dto "github.com/prometheus/prometheus/model/generated"
|
|
index "github.com/prometheus/prometheus/storage/raw/index/leveldb"
|
|
|
|
"github.com/prometheus/prometheus/storage"
|
|
"github.com/prometheus/prometheus/storage/raw/leveldb"
|
|
"github.com/prometheus/prometheus/utility"
|
|
)
|
|
|
|
const sortConcurrency = 2
|
|
|
|
type LevelDBMetricPersistence struct {
|
|
CurationRemarks *leveldb.LevelDBPersistence
|
|
fingerprintToMetrics *leveldb.LevelDBPersistence
|
|
labelNameToFingerprints *leveldb.LevelDBPersistence
|
|
labelSetToFingerprints *leveldb.LevelDBPersistence
|
|
MetricHighWatermarks *leveldb.LevelDBPersistence
|
|
metricMembershipIndex *index.LevelDBMembershipIndex
|
|
MetricSamples *leveldb.LevelDBPersistence
|
|
}
|
|
|
|
var (
|
|
leveldbChunkSize = flag.Int("leveldbChunkSize", 200, "Maximum number of samples stored under one key.")
|
|
|
|
// These flag values are back of the envelope, though they seem sensible.
|
|
// Please re-evaluate based on your own needs.
|
|
curationRemarksCacheSize = flag.Int("curationRemarksCacheSize", 5*1024*1024, "The size for the curation remarks cache (bytes).")
|
|
fingerprintsToLabelPairCacheSize = flag.Int("fingerprintsToLabelPairCacheSizeBytes", 25*1024*1024, "The size for the fingerprint to label pair index (bytes).")
|
|
highWatermarkCacheSize = flag.Int("highWatermarksByFingerprintSizeBytes", 5*1024*1024, "The size for the metric high watermarks (bytes).")
|
|
labelNameToFingerprintsCacheSize = flag.Int("labelNameToFingerprintsCacheSizeBytes", 25*1024*1024, "The size for the label name to metric fingerprint index (bytes).")
|
|
labelPairToFingerprintsCacheSize = flag.Int("labelPairToFingerprintsCacheSizeBytes", 25*1024*1024, "The size for the label pair to metric fingerprint index (bytes).")
|
|
metricMembershipIndexCacheSize = flag.Int("metricMembershipCacheSizeBytes", 5*1024*1024, "The size for the metric membership index (bytes).")
|
|
samplesByFingerprintCacheSize = flag.Int("samplesByFingerprintCacheSizeBytes", 50*1024*1024, "The size for the samples database (bytes).")
|
|
)
|
|
|
|
type leveldbOpener func()
|
|
type leveldbCloser interface {
|
|
Close()
|
|
}
|
|
|
|
func (l *LevelDBMetricPersistence) Close() {
|
|
var persistences = []leveldbCloser{
|
|
l.CurationRemarks,
|
|
l.fingerprintToMetrics,
|
|
l.labelNameToFingerprints,
|
|
l.labelSetToFingerprints,
|
|
l.MetricHighWatermarks,
|
|
l.metricMembershipIndex,
|
|
l.MetricSamples,
|
|
}
|
|
|
|
closerGroup := sync.WaitGroup{}
|
|
|
|
for _, closer := range persistences {
|
|
closerGroup.Add(1)
|
|
go func(closer leveldbCloser) {
|
|
if closer != nil {
|
|
closer.Close()
|
|
}
|
|
closerGroup.Done()
|
|
}(closer)
|
|
}
|
|
|
|
closerGroup.Wait()
|
|
}
|
|
|
|
func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistence, error) {
|
|
workers := utility.NewUncertaintyGroup(7)
|
|
|
|
emission := new(LevelDBMetricPersistence)
|
|
|
|
var subsystemOpeners = []struct {
|
|
name string
|
|
opener leveldbOpener
|
|
}{
|
|
{
|
|
"Label Names and Value Pairs by Fingerprint",
|
|
func() {
|
|
var err error
|
|
emission.fingerprintToMetrics, err = leveldb.NewLevelDBPersistence(baseDirectory+"/label_name_and_value_pairs_by_fingerprint", *fingerprintsToLabelPairCacheSize, 10)
|
|
workers.MayFail(err)
|
|
},
|
|
},
|
|
{
|
|
"Samples by Fingerprint",
|
|
func() {
|
|
var err error
|
|
emission.MetricSamples, err = leveldb.NewLevelDBPersistence(baseDirectory+"/samples_by_fingerprint", *samplesByFingerprintCacheSize, 10)
|
|
workers.MayFail(err)
|
|
},
|
|
},
|
|
{
|
|
"High Watermarks by Fingerprint",
|
|
func() {
|
|
var err error
|
|
emission.MetricHighWatermarks, err = leveldb.NewLevelDBPersistence(baseDirectory+"/high_watermarks_by_fingerprint", *highWatermarkCacheSize, 10)
|
|
workers.MayFail(err)
|
|
},
|
|
},
|
|
{
|
|
"Fingerprints by Label Name",
|
|
func() {
|
|
var err error
|
|
emission.labelNameToFingerprints, err = leveldb.NewLevelDBPersistence(baseDirectory+"/fingerprints_by_label_name", *labelNameToFingerprintsCacheSize, 10)
|
|
workers.MayFail(err)
|
|
},
|
|
},
|
|
{
|
|
"Fingerprints by Label Name and Value Pair",
|
|
func() {
|
|
var err error
|
|
emission.labelSetToFingerprints, err = leveldb.NewLevelDBPersistence(baseDirectory+"/fingerprints_by_label_name_and_value_pair", *labelPairToFingerprintsCacheSize, 10)
|
|
workers.MayFail(err)
|
|
},
|
|
},
|
|
{
|
|
"Metric Membership Index",
|
|
func() {
|
|
var err error
|
|
emission.metricMembershipIndex, err = index.NewLevelDBMembershipIndex(baseDirectory+"/metric_membership_index", *metricMembershipIndexCacheSize, 10)
|
|
workers.MayFail(err)
|
|
},
|
|
},
|
|
{
|
|
"Sample Curation Remarks",
|
|
func() {
|
|
var err error
|
|
emission.CurationRemarks, err = leveldb.NewLevelDBPersistence(baseDirectory+"/curation_remarks", *curationRemarksCacheSize, 10)
|
|
workers.MayFail(err)
|
|
},
|
|
},
|
|
}
|
|
|
|
for _, subsystem := range subsystemOpeners {
|
|
opener := subsystem.opener
|
|
go opener()
|
|
}
|
|
|
|
if !workers.Wait() {
|
|
for _, err := range workers.Errors() {
|
|
log.Printf("Could not open storage due to %s", err)
|
|
}
|
|
|
|
return nil, fmt.Errorf("Unable to open metric persistence.")
|
|
}
|
|
|
|
return emission, nil
|
|
}
|
|
|
|
func (l *LevelDBMetricPersistence) AppendSample(sample *clientmodel.Sample) (err error) {
|
|
defer func(begin time.Time) {
|
|
duration := time.Since(begin)
|
|
|
|
recordOutcome(duration, err, map[string]string{operation: appendSample, result: success}, map[string]string{operation: appendSample, result: failure})
|
|
}(time.Now())
|
|
|
|
err = l.AppendSamples(clientmodel.Samples{sample})
|
|
|
|
return
|
|
}
|
|
|
|
// groupByFingerprint collects all of the provided samples, groups them
|
|
// together by their respective metric fingerprint, and finally sorts
|
|
// them chronologically.
|
|
func groupByFingerprint(samples clientmodel.Samples) map[clientmodel.Fingerprint]clientmodel.Samples {
|
|
fingerprintToSamples := map[clientmodel.Fingerprint]clientmodel.Samples{}
|
|
|
|
for _, sample := range samples {
|
|
fingerprint := &clientmodel.Fingerprint{}
|
|
fingerprint.LoadFromMetric(sample.Metric)
|
|
samples := fingerprintToSamples[*fingerprint]
|
|
samples = append(samples, sample)
|
|
fingerprintToSamples[*fingerprint] = samples
|
|
}
|
|
|
|
sortingSemaphore := make(chan bool, sortConcurrency)
|
|
doneSorting := sync.WaitGroup{}
|
|
|
|
for _, samples := range fingerprintToSamples {
|
|
doneSorting.Add(1)
|
|
|
|
sortingSemaphore <- true
|
|
go func(samples clientmodel.Samples) {
|
|
sort.Sort(samples)
|
|
|
|
<-sortingSemaphore
|
|
doneSorting.Done()
|
|
}(samples)
|
|
}
|
|
|
|
doneSorting.Wait()
|
|
|
|
return fingerprintToSamples
|
|
}
|
|
|
|
// findUnindexedMetrics scours the metric membership index for each given Metric
|
|
// in the keyspace and returns a map of Fingerprint-Metric pairs that are
|
|
// absent.
|
|
func (l *LevelDBMetricPersistence) findUnindexedMetrics(candidates map[clientmodel.Fingerprint]clientmodel.Metric) (unindexed map[clientmodel.Fingerprint]clientmodel.Metric, err error) {
|
|
defer func(begin time.Time) {
|
|
duration := time.Since(begin)
|
|
|
|
recordOutcome(duration, err, map[string]string{operation: findUnindexedMetrics, result: success}, map[string]string{operation: findUnindexedMetrics, result: failure})
|
|
}(time.Now())
|
|
|
|
unindexed = make(map[clientmodel.Fingerprint]clientmodel.Metric)
|
|
|
|
dto := &dto.Metric{}
|
|
for fingerprint, metric := range candidates {
|
|
dumpMetric(dto, metric)
|
|
indexHas, err := l.hasIndexMetric(dto)
|
|
if err != nil {
|
|
return unindexed, err
|
|
}
|
|
if !indexHas {
|
|
unindexed[fingerprint] = metric
|
|
}
|
|
}
|
|
|
|
return unindexed, nil
|
|
}
|
|
|
|
// indexLabelNames accumulates all label name to fingerprint index entries for
|
|
// the dirty metrics, appends the new dirtied metrics, sorts, and bulk updates
|
|
// the index to reflect the new state.
|
|
//
|
|
// This operation is idempotent.
|
|
func (l *LevelDBMetricPersistence) indexLabelNames(metrics map[clientmodel.Fingerprint]clientmodel.Metric) (err error) {
|
|
defer func(begin time.Time) {
|
|
duration := time.Since(begin)
|
|
|
|
recordOutcome(duration, err, map[string]string{operation: indexLabelNames, result: success}, map[string]string{operation: indexLabelNames, result: failure})
|
|
}(time.Now())
|
|
|
|
labelNameFingerprints := map[clientmodel.LabelName]utility.Set{}
|
|
|
|
for fingerprint, metric := range metrics {
|
|
for labelName := range metric {
|
|
fingerprintSet, ok := labelNameFingerprints[labelName]
|
|
if !ok {
|
|
fingerprintSet = utility.Set{}
|
|
|
|
fingerprints, err := l.GetFingerprintsForLabelName(labelName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, fingerprint := range fingerprints {
|
|
fingerprintSet.Add(*fingerprint)
|
|
}
|
|
}
|
|
|
|
fingerprintSet.Add(fingerprint)
|
|
labelNameFingerprints[labelName] = fingerprintSet
|
|
}
|
|
}
|
|
|
|
batch := leveldb.NewBatch()
|
|
defer batch.Close()
|
|
|
|
for labelName, fingerprintSet := range labelNameFingerprints {
|
|
fingerprints := clientmodel.Fingerprints{}
|
|
for e := range fingerprintSet {
|
|
fingerprint := e.(clientmodel.Fingerprint)
|
|
fingerprints = append(fingerprints, &fingerprint)
|
|
}
|
|
|
|
sort.Sort(fingerprints)
|
|
|
|
key := &dto.LabelName{
|
|
Name: proto.String(string(labelName)),
|
|
}
|
|
value := new(dto.FingerprintCollection)
|
|
for _, fingerprint := range fingerprints {
|
|
f := new(dto.Fingerprint)
|
|
dumpFingerprint(f, fingerprint)
|
|
value.Member = append(value.Member, f)
|
|
}
|
|
|
|
batch.Put(key, value)
|
|
}
|
|
|
|
err = l.labelNameToFingerprints.Commit(batch)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// indexLabelPairs accumulates all label pair to fingerprint index entries for
|
|
// the dirty metrics, appends the new dirtied metrics, sorts, and bulk updates
|
|
// the index to reflect the new state.
|
|
//
|
|
// This operation is idempotent.
|
|
func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[clientmodel.Fingerprint]clientmodel.Metric) (err error) {
|
|
defer func(begin time.Time) {
|
|
duration := time.Since(begin)
|
|
|
|
recordOutcome(duration, err, map[string]string{operation: indexLabelPairs, result: success}, map[string]string{operation: indexLabelPairs, result: failure})
|
|
}(time.Now())
|
|
|
|
labelPairFingerprints := map[LabelPair]utility.Set{}
|
|
|
|
for fingerprint, metric := range metrics {
|
|
for labelName, labelValue := range metric {
|
|
labelPair := LabelPair{
|
|
Name: labelName,
|
|
Value: labelValue,
|
|
}
|
|
fingerprintSet, ok := labelPairFingerprints[labelPair]
|
|
if !ok {
|
|
fingerprintSet = utility.Set{}
|
|
|
|
fingerprints, err := l.GetFingerprintsForLabelSet(clientmodel.LabelSet{
|
|
labelName: labelValue,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, fingerprint := range fingerprints {
|
|
fingerprintSet.Add(*fingerprint)
|
|
}
|
|
}
|
|
|
|
fingerprintSet.Add(fingerprint)
|
|
labelPairFingerprints[labelPair] = fingerprintSet
|
|
}
|
|
}
|
|
|
|
batch := leveldb.NewBatch()
|
|
defer batch.Close()
|
|
|
|
for labelPair, fingerprintSet := range labelPairFingerprints {
|
|
fingerprints := clientmodel.Fingerprints{}
|
|
for e := range fingerprintSet {
|
|
fingerprint := e.(clientmodel.Fingerprint)
|
|
fingerprints = append(fingerprints, &fingerprint)
|
|
}
|
|
|
|
sort.Sort(fingerprints)
|
|
|
|
key := &dto.LabelPair{
|
|
Name: proto.String(string(labelPair.Name)),
|
|
Value: proto.String(string(labelPair.Value)),
|
|
}
|
|
value := new(dto.FingerprintCollection)
|
|
for _, fingerprint := range fingerprints {
|
|
f := new(dto.Fingerprint)
|
|
dumpFingerprint(f, fingerprint)
|
|
value.Member = append(value.Member, f)
|
|
}
|
|
|
|
batch.Put(key, value)
|
|
}
|
|
|
|
err = l.labelSetToFingerprints.Commit(batch)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// indexFingerprints updates all of the Fingerprint to Metric reverse lookups
|
|
// in the index and then bulk updates.
|
|
//
|
|
// This operation is idempotent.
|
|
func (l *LevelDBMetricPersistence) indexFingerprints(metrics map[clientmodel.Fingerprint]clientmodel.Metric) (err error) {
|
|
defer func(begin time.Time) {
|
|
duration := time.Since(begin)
|
|
|
|
recordOutcome(duration, err, map[string]string{operation: indexFingerprints, result: success}, map[string]string{operation: indexFingerprints, result: failure})
|
|
}(time.Now())
|
|
|
|
batch := leveldb.NewBatch()
|
|
defer batch.Close()
|
|
|
|
for fingerprint, metric := range metrics {
|
|
f := new(dto.Fingerprint)
|
|
dumpFingerprint(f, &fingerprint)
|
|
m := &dto.Metric{}
|
|
dumpMetric(m, metric)
|
|
batch.Put(f, m)
|
|
}
|
|
|
|
err = l.fingerprintToMetrics.Commit(batch)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
var existenceIdentity = &dto.MembershipIndexValue{}
|
|
|
|
// indexMetrics takes groups of samples, determines which ones contain metrics
|
|
// that are unknown to the storage stack, and then proceeds to update all
|
|
// affected indices.
|
|
func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[clientmodel.Fingerprint]clientmodel.Metric) (err error) {
|
|
defer func(begin time.Time) {
|
|
duration := time.Since(begin)
|
|
|
|
recordOutcome(duration, err, map[string]string{operation: indexMetrics, result: success}, map[string]string{operation: indexMetrics, result: failure})
|
|
}(time.Now())
|
|
|
|
var (
|
|
absentMetrics map[clientmodel.Fingerprint]clientmodel.Metric
|
|
)
|
|
|
|
absentMetrics, err = l.findUnindexedMetrics(fingerprints)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
if len(absentMetrics) == 0 {
|
|
return
|
|
}
|
|
|
|
// TODO: For the missing fingerprints, determine what label names and pairs
|
|
// are absent and act accordingly and append fingerprints.
|
|
workers := utility.NewUncertaintyGroup(3)
|
|
|
|
go func() {
|
|
workers.MayFail(l.indexLabelNames(absentMetrics))
|
|
}()
|
|
|
|
go func() {
|
|
workers.MayFail(l.indexLabelPairs(absentMetrics))
|
|
}()
|
|
|
|
go func() {
|
|
workers.MayFail(l.indexFingerprints(absentMetrics))
|
|
}()
|
|
|
|
if !workers.Wait() {
|
|
return fmt.Errorf("Could not index due to %s", workers.Errors())
|
|
}
|
|
|
|
// If any of the preceding operations failed, we will have inconsistent
|
|
// indices. Thusly, the Metric membership index should NOT be updated, as
|
|
// its state is used to determine whether to bulk update the other indices.
|
|
// Given that those operations are idempotent, it is OK to repeat them;
|
|
// however, it will consume considerable amounts of time.
|
|
batch := leveldb.NewBatch()
|
|
defer batch.Close()
|
|
|
|
for _, metric := range absentMetrics {
|
|
m := &dto.Metric{}
|
|
dumpMetric(m, metric)
|
|
batch.Put(m, existenceIdentity)
|
|
}
|
|
|
|
err = l.metricMembershipIndex.Commit(batch)
|
|
if err != nil {
|
|
// Not critical but undesirable.
|
|
log.Println(err)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[clientmodel.Fingerprint]clientmodel.Samples) (err error) {
|
|
defer func(begin time.Time) {
|
|
duration := time.Since(begin)
|
|
|
|
recordOutcome(duration, err, map[string]string{operation: refreshHighWatermarks, result: success}, map[string]string{operation: refreshHighWatermarks, result: failure})
|
|
}(time.Now())
|
|
|
|
batch := leveldb.NewBatch()
|
|
defer batch.Close()
|
|
|
|
value := &dto.MetricHighWatermark{}
|
|
for fingerprint, samples := range groups {
|
|
value.Reset()
|
|
f := new(dto.Fingerprint)
|
|
dumpFingerprint(f, &fingerprint)
|
|
present, err := l.MetricHighWatermarks.Get(f, value)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
newestSampleTimestamp := samples[len(samples)-1].Timestamp
|
|
|
|
if !present {
|
|
value.Timestamp = proto.Int64(newestSampleTimestamp.Unix())
|
|
batch.Put(f, value)
|
|
|
|
continue
|
|
}
|
|
|
|
// BUG(matt): Repace this with watermark management.
|
|
if newestSampleTimestamp.After(time.Unix(value.GetTimestamp(), 0)) {
|
|
value.Timestamp = proto.Int64(newestSampleTimestamp.Unix())
|
|
batch.Put(f, value)
|
|
}
|
|
}
|
|
|
|
err = l.MetricHighWatermarks.Commit(batch)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (l *LevelDBMetricPersistence) AppendSamples(samples clientmodel.Samples) (err error) {
|
|
defer func(begin time.Time) {
|
|
duration := time.Since(begin)
|
|
|
|
recordOutcome(duration, err, map[string]string{operation: appendSamples, result: success}, map[string]string{operation: appendSamples, result: failure})
|
|
}(time.Now())
|
|
|
|
fingerprintToSamples := groupByFingerprint(samples)
|
|
indexErrChan := make(chan error, 1)
|
|
watermarkErrChan := make(chan error, 1)
|
|
|
|
go func(groups map[clientmodel.Fingerprint]clientmodel.Samples) {
|
|
metrics := map[clientmodel.Fingerprint]clientmodel.Metric{}
|
|
|
|
for fingerprint, samples := range groups {
|
|
metrics[fingerprint] = samples[0].Metric
|
|
}
|
|
|
|
indexErrChan <- l.indexMetrics(metrics)
|
|
}(fingerprintToSamples)
|
|
|
|
go func(groups map[clientmodel.Fingerprint]clientmodel.Samples) {
|
|
watermarkErrChan <- l.refreshHighWatermarks(groups)
|
|
}(fingerprintToSamples)
|
|
|
|
samplesBatch := leveldb.NewBatch()
|
|
defer samplesBatch.Close()
|
|
|
|
for fingerprint, group := range fingerprintToSamples {
|
|
for {
|
|
lengthOfGroup := len(group)
|
|
|
|
if lengthOfGroup == 0 {
|
|
break
|
|
}
|
|
|
|
take := *leveldbChunkSize
|
|
if lengthOfGroup < take {
|
|
take = lengthOfGroup
|
|
}
|
|
|
|
chunk := group[0:take]
|
|
group = group[take:lengthOfGroup]
|
|
|
|
key := SampleKey{
|
|
Fingerprint: &fingerprint,
|
|
FirstTimestamp: chunk[0].Timestamp,
|
|
LastTimestamp: chunk[take-1].Timestamp,
|
|
SampleCount: uint32(take),
|
|
}
|
|
|
|
value := &dto.SampleValueSeries{}
|
|
for _, sample := range chunk {
|
|
value.Value = append(value.Value, &dto.SampleValueSeries_Value{
|
|
Timestamp: proto.Int64(sample.Timestamp.Unix()),
|
|
Value: proto.Float64(float64(sample.Value)),
|
|
})
|
|
}
|
|
|
|
k := &dto.SampleKey{}
|
|
key.Dump(k)
|
|
samplesBatch.Put(k, value)
|
|
}
|
|
}
|
|
|
|
err = l.MetricSamples.Commit(samplesBatch)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
err = <-indexErrChan
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
err = <-watermarkErrChan
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func extractSampleKey(i leveldb.Iterator) (*SampleKey, error) {
|
|
k := &dto.SampleKey{}
|
|
err := proto.Unmarshal(i.Key(), k)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
key := &SampleKey{}
|
|
key.Load(k)
|
|
|
|
return key, nil
|
|
}
|
|
|
|
func extractSampleValues(i leveldb.Iterator) (Values, error) {
|
|
v := &dto.SampleValueSeries{}
|
|
err := proto.Unmarshal(i.Value(), v)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return NewValuesFromDTO(v), nil
|
|
}
|
|
|
|
func (l *LevelDBMetricPersistence) hasIndexMetric(dto *dto.Metric) (value bool, err error) {
|
|
defer func(begin time.Time) {
|
|
duration := time.Since(begin)
|
|
|
|
recordOutcome(duration, err, map[string]string{operation: hasIndexMetric, result: success}, map[string]string{operation: hasIndexMetric, result: failure})
|
|
}(time.Now())
|
|
|
|
value, err = l.metricMembershipIndex.Has(dto)
|
|
|
|
return
|
|
}
|
|
|
|
func (l *LevelDBMetricPersistence) HasLabelPair(dto *dto.LabelPair) (value bool, err error) {
|
|
defer func(begin time.Time) {
|
|
duration := time.Since(begin)
|
|
|
|
recordOutcome(duration, err, map[string]string{operation: hasLabelPair, result: success}, map[string]string{operation: hasLabelPair, result: failure})
|
|
}(time.Now())
|
|
|
|
value, err = l.labelSetToFingerprints.Has(dto)
|
|
|
|
return
|
|
}
|
|
|
|
func (l *LevelDBMetricPersistence) HasLabelName(dto *dto.LabelName) (value bool, err error) {
|
|
defer func(begin time.Time) {
|
|
duration := time.Since(begin)
|
|
|
|
recordOutcome(duration, err, map[string]string{operation: hasLabelName, result: success}, map[string]string{operation: hasLabelName, result: failure})
|
|
}(time.Now())
|
|
|
|
value, err = l.labelNameToFingerprints.Has(dto)
|
|
|
|
return
|
|
}
|
|
|
|
func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet clientmodel.LabelSet) (fps clientmodel.Fingerprints, err error) {
|
|
defer func(begin time.Time) {
|
|
duration := time.Since(begin)
|
|
|
|
recordOutcome(duration, err, map[string]string{operation: getFingerprintsForLabelSet, result: success}, map[string]string{operation: getFingerprintsForLabelSet, result: failure})
|
|
}(time.Now())
|
|
|
|
sets := []utility.Set{}
|
|
pair := &dto.LabelPair{}
|
|
unmarshaled := new(dto.FingerprintCollection)
|
|
|
|
for name, value := range labelSet {
|
|
pair.Reset()
|
|
unmarshaled.Reset()
|
|
|
|
pair.Name = proto.String(string(name))
|
|
pair.Value = proto.String(string(value))
|
|
|
|
present, err := l.labelSetToFingerprints.Get(pair, unmarshaled)
|
|
if err != nil {
|
|
return fps, err
|
|
}
|
|
if !present {
|
|
return nil, nil
|
|
}
|
|
|
|
set := utility.Set{}
|
|
|
|
for _, m := range unmarshaled.Member {
|
|
fp := &clientmodel.Fingerprint{}
|
|
loadFingerprint(fp, m)
|
|
set.Add(*fp)
|
|
}
|
|
|
|
sets = append(sets, set)
|
|
}
|
|
|
|
numberOfSets := len(sets)
|
|
if numberOfSets == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
base := sets[0]
|
|
for i := 1; i < numberOfSets; i++ {
|
|
base = base.Intersection(sets[i])
|
|
}
|
|
for _, e := range base.Elements() {
|
|
fingerprint := e.(clientmodel.Fingerprint)
|
|
fps = append(fps, &fingerprint)
|
|
}
|
|
|
|
return fps, nil
|
|
}
|
|
|
|
func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName clientmodel.LabelName) (fps clientmodel.Fingerprints, err error) {
|
|
defer func(begin time.Time) {
|
|
duration := time.Since(begin)
|
|
|
|
recordOutcome(duration, err, map[string]string{operation: getFingerprintsForLabelName, result: success}, map[string]string{operation: getFingerprintsForLabelName, result: failure})
|
|
}(time.Now())
|
|
|
|
unmarshaled := new(dto.FingerprintCollection)
|
|
d := &dto.LabelName{}
|
|
dumpLabelName(d, labelName)
|
|
present, err := l.labelNameToFingerprints.Get(d, unmarshaled)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if !present {
|
|
return nil, nil
|
|
}
|
|
|
|
for _, m := range unmarshaled.Member {
|
|
fp := &clientmodel.Fingerprint{}
|
|
loadFingerprint(fp, m)
|
|
fps = append(fps, fp)
|
|
}
|
|
|
|
return fps, nil
|
|
}
|
|
|
|
func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f *clientmodel.Fingerprint) (m clientmodel.Metric, err error) {
|
|
defer func(begin time.Time) {
|
|
duration := time.Since(begin)
|
|
|
|
recordOutcome(duration, err, map[string]string{operation: getMetricForFingerprint, result: success}, map[string]string{operation: getMetricForFingerprint, result: failure})
|
|
}(time.Now())
|
|
|
|
unmarshaled := &dto.Metric{}
|
|
d := new(dto.Fingerprint)
|
|
dumpFingerprint(d, f)
|
|
present, err := l.fingerprintToMetrics.Get(d, unmarshaled)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if !present {
|
|
return nil, nil
|
|
}
|
|
|
|
m = clientmodel.Metric{}
|
|
|
|
for _, v := range unmarshaled.LabelPair {
|
|
m[clientmodel.LabelName(v.GetName())] = clientmodel.LabelValue(v.GetValue())
|
|
}
|
|
|
|
return m, nil
|
|
}
|
|
|
|
func (l *LevelDBMetricPersistence) GetValueAtTime(f *clientmodel.Fingerprint, t time.Time) Values {
|
|
panic("Not implemented")
|
|
}
|
|
|
|
func (l *LevelDBMetricPersistence) GetBoundaryValues(f *clientmodel.Fingerprint, i Interval) Values {
|
|
panic("Not implemented")
|
|
}
|
|
|
|
func (l *LevelDBMetricPersistence) GetRangeValues(f *clientmodel.Fingerprint, i Interval) Values {
|
|
panic("Not implemented")
|
|
}
|
|
|
|
type MetricKeyDecoder struct{}
|
|
|
|
func (d *MetricKeyDecoder) DecodeKey(in interface{}) (out interface{}, err error) {
|
|
unmarshaled := dto.LabelPair{}
|
|
err = proto.Unmarshal(in.([]byte), &unmarshaled)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
out = LabelPair{
|
|
Name: clientmodel.LabelName(*unmarshaled.Name),
|
|
Value: clientmodel.LabelValue(*unmarshaled.Value),
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (d *MetricKeyDecoder) DecodeValue(in interface{}) (out interface{}, err error) {
|
|
return
|
|
}
|
|
|
|
type LabelNameFilter struct {
|
|
labelName clientmodel.LabelName
|
|
}
|
|
|
|
func (f LabelNameFilter) Filter(key, value interface{}) (filterResult storage.FilterResult) {
|
|
labelPair, ok := key.(LabelPair)
|
|
if ok && labelPair.Name == f.labelName {
|
|
return storage.ACCEPT
|
|
}
|
|
return storage.SKIP
|
|
}
|
|
|
|
type CollectLabelValuesOp struct {
|
|
labelValues []clientmodel.LabelValue
|
|
}
|
|
|
|
func (op *CollectLabelValuesOp) Operate(key, value interface{}) (err *storage.OperatorError) {
|
|
labelPair := key.(LabelPair)
|
|
op.labelValues = append(op.labelValues, clientmodel.LabelValue(labelPair.Value))
|
|
return
|
|
}
|
|
|
|
func (l *LevelDBMetricPersistence) GetAllValuesForLabel(labelName clientmodel.LabelName) (values clientmodel.LabelValues, err error) {
|
|
filter := &LabelNameFilter{
|
|
labelName: labelName,
|
|
}
|
|
labelValuesOp := &CollectLabelValuesOp{}
|
|
|
|
_, err = l.labelSetToFingerprints.ForEach(&MetricKeyDecoder{}, filter, labelValuesOp)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
values = labelValuesOp.labelValues
|
|
return
|
|
}
|
|
|
|
// CompactKeyspace compacts each database's keyspace serially.
|
|
//
|
|
// Beware that it would probably be imprudent to run this on a live user-facing
|
|
// server due to latency implications.
|
|
func (l *LevelDBMetricPersistence) CompactKeyspaces() {
|
|
l.CurationRemarks.CompactKeyspace()
|
|
l.fingerprintToMetrics.CompactKeyspace()
|
|
l.labelNameToFingerprints.CompactKeyspace()
|
|
l.labelSetToFingerprints.CompactKeyspace()
|
|
l.MetricHighWatermarks.CompactKeyspace()
|
|
l.metricMembershipIndex.CompactKeyspace()
|
|
l.MetricSamples.CompactKeyspace()
|
|
}
|
|
|
|
func (l *LevelDBMetricPersistence) ApproximateSizes() (total uint64, err error) {
|
|
size := uint64(0)
|
|
|
|
if size, err = l.CurationRemarks.ApproximateSize(); err != nil {
|
|
return 0, err
|
|
}
|
|
total += size
|
|
|
|
if size, err = l.fingerprintToMetrics.ApproximateSize(); err != nil {
|
|
return 0, err
|
|
}
|
|
total += size
|
|
|
|
if size, err = l.labelNameToFingerprints.ApproximateSize(); err != nil {
|
|
return 0, err
|
|
}
|
|
total += size
|
|
|
|
if size, err = l.labelSetToFingerprints.ApproximateSize(); err != nil {
|
|
return 0, err
|
|
}
|
|
total += size
|
|
|
|
if size, err = l.MetricHighWatermarks.ApproximateSize(); err != nil {
|
|
return 0, err
|
|
}
|
|
total += size
|
|
|
|
if size, err = l.metricMembershipIndex.ApproximateSize(); err != nil {
|
|
return 0, err
|
|
}
|
|
total += size
|
|
|
|
if size, err = l.MetricSamples.ApproximateSize(); err != nil {
|
|
return 0, err
|
|
}
|
|
total += size
|
|
|
|
return total, nil
|
|
}
|
|
|
|
func (l *LevelDBMetricPersistence) States() []leveldb.DatabaseState {
|
|
states := []leveldb.DatabaseState{}
|
|
|
|
state := l.CurationRemarks.State()
|
|
state.Name = "Curation Remarks"
|
|
state.Type = "Watermark"
|
|
states = append(states, state)
|
|
|
|
state = l.fingerprintToMetrics.State()
|
|
state.Name = "Fingerprints to Metrics"
|
|
state.Type = "Index"
|
|
states = append(states, state)
|
|
|
|
state = l.labelNameToFingerprints.State()
|
|
state.Name = "Label Name to Fingerprints"
|
|
state.Type = "Inverted Index"
|
|
states = append(states, state)
|
|
|
|
state = l.labelSetToFingerprints.State()
|
|
state.Name = "Label Pair to Fingerprints"
|
|
state.Type = "Inverted Index"
|
|
states = append(states, state)
|
|
|
|
state = l.MetricHighWatermarks.State()
|
|
state.Name = "Metric Last Write"
|
|
state.Type = "Watermark"
|
|
states = append(states, state)
|
|
|
|
state = l.metricMembershipIndex.State()
|
|
state.Name = "Metric Membership"
|
|
state.Type = "Index"
|
|
states = append(states, state)
|
|
|
|
state = l.MetricSamples.State()
|
|
state.Name = "Samples"
|
|
state.Type = "Time Series"
|
|
states = append(states, state)
|
|
|
|
return states
|
|
}
|