mirror of https://github.com/prometheus/prometheus
Extract Label Pair to Fingerprint indexing.
parent
5959cd9e53
commit
532589f728
|
@ -43,6 +43,8 @@ const (
|
|||
hasIndexMetric = "has_index_metric"
|
||||
hasLabelName = "has_label_name"
|
||||
hasLabelPair = "has_label_pair"
|
||||
indexLabelNames = "index_label_names"
|
||||
indexLabelPairs = "index_label_pairs"
|
||||
indexMetric = "index_metric"
|
||||
indexMetrics = "index_metrics"
|
||||
rebuildDiskFrontier = "rebuild_disk_frontier"
|
||||
|
|
|
@ -269,6 +269,145 @@ func (l *LevelDBMetricPersistence) findUnindexedMetrics(candidates map[model.Fin
|
|||
return
|
||||
}
|
||||
|
||||
// indexLabelNames accumulates all label name to fingerprint index entries for
|
||||
// the dirty metrics, appends the new dirtied metrics, sorts, and bulk updates
|
||||
// the index to reflect the new state.
|
||||
func (l *LevelDBMetricPersistence) indexLabelNames(metrics map[model.Fingerprint]model.Metric) (err error) {
|
||||
begin := time.Now()
|
||||
defer func() {
|
||||
duration := time.Since(begin)
|
||||
|
||||
recordOutcome(duration, err, map[string]string{operation: indexLabelNames, result: success}, map[string]string{operation: indexLabelNames, result: failure})
|
||||
}()
|
||||
|
||||
labelNameFingerprints := map[model.LabelName]utility.Set{}
|
||||
|
||||
for fingerprint, metric := range metrics {
|
||||
for labelName := range metric {
|
||||
fingerprintSet, ok := labelNameFingerprints[labelName]
|
||||
if !ok {
|
||||
fingerprintSet = utility.Set{}
|
||||
|
||||
fingerprints, err := l.GetFingerprintsForLabelName(labelName)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
return err
|
||||
}
|
||||
|
||||
for _, fingerprint := range fingerprints {
|
||||
fingerprintSet.Add(fingerprint)
|
||||
}
|
||||
}
|
||||
|
||||
fingerprintSet.Add(fingerprint)
|
||||
labelNameFingerprints[labelName] = fingerprintSet
|
||||
}
|
||||
}
|
||||
|
||||
batch := leveldb.NewBatch()
|
||||
defer batch.Close()
|
||||
|
||||
for labelName, fingerprintSet := range labelNameFingerprints {
|
||||
fingerprints := model.Fingerprints{}
|
||||
for fingerprint := range fingerprintSet {
|
||||
fingerprints = append(fingerprints, fingerprint.(model.Fingerprint))
|
||||
}
|
||||
|
||||
sort.Sort(fingerprints)
|
||||
|
||||
key := &dto.LabelName{
|
||||
Name: proto.String(string(labelName)),
|
||||
}
|
||||
value := &dto.FingerprintCollection{}
|
||||
for _, fingerprint := range fingerprints {
|
||||
value.Member = append(value.Member, fingerprint.ToDTO())
|
||||
}
|
||||
|
||||
batch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value))
|
||||
}
|
||||
|
||||
err = l.labelNameToFingerprints.Commit(batch)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// indexLabelPairs accumulates all label pair to fingerprint index entries for
|
||||
// the dirty metrics, appends the new dirtied metrics, sorts, and bulk updates
|
||||
// the index to reflect the new state.
|
||||
func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[model.Fingerprint]model.Metric) (err error) {
|
||||
begin := time.Now()
|
||||
defer func() {
|
||||
duration := time.Since(begin)
|
||||
|
||||
recordOutcome(duration, err, map[string]string{operation: indexLabelPairs, result: success}, map[string]string{operation: indexLabelPairs, result: failure})
|
||||
}()
|
||||
|
||||
labelPairFingerprints := map[model.LabelPair]utility.Set{}
|
||||
|
||||
for fingerprint, metric := range metrics {
|
||||
for labelName, labelValue := range metric {
|
||||
labelPair := model.LabelPair{
|
||||
Name: labelName,
|
||||
Value: labelValue,
|
||||
}
|
||||
fingerprintSet, ok := labelPairFingerprints[labelPair]
|
||||
if !ok {
|
||||
fingerprintSet = utility.Set{}
|
||||
|
||||
fingerprints, err := l.GetFingerprintsForLabelSet(model.LabelSet{
|
||||
labelName: labelValue,
|
||||
})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
return err
|
||||
}
|
||||
|
||||
for _, fingerprint := range fingerprints {
|
||||
fingerprintSet.Add(fingerprint)
|
||||
}
|
||||
}
|
||||
|
||||
fingerprintSet.Add(fingerprint)
|
||||
labelPairFingerprints[labelPair] = fingerprintSet
|
||||
}
|
||||
}
|
||||
|
||||
batch := leveldb.NewBatch()
|
||||
defer batch.Close()
|
||||
|
||||
for labelPair, fingerprintSet := range labelPairFingerprints {
|
||||
fingerprints := model.Fingerprints{}
|
||||
for fingerprint := range fingerprintSet {
|
||||
fingerprints = append(fingerprints, fingerprint.(model.Fingerprint))
|
||||
}
|
||||
|
||||
sort.Sort(fingerprints)
|
||||
|
||||
key := &dto.LabelPair{
|
||||
Name: proto.String(string(labelPair.Name)),
|
||||
Value: proto.String(string(labelPair.Value)),
|
||||
}
|
||||
value := &dto.FingerprintCollection{}
|
||||
for _, fingerprint := range fingerprints {
|
||||
value.Member = append(value.Member, fingerprint.ToDTO())
|
||||
}
|
||||
|
||||
batch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value))
|
||||
}
|
||||
|
||||
err = l.labelSetToFingerprints.Commit(batch)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// indexMetrics takes groups of samples, determines which ones contain metrics
|
||||
// that are unknown to the storage stack, and then proceeds to update all
|
||||
// affected indices.
|
||||
|
@ -289,149 +428,35 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerpri
|
|||
panic(err)
|
||||
}
|
||||
|
||||
if len(absentMetrics) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// TODO: For the missing fingerprints, determine what label names and pairs
|
||||
// are absent and act accordingly and append fingerprints.
|
||||
var (
|
||||
doneBuildingLabelNameIndex = make(chan interface{})
|
||||
doneBuildingLabelPairIndex = make(chan interface{})
|
||||
doneBuildingLabelNameIndex = make(chan error)
|
||||
doneBuildingLabelPairIndex = make(chan error)
|
||||
)
|
||||
|
||||
// Update LabelName -> Fingerprint index.
|
||||
go func() {
|
||||
labelNameFingerprints := map[model.LabelName]utility.Set{}
|
||||
|
||||
for fingerprint, metric := range absentMetrics {
|
||||
for labelName := range metric {
|
||||
fingerprintSet, ok := labelNameFingerprints[labelName]
|
||||
if !ok {
|
||||
fingerprintSet = utility.Set{}
|
||||
|
||||
fingerprints, err := l.GetFingerprintsForLabelName(labelName)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
doneBuildingLabelNameIndex <- err
|
||||
return
|
||||
}
|
||||
|
||||
for _, fingerprint := range fingerprints {
|
||||
fingerprintSet.Add(fingerprint)
|
||||
}
|
||||
}
|
||||
|
||||
fingerprintSet.Add(fingerprint)
|
||||
labelNameFingerprints[labelName] = fingerprintSet
|
||||
}
|
||||
}
|
||||
|
||||
batch := leveldb.NewBatch()
|
||||
defer batch.Close()
|
||||
|
||||
for labelName, fingerprintSet := range labelNameFingerprints {
|
||||
fingerprints := model.Fingerprints{}
|
||||
for fingerprint := range fingerprintSet {
|
||||
fingerprints = append(fingerprints, fingerprint.(model.Fingerprint))
|
||||
}
|
||||
|
||||
sort.Sort(fingerprints)
|
||||
|
||||
key := &dto.LabelName{
|
||||
Name: proto.String(string(labelName)),
|
||||
}
|
||||
value := &dto.FingerprintCollection{}
|
||||
for _, fingerprint := range fingerprints {
|
||||
value.Member = append(value.Member, fingerprint.ToDTO())
|
||||
}
|
||||
|
||||
batch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value))
|
||||
}
|
||||
|
||||
err := l.labelNameToFingerprints.Commit(batch)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
doneBuildingLabelNameIndex <- err
|
||||
return
|
||||
}
|
||||
|
||||
doneBuildingLabelNameIndex <- true
|
||||
doneBuildingLabelNameIndex <- l.indexLabelNames(absentMetrics)
|
||||
}()
|
||||
|
||||
// Update LabelPair -> Fingerprint index.
|
||||
go func() {
|
||||
labelPairFingerprints := map[model.LabelPair]utility.Set{}
|
||||
|
||||
for fingerprint, metric := range absentMetrics {
|
||||
for labelName, labelValue := range metric {
|
||||
labelPair := model.LabelPair{
|
||||
Name: labelName,
|
||||
Value: labelValue,
|
||||
}
|
||||
fingerprintSet, ok := labelPairFingerprints[labelPair]
|
||||
if !ok {
|
||||
fingerprintSet = utility.Set{}
|
||||
|
||||
fingerprints, err := l.GetFingerprintsForLabelSet(model.LabelSet{
|
||||
labelName: labelValue,
|
||||
})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
doneBuildingLabelPairIndex <- err
|
||||
return
|
||||
}
|
||||
|
||||
for _, fingerprint := range fingerprints {
|
||||
fingerprintSet.Add(fingerprint)
|
||||
}
|
||||
}
|
||||
|
||||
fingerprintSet.Add(fingerprint)
|
||||
labelPairFingerprints[labelPair] = fingerprintSet
|
||||
}
|
||||
}
|
||||
|
||||
batch := leveldb.NewBatch()
|
||||
defer batch.Close()
|
||||
|
||||
for labelPair, fingerprintSet := range labelPairFingerprints {
|
||||
fingerprints := model.Fingerprints{}
|
||||
for fingerprint := range fingerprintSet {
|
||||
fingerprints = append(fingerprints, fingerprint.(model.Fingerprint))
|
||||
}
|
||||
|
||||
sort.Sort(fingerprints)
|
||||
|
||||
key := &dto.LabelPair{
|
||||
Name: proto.String(string(labelPair.Name)),
|
||||
Value: proto.String(string(labelPair.Value)),
|
||||
}
|
||||
value := &dto.FingerprintCollection{}
|
||||
for _, fingerprint := range fingerprints {
|
||||
value.Member = append(value.Member, fingerprint.ToDTO())
|
||||
}
|
||||
|
||||
batch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value))
|
||||
}
|
||||
|
||||
err := l.labelSetToFingerprints.Commit(batch)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
doneBuildingLabelPairIndex <- true
|
||||
return
|
||||
}
|
||||
|
||||
doneBuildingLabelPairIndex <- true
|
||||
doneBuildingLabelPairIndex <- l.indexLabelPairs(absentMetrics)
|
||||
}()
|
||||
|
||||
makeTopLevelIndex := true
|
||||
|
||||
v := <-doneBuildingLabelNameIndex
|
||||
_, ok := v.(error)
|
||||
if ok {
|
||||
err = <-doneBuildingLabelNameIndex
|
||||
if err != nil {
|
||||
panic(err)
|
||||
makeTopLevelIndex = false
|
||||
}
|
||||
v = <-doneBuildingLabelPairIndex
|
||||
_, ok = v.(error)
|
||||
if ok {
|
||||
err = <-doneBuildingLabelPairIndex
|
||||
if err != nil {
|
||||
panic(err)
|
||||
makeTopLevelIndex = false
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue