
Extract index storage into separate types.

pull/346/head
Matt T. Proud, 11 years ago
commit f4669a812c
1. Makefile (3 changed lines)
2. rules/lexer.l.go (2544 changed lines)
3. rules/parser.y.go (221 changed lines)
4. storage/metric/index.go (330 changed lines)
5. storage/metric/leveldb.go (397 changed lines)
6. storage/raw/interface.go (23 changed lines)

Makefile (3 changed lines)

@@ -83,8 +83,7 @@ source_path:
[ -d "$(FULL_GOPATH)" ]
test: build
$(GOENV) find . -maxdepth 1 -mindepth 1 -type d -and -not -path $(BUILD_PATH) -exec $(GOCC) test {}/... $(GO_TEST_FLAGS) \;
$(GO) test $(GO_TEST_FLAGS)
$(GO) test $(GO_TEST_FLAGS) ./...
tools: dependencies preparation
$(MAKE) -C tools

rules/lexer.l.go (2544 changed lines)

File diff suppressed because it is too large.

rules/parser.y.go (221 changed lines)

@@ -1,25 +1,25 @@
//line parser.y:15
package rules
package rules
import __yyfmt__ "fmt"
//line parser.y:15
import (
clientmodel "github.com/prometheus/client_golang/model"
import (
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/rules/ast"
)
"github.com/prometheus/prometheus/rules/ast"
)
//line parser.y:24
type yySymType struct {
yys int
num clientmodel.SampleValue
str string
ruleNode ast.Node
ruleNodeSlice []ast.Node
boolean bool
labelNameSlice clientmodel.LabelNames
labelSet clientmodel.LabelSet
yys int
num clientmodel.SampleValue
str string
ruleNode ast.Node
ruleNodeSlice []ast.Node
boolean bool
labelNameSlice clientmodel.LabelNames
labelSet clientmodel.LabelSet
}
const START_RULES = 57346
@@ -70,7 +70,6 @@ const yyMaxDepth = 200
//line parser.y:191
//line yacctab:1
var yyExca = []int{
-1, 1,
@@ -407,133 +406,207 @@ yydefault:
case 5:
//line parser.y:69
{ yylex.(*RulesLexer).parsedExpr = yyS[yypt-0].ruleNode }
{
yylex.(*RulesLexer).parsedExpr = yyS[yypt-0].ruleNode
}
case 6:
//line parser.y:73
{
rule, err := CreateRecordingRule(yyS[yypt-3].str, yyS[yypt-2].labelSet, yyS[yypt-0].ruleNode, yyS[yypt-4].boolean)
if err != nil { yylex.Error(err.Error()); return 1 }
yylex.(*RulesLexer).parsedRules = append(yylex.(*RulesLexer).parsedRules, rule)
}
rule, err := CreateRecordingRule(yyS[yypt-3].str, yyS[yypt-2].labelSet, yyS[yypt-0].ruleNode, yyS[yypt-4].boolean)
if err != nil {
yylex.Error(err.Error())
return 1
}
yylex.(*RulesLexer).parsedRules = append(yylex.(*RulesLexer).parsedRules, rule)
}
case 7:
//line parser.y:79
{
rule, err := CreateAlertingRule(yyS[yypt-9].str, yyS[yypt-7].ruleNode, yyS[yypt-6].str, yyS[yypt-4].labelSet, yyS[yypt-2].str, yyS[yypt-0].str)
if err != nil { yylex.Error(err.Error()); return 1 }
yylex.(*RulesLexer).parsedRules = append(yylex.(*RulesLexer).parsedRules, rule)
}
rule, err := CreateAlertingRule(yyS[yypt-9].str, yyS[yypt-7].ruleNode, yyS[yypt-6].str, yyS[yypt-4].labelSet, yyS[yypt-2].str, yyS[yypt-0].str)
if err != nil {
yylex.Error(err.Error())
return 1
}
yylex.(*RulesLexer).parsedRules = append(yylex.(*RulesLexer).parsedRules, rule)
}
case 8:
//line parser.y:87
{ yyVAL.str = "0s" }
{
yyVAL.str = "0s"
}
case 9:
//line parser.y:89
{ yyVAL.str = yyS[yypt-0].str }
{
yyVAL.str = yyS[yypt-0].str
}
case 10:
//line parser.y:93
{ yyVAL.boolean = false }
{
yyVAL.boolean = false
}
case 11:
//line parser.y:95
{ yyVAL.boolean = true }
{
yyVAL.boolean = true
}
case 12:
//line parser.y:99
{ yyVAL.labelSet = clientmodel.LabelSet{} }
{
yyVAL.labelSet = clientmodel.LabelSet{}
}
case 13:
//line parser.y:101
{ yyVAL.labelSet = yyS[yypt-1].labelSet }
{
yyVAL.labelSet = yyS[yypt-1].labelSet
}
case 14:
//line parser.y:103
{ yyVAL.labelSet = clientmodel.LabelSet{} }
{
yyVAL.labelSet = clientmodel.LabelSet{}
}
case 15:
//line parser.y:106
{ yyVAL.labelSet = yyS[yypt-0].labelSet }
{
yyVAL.labelSet = yyS[yypt-0].labelSet
}
case 16:
//line parser.y:108
{ for k, v := range yyS[yypt-0].labelSet { yyVAL.labelSet[k] = v } }
{
for k, v := range yyS[yypt-0].labelSet {
yyVAL.labelSet[k] = v
}
}
case 17:
//line parser.y:112
{ yyVAL.labelSet = clientmodel.LabelSet{ clientmodel.LabelName(yyS[yypt-2].str): clientmodel.LabelValue(yyS[yypt-0].str) } }
{
yyVAL.labelSet = clientmodel.LabelSet{clientmodel.LabelName(yyS[yypt-2].str): clientmodel.LabelValue(yyS[yypt-0].str)}
}
case 18:
//line parser.y:117
{ yyVAL.ruleNode = yyS[yypt-1].ruleNode }
{
yyVAL.ruleNode = yyS[yypt-1].ruleNode
}
case 19:
//line parser.y:119
{ yyS[yypt-0].labelSet[clientmodel.MetricNameLabel] = clientmodel.LabelValue(yyS[yypt-1].str); yyVAL.ruleNode = ast.NewVectorLiteral(yyS[yypt-0].labelSet) }
{
yyS[yypt-0].labelSet[clientmodel.MetricNameLabel] = clientmodel.LabelValue(yyS[yypt-1].str)
yyVAL.ruleNode = ast.NewVectorLiteral(yyS[yypt-0].labelSet)
}
case 20:
//line parser.y:121
{
var err error
yyVAL.ruleNode, err = NewFunctionCall(yyS[yypt-3].str, yyS[yypt-1].ruleNodeSlice)
if err != nil { yylex.Error(err.Error()); return 1 }
}
var err error
yyVAL.ruleNode, err = NewFunctionCall(yyS[yypt-3].str, yyS[yypt-1].ruleNodeSlice)
if err != nil {
yylex.Error(err.Error())
return 1
}
}
case 21:
//line parser.y:127
{
var err error
yyVAL.ruleNode, err = NewFunctionCall(yyS[yypt-2].str, []ast.Node{})
if err != nil { yylex.Error(err.Error()); return 1 }
}
var err error
yyVAL.ruleNode, err = NewFunctionCall(yyS[yypt-2].str, []ast.Node{})
if err != nil {
yylex.Error(err.Error())
return 1
}
}
case 22:
//line parser.y:133
{
var err error
yyVAL.ruleNode, err = NewMatrix(yyS[yypt-3].ruleNode, yyS[yypt-1].str)
if err != nil { yylex.Error(err.Error()); return 1 }
}
var err error
yyVAL.ruleNode, err = NewMatrix(yyS[yypt-3].ruleNode, yyS[yypt-1].str)
if err != nil {
yylex.Error(err.Error())
return 1
}
}
case 23:
//line parser.y:139
{
var err error
yyVAL.ruleNode, err = NewVectorAggregation(yyS[yypt-4].str, yyS[yypt-2].ruleNode, yyS[yypt-0].labelNameSlice)
if err != nil { yylex.Error(err.Error()); return 1 }
}
var err error
yyVAL.ruleNode, err = NewVectorAggregation(yyS[yypt-4].str, yyS[yypt-2].ruleNode, yyS[yypt-0].labelNameSlice)
if err != nil {
yylex.Error(err.Error())
return 1
}
}
case 24:
//line parser.y:147
{
var err error
yyVAL.ruleNode, err = NewArithExpr(yyS[yypt-1].str, yyS[yypt-2].ruleNode, yyS[yypt-0].ruleNode)
if err != nil { yylex.Error(err.Error()); return 1 }
}
var err error
yyVAL.ruleNode, err = NewArithExpr(yyS[yypt-1].str, yyS[yypt-2].ruleNode, yyS[yypt-0].ruleNode)
if err != nil {
yylex.Error(err.Error())
return 1
}
}
case 25:
//line parser.y:153
{
var err error
yyVAL.ruleNode, err = NewArithExpr(yyS[yypt-1].str, yyS[yypt-2].ruleNode, yyS[yypt-0].ruleNode)
if err != nil { yylex.Error(err.Error()); return 1 }
}
var err error
yyVAL.ruleNode, err = NewArithExpr(yyS[yypt-1].str, yyS[yypt-2].ruleNode, yyS[yypt-0].ruleNode)
if err != nil {
yylex.Error(err.Error())
return 1
}
}
case 26:
//line parser.y:159
{
var err error
yyVAL.ruleNode, err = NewArithExpr(yyS[yypt-1].str, yyS[yypt-2].ruleNode, yyS[yypt-0].ruleNode)
if err != nil { yylex.Error(err.Error()); return 1 }
}
var err error
yyVAL.ruleNode, err = NewArithExpr(yyS[yypt-1].str, yyS[yypt-2].ruleNode, yyS[yypt-0].ruleNode)
if err != nil {
yylex.Error(err.Error())
return 1
}
}
case 27:
//line parser.y:165
{ yyVAL.ruleNode = ast.NewScalarLiteral(yyS[yypt-0].num)}
{
yyVAL.ruleNode = ast.NewScalarLiteral(yyS[yypt-0].num)
}
case 28:
//line parser.y:169
{ yyVAL.labelNameSlice = clientmodel.LabelNames{} }
{
yyVAL.labelNameSlice = clientmodel.LabelNames{}
}
case 29:
//line parser.y:171
{ yyVAL.labelNameSlice = yyS[yypt-1].labelNameSlice }
{
yyVAL.labelNameSlice = yyS[yypt-1].labelNameSlice
}
case 30:
//line parser.y:175
{ yyVAL.labelNameSlice = clientmodel.LabelNames{clientmodel.LabelName(yyS[yypt-0].str)} }
{
yyVAL.labelNameSlice = clientmodel.LabelNames{clientmodel.LabelName(yyS[yypt-0].str)}
}
case 31:
//line parser.y:177
{ yyVAL.labelNameSlice = append(yyVAL.labelNameSlice, clientmodel.LabelName(yyS[yypt-0].str)) }
{
yyVAL.labelNameSlice = append(yyVAL.labelNameSlice, clientmodel.LabelName(yyS[yypt-0].str))
}
case 32:
//line parser.y:181
{ yyVAL.ruleNodeSlice = []ast.Node{yyS[yypt-0].ruleNode} }
{
yyVAL.ruleNodeSlice = []ast.Node{yyS[yypt-0].ruleNode}
}
case 33:
//line parser.y:183
{ yyVAL.ruleNodeSlice = append(yyVAL.ruleNodeSlice, yyS[yypt-0].ruleNode) }
{
yyVAL.ruleNodeSlice = append(yyVAL.ruleNodeSlice, yyS[yypt-0].ruleNode)
}
case 34:
//line parser.y:187
{ yyVAL.ruleNode = yyS[yypt-0].ruleNode }
{
yyVAL.ruleNode = yyS[yypt-0].ruleNode
}
case 35:
//line parser.y:189
{ yyVAL.ruleNode = ast.NewStringLiteral(yyS[yypt-0].str) }
{
yyVAL.ruleNode = ast.NewStringLiteral(yyS[yypt-0].str)
}
}
goto yystack /* stack new state and value */
}

storage/metric/index.go (330 changed lines)

@@ -0,0 +1,330 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
import (
"sort"
"code.google.com/p/goprotobuf/proto"
clientmodel "github.com/prometheus/client_golang/model"
dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/raw"
"github.com/prometheus/prometheus/storage/raw/leveldb"
)
type FingerprintMetricMapping map[clientmodel.Fingerprint]clientmodel.Metric
type FingerprintMetricIndex interface {
IndexBatch(FingerprintMetricMapping) error
Lookup(*clientmodel.Fingerprint) (m clientmodel.Metric, ok bool, err error)
Close() error
}
type leveldbFingerprintMetricIndex struct {
p *leveldb.LevelDBPersistence
}
type LevelDBFingerprintMetricIndexOptions struct {
leveldb.LevelDBOptions
}
func (i *leveldbFingerprintMetricIndex) Close() error {
i.p.Close()
return nil
}
func (i *leveldbFingerprintMetricIndex) IndexBatch(mapping FingerprintMetricMapping) error {
b := leveldb.NewBatch()
defer b.Close()
for f, m := range mapping {
k := new(dto.Fingerprint)
dumpFingerprint(k, &f)
v := new(dto.Metric)
dumpMetric(v, m)
b.Put(k, v)
}
return i.p.Commit(b)
}
func (i *leveldbFingerprintMetricIndex) Lookup(f *clientmodel.Fingerprint) (m clientmodel.Metric, ok bool, err error) {
k := new(dto.Fingerprint)
dumpFingerprint(k, f)
v := new(dto.Metric)
if ok, err := i.p.Get(k, v); !ok {
return nil, false, nil
} else if err != nil {
return nil, false, err
}
m = clientmodel.Metric{}
for _, pair := range v.LabelPair {
m[clientmodel.LabelName(pair.GetName())] = clientmodel.LabelValue(pair.GetValue())
}
return m, true, nil
}
func NewLevelDBFingerprintMetricIndex(o *LevelDBFingerprintMetricIndexOptions) (FingerprintMetricIndex, error) {
s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions)
if err != nil {
return nil, err
}
return &leveldbFingerprintMetricIndex{
p: s,
}, nil
}
type LabelNameFingerprintMapping map[clientmodel.LabelName]clientmodel.Fingerprints
type LabelNameFingerprintIndex interface {
IndexBatch(LabelNameFingerprintMapping) error
Lookup(clientmodel.LabelName) (fps clientmodel.Fingerprints, ok bool, err error)
Has(clientmodel.LabelName) (ok bool, err error)
Close() error
}
type leveldbLabelNameFingerprintIndex struct {
p *leveldb.LevelDBPersistence
}
func (i *leveldbLabelNameFingerprintIndex) IndexBatch(b LabelNameFingerprintMapping) error {
batch := leveldb.NewBatch()
defer batch.Close()
for labelName, fingerprints := range b {
sort.Sort(fingerprints)
key := &dto.LabelName{
Name: proto.String(string(labelName)),
}
value := new(dto.FingerprintCollection)
for _, fingerprint := range fingerprints {
f := new(dto.Fingerprint)
dumpFingerprint(f, fingerprint)
value.Member = append(value.Member, f)
}
batch.Put(key, value)
}
return i.p.Commit(batch)
}
func (i *leveldbLabelNameFingerprintIndex) Lookup(l clientmodel.LabelName) (fps clientmodel.Fingerprints, ok bool, err error) {
k := new(dto.LabelName)
dumpLabelName(k, l)
v := new(dto.FingerprintCollection)
ok, err = i.p.Get(k, v)
if err != nil {
return nil, false, err
}
if !ok {
return nil, false, nil
}
for _, m := range v.Member {
fp := new(clientmodel.Fingerprint)
loadFingerprint(fp, m)
fps = append(fps, fp)
}
return fps, true, nil
}
func (i *leveldbLabelNameFingerprintIndex) Has(l clientmodel.LabelName) (ok bool, err error) {
return i.p.Has(&dto.LabelName{
Name: proto.String(string(l)),
})
}
func (i *leveldbLabelNameFingerprintIndex) Close() error {
i.p.Close()
return nil
}
type LevelDBLabelNameFingerprintIndexOptions struct {
leveldb.LevelDBOptions
}
func NewLevelLabelNameFingerprintIndex(o *LevelDBLabelNameFingerprintIndexOptions) (LabelNameFingerprintIndex, error) {
s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions)
if err != nil {
return nil, err
}
return &leveldbLabelNameFingerprintIndex{
p: s,
}, nil
}
type LabelSetFingerprintMapping map[LabelPair]clientmodel.Fingerprints
type LabelSetFingerprintIndex interface {
raw.ForEacher
IndexBatch(LabelSetFingerprintMapping) error
Lookup(*LabelPair) (m clientmodel.Fingerprints, ok bool, err error)
Has(*LabelPair) (ok bool, err error)
Close() error
}
type leveldbLabelSetFingerprintIndex struct {
p *leveldb.LevelDBPersistence
}
type LevelDBLabelSetFingerprintIndexOptions struct {
leveldb.LevelDBOptions
}
func (i *leveldbLabelSetFingerprintIndex) IndexBatch(m LabelSetFingerprintMapping) error {
batch := leveldb.NewBatch()
defer batch.Close()
for pair, fps := range m {
sort.Sort(fps)
key := &dto.LabelPair{
Name: proto.String(string(pair.Name)),
Value: proto.String(string(pair.Value)),
}
value := new(dto.FingerprintCollection)
for _, fp := range fps {
f := new(dto.Fingerprint)
dumpFingerprint(f, fp)
value.Member = append(value.Member, f)
}
batch.Put(key, value)
}
return i.p.Commit(batch)
}
func (i *leveldbLabelSetFingerprintIndex) Lookup(p *LabelPair) (m clientmodel.Fingerprints, ok bool, err error) {
k := &dto.LabelPair{
Name: proto.String(string(p.Name)),
Value: proto.String(string(p.Value)),
}
v := new(dto.FingerprintCollection)
ok, err = i.p.Get(k, v)
if !ok {
return nil, false, nil
}
if err != nil {
return nil, false, err
}
for _, pair := range v.Member {
fp := new(clientmodel.Fingerprint)
loadFingerprint(fp, pair)
m = append(m, fp)
}
return m, true, nil
}
func (i *leveldbLabelSetFingerprintIndex) Has(p *LabelPair) (ok bool, err error) {
k := &dto.LabelPair{
Name: proto.String(string(p.Name)),
Value: proto.String(string(p.Value)),
}
return i.p.Has(k)
}
func (i *leveldbLabelSetFingerprintIndex) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (bool, error) {
return i.p.ForEach(d, f, o)
}
func (i *leveldbLabelSetFingerprintIndex) Close() error {
i.p.Close()
return nil
}
func NewLevelDBLabelSetFingerprintIndex(o *LevelDBLabelSetFingerprintIndexOptions) (LabelSetFingerprintIndex, error) {
s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions)
if err != nil {
return nil, err
}
return &leveldbLabelSetFingerprintIndex{
p: s,
}, nil
}
type MetricMembershipIndex interface {
IndexBatch([]clientmodel.Metric) error
Has(clientmodel.Metric) (ok bool, err error)
Close() error
}
type leveldbMetricMembershipIndex struct {
p *leveldb.LevelDBPersistence
}
var existenceIdentity = new(dto.MembershipIndexValue)
func (i *leveldbMetricMembershipIndex) IndexBatch(ms []clientmodel.Metric) error {
batch := leveldb.NewBatch()
defer batch.Close()
for _, m := range ms {
k := new(dto.Metric)
dumpMetric(k, m)
batch.Put(k, existenceIdentity)
}
return i.p.Commit(batch)
}
func (i *leveldbMetricMembershipIndex) Has(m clientmodel.Metric) (ok bool, err error) {
k := new(dto.Metric)
dumpMetric(k, m)
return i.p.Has(k)
}
func (i *leveldbMetricMembershipIndex) Close() error {
i.p.Close()
return nil
}
type LevelDBMetricMembershipIndexOptions struct {
leveldb.LevelDBOptions
}
func NewLevelDBMetricMembershipIndex(o *LevelDBMetricMembershipIndexOptions) (MetricMembershipIndex, error) {
s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions)
if err != nil {
return nil, err
}
return &leveldbMetricMembershipIndex{
p: s,
}, nil
}
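As a quick orientation to the new API, here is a minimal, hypothetical sketch of driving the FingerprintMetricIndex defined above; the helper name, variables, and on-disk path are illustrative only and are not part of this commit.

package metric

import (
	clientmodel "github.com/prometheus/client_golang/model"

	"github.com/prometheus/prometheus/storage/raw/leveldb"
)

// indexAndLookup writes a single fingerprint-to-metric pair through the
// index and reads it back. Hypothetical helper for illustration only.
func indexAndLookup(fp *clientmodel.Fingerprint, m clientmodel.Metric) error {
	idx, err := NewLevelDBFingerprintMetricIndex(&LevelDBFingerprintMetricIndexOptions{
		LevelDBOptions: leveldb.LevelDBOptions{
			Path: "/tmp/fingerprint_to_metric", // placeholder path
		},
	})
	if err != nil {
		return err
	}
	defer idx.Close()

	// IndexBatch takes a FingerprintMetricMapping keyed by fingerprint value.
	if err := idx.IndexBatch(FingerprintMetricMapping{*fp: m}); err != nil {
		return err
	}

	// Lookup returns the stored metric together with an ok flag.
	stored, ok, err := idx.Lookup(fp)
	if err != nil || !ok {
		return err
	}
	_ = stored
	return nil
}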

storage/metric/leveldb.go (397 changed lines)

@@ -26,7 +26,6 @@ import (
clientmodel "github.com/prometheus/client_golang/model"
dto "github.com/prometheus/prometheus/model/generated"
index "github.com/prometheus/prometheus/storage/raw/index/leveldb"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/raw/leveldb"
@@ -37,11 +36,11 @@ const sortConcurrency = 2
type LevelDBMetricPersistence struct {
CurationRemarks *leveldb.LevelDBPersistence
fingerprintToMetrics *leveldb.LevelDBPersistence
labelNameToFingerprints *leveldb.LevelDBPersistence
labelSetToFingerprints *leveldb.LevelDBPersistence
fingerprintToMetrics FingerprintMetricIndex
labelNameToFingerprints LabelNameFingerprintIndex
labelSetToFingerprints LabelSetFingerprintIndex
MetricHighWatermarks *leveldb.LevelDBPersistence
metricMembershipIndex *index.LevelDBMembershipIndex
metricMembershipIndex MetricMembershipIndex
MetricSamples *leveldb.LevelDBPersistence
}
@@ -60,12 +59,15 @@ var (
)
type leveldbOpener func()
type leveldbCloser interface {
type errorCloser interface {
Close() error
}
type closer interface {
Close()
}
func (l *LevelDBMetricPersistence) Close() {
var persistences = []leveldbCloser{
var persistences = []interface{}{
l.CurationRemarks,
l.fingerprintToMetrics,
l.labelNameToFingerprints,
@@ -77,14 +79,21 @@ func (l *LevelDBMetricPersistence) Close() {
closerGroup := sync.WaitGroup{}
for _, closer := range persistences {
for _, c := range persistences {
closerGroup.Add(1)
go func(closer leveldbCloser) {
if closer != nil {
closer.Close()
go func(c interface{}) {
if c != nil {
switch closer := c.(type) {
case closer:
closer.Close()
case errorCloser:
if err := closer.Close(); err != nil {
log.Println("anomaly closing", err)
}
}
}
closerGroup.Done()
}(closer)
}(c)
}
closerGroup.Wait()
@@ -103,11 +112,12 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
"Label Names and Value Pairs by Fingerprint",
func() {
var err error
o := &leveldb.LevelDBOptions{
Path: baseDirectory + "/label_name_and_value_pairs_by_fingerprint",
CacheSizeBytes: *fingerprintsToLabelPairCacheSize,
}
emission.fingerprintToMetrics, err = leveldb.NewLevelDBPersistence(o)
emission.fingerprintToMetrics, err = NewLevelDBFingerprintMetricIndex(&LevelDBFingerprintMetricIndexOptions{
LevelDBOptions: leveldb.LevelDBOptions{
Path: baseDirectory + "/label_name_and_value_pairs_by_fingerprint",
CacheSizeBytes: *fingerprintsToLabelPairCacheSize,
},
})
workers.MayFail(err)
},
},
@@ -139,11 +149,12 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
"Fingerprints by Label Name",
func() {
var err error
o := &leveldb.LevelDBOptions{
Path: baseDirectory + "/fingerprints_by_label_name",
CacheSizeBytes: *labelNameToFingerprintsCacheSize,
}
emission.labelNameToFingerprints, err = leveldb.NewLevelDBPersistence(o)
emission.labelNameToFingerprints, err = NewLevelLabelNameFingerprintIndex(&LevelDBLabelNameFingerprintIndexOptions{
LevelDBOptions: leveldb.LevelDBOptions{
Path: baseDirectory + "/fingerprints_by_label_name",
CacheSizeBytes: *labelNameToFingerprintsCacheSize,
},
})
workers.MayFail(err)
},
},
@@ -151,11 +162,12 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
"Fingerprints by Label Name and Value Pair",
func() {
var err error
o := &leveldb.LevelDBOptions{
Path: baseDirectory + "/fingerprints_by_label_name_and_value_pair",
CacheSizeBytes: *labelPairToFingerprintsCacheSize,
}
emission.labelSetToFingerprints, err = leveldb.NewLevelDBPersistence(o)
emission.labelSetToFingerprints, err = NewLevelDBLabelSetFingerprintIndex(&LevelDBLabelSetFingerprintIndexOptions{
LevelDBOptions: leveldb.LevelDBOptions{
Path: baseDirectory + "/fingerprints_by_label_name_and_value_pair",
CacheSizeBytes: *labelPairToFingerprintsCacheSize,
},
})
workers.MayFail(err)
},
},
@@ -163,13 +175,13 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
"Metric Membership Index",
func() {
var err error
o := &index.LevelDBIndexOptions{
LevelDBOptions: leveldb.LevelDBOptions{
Path: baseDirectory + "/metric_membership_index",
CacheSizeBytes: *metricMembershipIndexCacheSize,
},
}
emission.metricMembershipIndex, err = index.NewLevelDBMembershipIndex(o)
emission.metricMembershipIndex, err = NewLevelDBMetricMembershipIndex(
&LevelDBMetricMembershipIndexOptions{
LevelDBOptions: leveldb.LevelDBOptions{
Path: baseDirectory + "/metric_membership_index",
CacheSizeBytes: *metricMembershipIndexCacheSize,
},
})
workers.MayFail(err)
},
},
@@ -252,19 +264,16 @@ func groupByFingerprint(samples clientmodel.Samples) map[clientmodel.Fingerprint
// findUnindexedMetrics scours the metric membership index for each given Metric
// in the keyspace and returns a map of Fingerprint-Metric pairs that are
// absent.
func (l *LevelDBMetricPersistence) findUnindexedMetrics(candidates map[clientmodel.Fingerprint]clientmodel.Metric) (unindexed map[clientmodel.Fingerprint]clientmodel.Metric, err error) {
func (l *LevelDBMetricPersistence) findUnindexedMetrics(candidates map[clientmodel.Fingerprint]clientmodel.Metric) (unindexed FingerprintMetricMapping, err error) {
defer func(begin time.Time) {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: findUnindexedMetrics, result: success}, map[string]string{operation: findUnindexedMetrics, result: failure})
}(time.Now())
unindexed = make(map[clientmodel.Fingerprint]clientmodel.Metric)
dto := &dto.Metric{}
unindexed = FingerprintMetricMapping{}
for fingerprint, metric := range candidates {
dumpMetric(dto, metric)
indexHas, err := l.hasIndexMetric(dto)
indexHas, err := l.hasIndexMetric(metric)
if err != nil {
return unindexed, err
}
@@ -281,67 +290,47 @@ func (l *LevelDBMetricPersistence) findUnindexedMetrics(candidates map[clientmod
// the index to reflect the new state.
//
// This operation is idempotent.
func (l *LevelDBMetricPersistence) indexLabelNames(metrics map[clientmodel.Fingerprint]clientmodel.Metric) (err error) {
func (l *LevelDBMetricPersistence) indexLabelNames(metrics FingerprintMetricMapping) (err error) {
defer func(begin time.Time) {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: indexLabelNames, result: success}, map[string]string{operation: indexLabelNames, result: failure})
}(time.Now())
labelNameFingerprints := map[clientmodel.LabelName]utility.Set{}
retrieved := map[clientmodel.LabelName]utility.Set{}
for fingerprint, metric := range metrics {
for labelName := range metric {
fingerprintSet, ok := labelNameFingerprints[labelName]
fingerprintSet, ok := retrieved[labelName]
if !ok {
fingerprintSet = utility.Set{}
fingerprints, err := l.GetFingerprintsForLabelName(labelName)
if err != nil {
return err
}
fingerprintSet = utility.Set{}
retrieved[labelName] = fingerprintSet
for _, fingerprint := range fingerprints {
fingerprintSet.Add(*fingerprint)
}
}
fingerprintSet.Add(fingerprint)
labelNameFingerprints[labelName] = fingerprintSet
}
}
batch := leveldb.NewBatch()
defer batch.Close()
for labelName, fingerprintSet := range labelNameFingerprints {
fingerprints := clientmodel.Fingerprints{}
for e := range fingerprintSet {
fingerprint := e.(clientmodel.Fingerprint)
fingerprints = append(fingerprints, &fingerprint)
pending := LabelNameFingerprintMapping{}
for name, set := range retrieved {
fps := pending[name]
for fp := range set {
f := fp.(clientmodel.Fingerprint)
fps = append(fps, &f)
}
sort.Sort(fingerprints)
key := &dto.LabelName{
Name: proto.String(string(labelName)),
}
value := new(dto.FingerprintCollection)
for _, fingerprint := range fingerprints {
f := new(dto.Fingerprint)
dumpFingerprint(f, fingerprint)
value.Member = append(value.Member, f)
}
batch.Put(key, value)
}
err = l.labelNameToFingerprints.Commit(batch)
if err != nil {
return
pending[name] = fps
}
return
return l.labelNameToFingerprints.IndexBatch(pending)
}
// indexLabelPairs accumulates all label pair to fingerprint index entries for
@@ -356,7 +345,7 @@ func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[clientmodel.Finge
recordOutcome(duration, err, map[string]string{operation: indexLabelPairs, result: success}, map[string]string{operation: indexLabelPairs, result: failure})
}(time.Now())
labelPairFingerprints := map[LabelPair]utility.Set{}
collection := map[LabelPair]utility.Set{}
for fingerprint, metric := range metrics {
for labelName, labelValue := range metric {
@@ -364,113 +353,69 @@ func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[clientmodel.Finge
Name: labelName,
Value: labelValue,
}
fingerprintSet, ok := labelPairFingerprints[labelPair]
fingerprintSet, ok := collection[labelPair]
if !ok {
fingerprintSet = utility.Set{}
fingerprints, err := l.GetFingerprintsForLabelSet(clientmodel.LabelSet{
labelName: labelValue,
})
fingerprints, _, err := l.labelSetToFingerprints.Lookup(&labelPair)
if err != nil {
return err
}
fingerprintSet = utility.Set{}
for _, fingerprint := range fingerprints {
fingerprintSet.Add(*fingerprint)
}
collection[labelPair] = fingerprintSet
}
fingerprintSet.Add(fingerprint)
labelPairFingerprints[labelPair] = fingerprintSet
}
}
batch := leveldb.NewBatch()
defer batch.Close()
for labelPair, fingerprintSet := range labelPairFingerprints {
fingerprints := clientmodel.Fingerprints{}
for e := range fingerprintSet {
fingerprint := e.(clientmodel.Fingerprint)
fingerprints = append(fingerprints, &fingerprint)
}
sort.Sort(fingerprints)
batch := LabelSetFingerprintMapping{}
key := &dto.LabelPair{
Name: proto.String(string(labelPair.Name)),
Value: proto.String(string(labelPair.Value)),
}
value := new(dto.FingerprintCollection)
for _, fingerprint := range fingerprints {
f := new(dto.Fingerprint)
dumpFingerprint(f, fingerprint)
value.Member = append(value.Member, f)
for pair, elements := range collection {
fps := batch[pair]
for element := range elements {
fp := element.(clientmodel.Fingerprint)
fps = append(fps, &fp)
}
batch.Put(key, value)
batch[pair] = fps
}
err = l.labelSetToFingerprints.Commit(batch)
if err != nil {
return
}
return
return l.labelSetToFingerprints.IndexBatch(batch)
}
// indexFingerprints updates all of the Fingerprint to Metric reverse lookups
// in the index and then bulk updates.
//
// This operation is idempotent.
func (l *LevelDBMetricPersistence) indexFingerprints(metrics map[clientmodel.Fingerprint]clientmodel.Metric) (err error) {
func (l *LevelDBMetricPersistence) indexFingerprints(b FingerprintMetricMapping) (err error) {
defer func(begin time.Time) {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: indexFingerprints, result: success}, map[string]string{operation: indexFingerprints, result: failure})
}(time.Now())
batch := leveldb.NewBatch()
defer batch.Close()
for fingerprint, metric := range metrics {
f := new(dto.Fingerprint)
dumpFingerprint(f, &fingerprint)
m := &dto.Metric{}
dumpMetric(m, metric)
batch.Put(f, m)
}
err = l.fingerprintToMetrics.Commit(batch)
if err != nil {
return
}
return
return l.fingerprintToMetrics.IndexBatch(b)
}
var existenceIdentity = &dto.MembershipIndexValue{}
// indexMetrics takes groups of samples, determines which ones contain metrics
// that are unknown to the storage stack, and then proceeds to update all
// affected indices.
func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[clientmodel.Fingerprint]clientmodel.Metric) (err error) {
func (l *LevelDBMetricPersistence) indexMetrics(fingerprints FingerprintMetricMapping) (err error) {
defer func(begin time.Time) {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: indexMetrics, result: success}, map[string]string{operation: indexMetrics, result: failure})
}(time.Now())
var (
absentMetrics map[clientmodel.Fingerprint]clientmodel.Metric
)
absentMetrics, err = l.findUnindexedMetrics(fingerprints)
absentees, err := l.findUnindexedMetrics(fingerprints)
if err != nil {
return
}
if len(absentMetrics) == 0 {
if len(absentees) == 0 {
return
}
@@ -479,42 +424,32 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[clientmodel.Fin
workers := utility.NewUncertaintyGroup(3)
go func() {
workers.MayFail(l.indexLabelNames(absentMetrics))
workers.MayFail(l.indexLabelNames(absentees))
}()
go func() {
workers.MayFail(l.indexLabelPairs(absentMetrics))
workers.MayFail(l.indexLabelPairs(absentees))
}()
go func() {
workers.MayFail(l.indexFingerprints(absentMetrics))
workers.MayFail(l.indexFingerprints(absentees))
}()
if !workers.Wait() {
return fmt.Errorf("Could not index due to %s", workers.Errors())
}
// If any of the preceding operations failed, we will have inconsistent
// indices. Thusly, the Metric membership index should NOT be updated, as
// its state is used to determine whether to bulk update the other indices.
// Given that those operations are idempotent, it is OK to repeat them;
// however, it will consume considerable amounts of time.
batch := leveldb.NewBatch()
defer batch.Close()
for _, metric := range absentMetrics {
m := &dto.Metric{}
dumpMetric(m, metric)
batch.Put(m, existenceIdentity)
if !workers.Wait() {
return fmt.Errorf("Could not index due to %s", workers.Errors())
}
err = l.metricMembershipIndex.Commit(batch)
if err != nil {
// Not critical but undesirable.
log.Println(err)
ms := []clientmodel.Metric{}
for _, m := range absentees {
ms = append(ms, m)
}
return
return l.metricMembershipIndex.IndexBatch(ms)
}
func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[clientmodel.Fingerprint]clientmodel.Samples) (err error) {
@@ -573,7 +508,7 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples clientmodel.Samples) (e
watermarkErrChan := make(chan error, 1)
go func(groups map[clientmodel.Fingerprint]clientmodel.Samples) {
metrics := map[clientmodel.Fingerprint]clientmodel.Metric{}
metrics := FingerprintMetricMapping{}
for fingerprint, samples := range groups {
metrics[fingerprint] = samples[0].Metric
@@ -667,38 +602,34 @@ func extractSampleValues(i leveldb.Iterator) (Values, error) {
return NewValuesFromDTO(v), nil
}
func (l *LevelDBMetricPersistence) hasIndexMetric(dto *dto.Metric) (value bool, err error) {
func (l *LevelDBMetricPersistence) hasIndexMetric(m clientmodel.Metric) (value bool, err error) {
defer func(begin time.Time) {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: hasIndexMetric, result: success}, map[string]string{operation: hasIndexMetric, result: failure})
}(time.Now())
value, err = l.metricMembershipIndex.Has(dto)
return
return l.metricMembershipIndex.Has(m)
}
func (l *LevelDBMetricPersistence) HasLabelPair(dto *dto.LabelPair) (value bool, err error) {
func (l *LevelDBMetricPersistence) HasLabelPair(p *LabelPair) (value bool, err error) {
defer func(begin time.Time) {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: hasLabelPair, result: success}, map[string]string{operation: hasLabelPair, result: failure})
}(time.Now())
value, err = l.labelSetToFingerprints.Has(dto)
return
return l.labelSetToFingerprints.Has(p)
}
func (l *LevelDBMetricPersistence) HasLabelName(dto *dto.LabelName) (value bool, err error) {
func (l *LevelDBMetricPersistence) HasLabelName(n clientmodel.LabelName) (value bool, err error) {
defer func(begin time.Time) {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: hasLabelName, result: success}, map[string]string{operation: hasLabelName, result: failure})
}(time.Now())
value, err = l.labelNameToFingerprints.Has(dto)
value, err = l.labelNameToFingerprints.Has(n)
return
}
@@ -711,29 +642,19 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet clientmod
}(time.Now())
sets := []utility.Set{}
pair := &dto.LabelPair{}
unmarshaled := new(dto.FingerprintCollection)
for name, value := range labelSet {
pair.Reset()
unmarshaled.Reset()
pair.Name = proto.String(string(name))
pair.Value = proto.String(string(value))
present, err := l.labelSetToFingerprints.Get(pair, unmarshaled)
fps, _, err := l.labelSetToFingerprints.Lookup(&LabelPair{
Name: name,
Value: value,
})
if err != nil {
return fps, err
}
if !present {
return nil, nil
return nil, err
}
set := utility.Set{}
for _, m := range unmarshaled.Member {
fp := &clientmodel.Fingerprint{}
loadFingerprint(fp, m)
for _, fp := range fps {
set.Add(*fp)
}
@@ -764,24 +685,10 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName clientm
recordOutcome(duration, err, map[string]string{operation: getFingerprintsForLabelName, result: success}, map[string]string{operation: getFingerprintsForLabelName, result: failure})
}(time.Now())
unmarshaled := new(dto.FingerprintCollection)
d := &dto.LabelName{}
dumpLabelName(d, labelName)
present, err := l.labelNameToFingerprints.Get(d, unmarshaled)
if err != nil {
return nil, err
}
if !present {
return nil, nil
}
for _, m := range unmarshaled.Member {
fp := &clientmodel.Fingerprint{}
loadFingerprint(fp, m)
fps = append(fps, fp)
}
// TODO(matt): Update signature to work with ok.
fps, _, err = l.labelNameToFingerprints.Lookup(labelName)
return fps, nil
return fps, err
}
func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f *clientmodel.Fingerprint) (m clientmodel.Metric, err error) {
@@ -791,22 +698,8 @@ func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f *clientmodel.Finger
recordOutcome(duration, err, map[string]string{operation: getMetricForFingerprint, result: success}, map[string]string{operation: getMetricForFingerprint, result: failure})
}(time.Now())
unmarshaled := &dto.Metric{}
d := new(dto.Fingerprint)
dumpFingerprint(d, f)
present, err := l.fingerprintToMetrics.Get(d, unmarshaled)
if err != nil {
return nil, err
}
if !present {
return nil, nil
}
m = clientmodel.Metric{}
for _, v := range unmarshaled.LabelPair {
m[clientmodel.LabelName(v.GetName())] = clientmodel.LabelValue(v.GetValue())
}
// TODO(matt): Update signature to work with ok.
m, _, err = l.fingerprintToMetrics.Lookup(f)
return m, nil
}
@@ -887,11 +780,11 @@ func (l *LevelDBMetricPersistence) GetAllValuesForLabel(labelName clientmodel.La
// server due to latency implications.
func (l *LevelDBMetricPersistence) CompactKeyspaces() {
l.CurationRemarks.CompactKeyspace()
l.fingerprintToMetrics.CompactKeyspace()
l.labelNameToFingerprints.CompactKeyspace()
l.labelSetToFingerprints.CompactKeyspace()
// l.fingerprintToMetrics.CompactKeyspace()
// l.labelNameToFingerprints.CompactKeyspace()
// l.labelSetToFingerprints.CompactKeyspace()
l.MetricHighWatermarks.CompactKeyspace()
l.metricMembershipIndex.CompactKeyspace()
// l.metricMembershipIndex.CompactKeyspace()
l.MetricSamples.CompactKeyspace()
}
@@ -903,30 +796,30 @@ func (l *LevelDBMetricPersistence) ApproximateSizes() (total uint64, err error)
}
total += size
if size, err = l.fingerprintToMetrics.ApproximateSize(); err != nil {
return 0, err
}
total += size
// if size, err = l.fingerprintToMetrics.ApproximateSize(); err != nil {
// return 0, err
// }
// total += size
if size, err = l.labelNameToFingerprints.ApproximateSize(); err != nil {
return 0, err
}
total += size
// if size, err = l.labelNameToFingerprints.ApproximateSize(); err != nil {
// return 0, err
// }
// total += size
if size, err = l.labelSetToFingerprints.ApproximateSize(); err != nil {
return 0, err
}
total += size
// if size, err = l.labelSetToFingerprints.ApproximateSize(); err != nil {
// return 0, err
// }
// total += size
if size, err = l.MetricHighWatermarks.ApproximateSize(); err != nil {
return 0, err
}
total += size
if size, err = l.metricMembershipIndex.ApproximateSize(); err != nil {
return 0, err
}
total += size
// if size, err = l.metricMembershipIndex.ApproximateSize(); err != nil {
// return 0, err
// }
// total += size
if size, err = l.MetricSamples.ApproximateSize(); err != nil {
return 0, err
@@ -944,30 +837,30 @@ func (l *LevelDBMetricPersistence) States() []leveldb.DatabaseState {
state.Type = "Watermark"
states = append(states, state)
state = l.fingerprintToMetrics.State()
state.Name = "Fingerprints to Metrics"
state.Type = "Index"
states = append(states, state)
// state = l.fingerprintToMetrics.State()
// state.Name = "Fingerprints to Metrics"
// state.Type = "Index"
// states = append(states, state)
state = l.labelNameToFingerprints.State()
state.Name = "Label Name to Fingerprints"
state.Type = "Inverted Index"
states = append(states, state)
// state = l.labelNameToFingerprints.State()
// state.Name = "Label Name to Fingerprints"
// state.Type = "Inverted Index"
// states = append(states, state)
state = l.labelSetToFingerprints.State()
state.Name = "Label Pair to Fingerprints"
state.Type = "Inverted Index"
states = append(states, state)
// state = l.labelSetToFingerprints.State()
// state.Name = "Label Pair to Fingerprints"
// state.Type = "Inverted Index"
// states = append(states, state)
state = l.MetricHighWatermarks.State()
state.Name = "Metric Last Write"
state.Type = "Watermark"
states = append(states, state)
state = l.metricMembershipIndex.State()
state.Name = "Metric Membership"
state.Type = "Index"
states = append(states, state)
// state = l.metricMembershipIndex.State()
// state.Name = "Metric Membership"
// state.Type = "Index"
// states = append(states, state)
state = l.MetricSamples.State()
state.Name = "Samples"
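Because LevelDBMetricPersistence now holds these indexes behind the interfaces declared in storage/metric/index.go rather than as raw *leveldb.LevelDBPersistence handles, an alternative backend can satisfy the same contract. A hypothetical in-memory stand-in, sketched purely for illustration and not part of this commit, might look like:

package metric

import (
	clientmodel "github.com/prometheus/client_golang/model"
)

// memFingerprintMetricIndex is a hypothetical in-memory implementation of
// FingerprintMetricIndex, e.g. for tests.
type memFingerprintMetricIndex struct {
	mapping FingerprintMetricMapping
}

func newMemFingerprintMetricIndex() *memFingerprintMetricIndex {
	return &memFingerprintMetricIndex{mapping: FingerprintMetricMapping{}}
}

func (i *memFingerprintMetricIndex) IndexBatch(b FingerprintMetricMapping) error {
	for fp, m := range b {
		i.mapping[fp] = m
	}
	return nil
}

func (i *memFingerprintMetricIndex) Lookup(fp *clientmodel.Fingerprint) (clientmodel.Metric, bool, error) {
	m, ok := i.mapping[*fp]
	return m, ok, nil
}

func (i *memFingerprintMetricIndex) Close() error { return nil }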

storage/raw/interface.go (23 changed lines)

@@ -19,9 +19,23 @@ import (
"github.com/prometheus/prometheus/storage"
)
type ForEacher interface {
// ForEach is responsible for iterating through all records in the database
// until one of the following conditions are met:
//
// 1.) A system anomaly in the database scan.
// 2.) The last record in the database is reached.
// 3.) A FilterResult of STOP is emitted by the Filter.
//
// Decoding errors for an entity cause that entity to be skipped.
ForEach(storage.RecordDecoder, storage.RecordFilter, storage.RecordOperator) (scannedEntireCorpus bool, err error)
}
// Persistence models a key-value store for bytes that supports various
// additional operations.
type Persistence interface {
ForEacher
// Close reaps all of the underlying system resources associated with this
// persistence.
Close()
@@ -34,15 +48,6 @@ type Persistence interface {
Drop(key proto.Message) error
// Put sets the key to a given value.
Put(key, value proto.Message) error
// ForEach is responsible for iterating through all records in the database
// until one of the following conditions are met:
//
// 1.) A system anomaly in the database scan.
// 2.) The last record in the database is reached.
// 3.) A FilterResult of STOP is emitted by the Filter.
//
// Decoding errors for an entity cause that entity to be skipped.
ForEach(storage.RecordDecoder, storage.RecordFilter, storage.RecordOperator) (scannedEntireCorpus bool, err error)
// Commit applies the Batch operations to the database.
Commit(Batch) error
}
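Pulling ForEach out into the ForEacher interface lets code that only iterates declare that narrower dependency; LabelSetFingerprintIndex in storage/metric/index.go embeds it for exactly this reason. A hedged sketch of such a consumer follows; the function is hypothetical and not part of the commit.

package example

import (
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/storage/raw"
)

// scanAll only needs iteration, so it accepts raw.ForEacher instead of the
// full raw.Persistence. The decoder, filter, and operator are supplied by
// the caller.
func scanAll(f raw.ForEacher, d storage.RecordDecoder, flt storage.RecordFilter, op storage.RecordOperator) (scannedEntireCorpus bool, err error) {
	return f.ForEach(d, flt, op)
}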
