Exploding the storage infrastructure by contexts.

pull/1/merge
Matt T. Proud 2012-12-09 16:27:12 +01:00
parent 15a6681651
commit 577acf4fe7
10 changed files with 888 additions and 634 deletions

View File

@ -17,12 +17,13 @@ import (
"code.google.com/p/goprotobuf/proto" "code.google.com/p/goprotobuf/proto"
"crypto/md5" "crypto/md5"
"encoding/hex" "encoding/hex"
data "github.com/matttproud/prometheus/model/generated" "errors"
dto "github.com/matttproud/prometheus/model/generated"
"io" "io"
"sort" "sort"
) )
func SampleToMetricDTO(s *Sample) *data.Metric { func SampleToMetricDTO(s *Sample) *dto.Metric {
labelLength := len(s.Labels) labelLength := len(s.Labels)
labelNames := make([]string, 0, labelLength) labelNames := make([]string, 0, labelLength)
@ -32,11 +33,11 @@ func SampleToMetricDTO(s *Sample) *data.Metric {
sort.Strings(labelNames) sort.Strings(labelNames)
labelSets := make([]*data.LabelPair, 0, labelLength) labelSets := make([]*dto.LabelPair, 0, labelLength)
for _, labelName := range labelNames { for _, labelName := range labelNames {
labelValue := s.Labels[LabelName(labelName)] labelValue := s.Labels[LabelName(labelName)]
labelPair := &data.LabelPair{ labelPair := &dto.LabelPair{
Name: proto.String(string(labelName)), Name: proto.String(string(labelName)),
Value: proto.String(string(labelValue)), Value: proto.String(string(labelValue)),
} }
@ -44,12 +45,12 @@ func SampleToMetricDTO(s *Sample) *data.Metric {
labelSets = append(labelSets, labelPair) labelSets = append(labelSets, labelPair)
} }
return &data.Metric{ return &dto.Metric{
LabelPair: labelSets, LabelPair: labelSets,
} }
} }
func MetricToDTO(m *Metric) *data.Metric { func MetricToDTO(m *Metric) *dto.Metric {
metricLength := len(*m) metricLength := len(*m)
labelNames := make([]string, 0, metricLength) labelNames := make([]string, 0, metricLength)
@ -59,12 +60,12 @@ func MetricToDTO(m *Metric) *data.Metric {
sort.Strings(labelNames) sort.Strings(labelNames)
labelSets := make([]*data.LabelPair, 0, metricLength) labelSets := make([]*dto.LabelPair, 0, metricLength)
for _, labelName := range labelNames { for _, labelName := range labelNames {
l := LabelName(labelName) l := LabelName(labelName)
labelValue := (*m)[l] labelValue := (*m)[l]
labelPair := &data.LabelPair{ labelPair := &dto.LabelPair{
Name: proto.String(string(labelName)), Name: proto.String(string(labelName)),
Value: proto.String(string(labelValue)), Value: proto.String(string(labelValue)),
} }
@ -72,7 +73,7 @@ func MetricToDTO(m *Metric) *data.Metric {
labelSets = append(labelSets, labelPair) labelSets = append(labelSets, labelPair)
} }
return &data.Metric{ return &dto.Metric{
LabelPair: labelSets, LabelPair: labelSets,
} }
} }
@ -89,7 +90,7 @@ func BytesToFingerprint(v []byte) Fingerprint {
return Fingerprint(hex.EncodeToString(hash.Sum([]byte{}))) return Fingerprint(hex.EncodeToString(hash.Sum([]byte{})))
} }
func LabelSetToDTOs(s *LabelSet) []*data.LabelPair { func LabelSetToDTOs(s *LabelSet) []*dto.LabelPair {
metricLength := len(*s) metricLength := len(*s)
labelNames := make([]string, 0, metricLength) labelNames := make([]string, 0, metricLength)
@ -99,12 +100,12 @@ func LabelSetToDTOs(s *LabelSet) []*data.LabelPair {
sort.Strings(labelNames) sort.Strings(labelNames)
labelSets := make([]*data.LabelPair, 0, metricLength) labelSets := make([]*dto.LabelPair, 0, metricLength)
for _, labelName := range labelNames { for _, labelName := range labelNames {
l := LabelName(labelName) l := LabelName(labelName)
labelValue := (*s)[l] labelValue := (*s)[l]
labelPair := &data.LabelPair{ labelPair := &dto.LabelPair{
Name: proto.String(string(labelName)), Name: proto.String(string(labelName)),
Value: proto.String(string(labelValue)), Value: proto.String(string(labelValue)),
} }
@ -115,14 +116,33 @@ func LabelSetToDTOs(s *LabelSet) []*data.LabelPair {
return labelSets return labelSets
} }
func LabelSetToDTO(s *LabelSet) *data.LabelSet { func LabelSetToDTO(s *LabelSet) *dto.LabelSet {
return &data.LabelSet{ return &dto.LabelSet{
Member: LabelSetToDTOs(s), Member: LabelSetToDTOs(s),
} }
} }
func LabelNameToDTO(l *LabelName) *data.LabelName { func LabelNameToDTO(l *LabelName) *dto.LabelName {
return &data.LabelName{ return &dto.LabelName{
Name: proto.String(string(*l)), Name: proto.String(string(*l)),
} }
} }
func FingerprintToDTO(f *Fingerprint) *dto.Fingerprint {
return &dto.Fingerprint{
Signature: proto.String(string(*f)),
}
}
func MessageToFingerprintDTO(message proto.Message) (*dto.Fingerprint, error) {
if messageByteArray, marshalError := proto.Marshal(message); marshalError == nil {
fingerprint := BytesToFingerprint(messageByteArray)
return &dto.Fingerprint{
Signature: proto.String(string(fingerprint)),
}, nil
} else {
return nil, marshalError
}
return nil, errors.New("Unknown error in generating FingerprintDTO from message.")
}

View File

@ -30,8 +30,13 @@ type MetricPersistence interface {
// Get all of the metric fingerprints that are associated with the provided // Get all of the metric fingerprints that are associated with the provided
// label set. // label set.
GetFingerprintsForLabelSet(labelSet *model.LabelSet) ([]*model.Fingerprint, error) GetFingerprintsForLabelSet(labelSet *model.LabelSet) ([]*model.Fingerprint, error)
// Get all of the metric fingerprints that are associated for a given label
// name.
GetFingerprintsForLabelName(labelName *model.LabelName) ([]*model.Fingerprint, error) GetFingerprintsForLabelName(labelName *model.LabelName) ([]*model.Fingerprint, error)
GetMetricForFingerprint(f *model.Fingerprint) (*model.Metric, error)
GetAllLabelNames() ([]string, error) GetAllLabelNames() ([]string, error)
GetAllLabelPairs() ([]model.LabelSet, error) GetAllLabelPairs() ([]model.LabelSet, error)
GetAllMetrics() ([]model.LabelSet, error) GetAllMetrics() ([]model.LabelSet, error)

View File

@ -0,0 +1,117 @@
// Copyright 2012 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package leveldb
import (
"code.google.com/p/goprotobuf/proto"
"errors"
"github.com/matttproud/prometheus/coding"
"github.com/matttproud/prometheus/model"
dto "github.com/matttproud/prometheus/model/generated"
"github.com/matttproud/prometheus/utility"
)
func (l *LevelDBMetricPersistence) GetAllLabelNames() ([]string, error) {
if getAll, getAllError := l.labelNameToFingerprints.GetAll(); getAllError == nil {
result := make([]string, 0, len(getAll))
labelNameDTO := &dto.LabelName{}
for _, pair := range getAll {
if unmarshalError := proto.Unmarshal(pair.Left, labelNameDTO); unmarshalError == nil {
result = append(result, *labelNameDTO.Name)
} else {
return nil, unmarshalError
}
}
return result, nil
} else {
return nil, getAllError
}
return nil, errors.New("Unknown error encountered when querying label names.")
}
func (l *LevelDBMetricPersistence) GetAllLabelPairs() ([]model.LabelSet, error) {
if getAll, getAllError := l.labelSetToFingerprints.GetAll(); getAllError == nil {
result := make([]model.LabelSet, 0, len(getAll))
labelPairDTO := &dto.LabelPair{}
for _, pair := range getAll {
if unmarshalError := proto.Unmarshal(pair.Left, labelPairDTO); unmarshalError == nil {
n := model.LabelName(*labelPairDTO.Name)
v := model.LabelValue(*labelPairDTO.Value)
item := model.LabelSet{n: v}
result = append(result, item)
} else {
return nil, unmarshalError
}
}
return result, nil
} else {
return nil, getAllError
}
return nil, errors.New("Unknown error encountered when querying label pairs.")
}
func (l *LevelDBMetricPersistence) GetAllMetrics() ([]model.LabelSet, error) {
if getAll, getAllError := l.labelSetToFingerprints.GetAll(); getAllError == nil {
result := make([]model.LabelSet, 0)
fingerprintCollection := &dto.FingerprintCollection{}
fingerprints := make(utility.Set)
for _, pair := range getAll {
if unmarshalError := proto.Unmarshal(pair.Right, fingerprintCollection); unmarshalError == nil {
for _, member := range fingerprintCollection.Member {
if !fingerprints.Has(*member.Signature) {
fingerprints.Add(*member.Signature)
fingerprintEncoded := coding.NewProtocolBufferEncoder(member)
if labelPairCollectionRaw, labelPairCollectionRawError := l.fingerprintToMetrics.Get(fingerprintEncoded); labelPairCollectionRawError == nil {
labelPairCollectionDTO := &dto.LabelSet{}
if labelPairCollectionDTOMarshalError := proto.Unmarshal(labelPairCollectionRaw, labelPairCollectionDTO); labelPairCollectionDTOMarshalError == nil {
intermediate := make(model.LabelSet, 0)
for _, member := range labelPairCollectionDTO.Member {
n := model.LabelName(*member.Name)
v := model.LabelValue(*member.Value)
intermediate[n] = v
}
result = append(result, intermediate)
} else {
return nil, labelPairCollectionDTOMarshalError
}
} else {
return nil, labelPairCollectionRawError
}
}
}
} else {
return nil, unmarshalError
}
}
return result, nil
} else {
return nil, getAllError
}
return nil, errors.New("Unknown error encountered when querying metrics.")
}

View File

@ -1,600 +0,0 @@
// Copyright 2012 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package leveldb
import (
"code.google.com/p/goprotobuf/proto"
"errors"
"github.com/matttproud/prometheus/coding"
"github.com/matttproud/prometheus/coding/indexable"
"github.com/matttproud/prometheus/model"
data "github.com/matttproud/prometheus/model/generated"
index "github.com/matttproud/prometheus/storage/raw/index/leveldb"
storage "github.com/matttproud/prometheus/storage/raw/leveldb"
"github.com/matttproud/prometheus/utility"
"io"
"log"
)
type LevelDBMetricPersistence struct {
fingerprintToMetrics *storage.LevelDBPersistence
metricSamples *storage.LevelDBPersistence
labelNameToFingerprints *storage.LevelDBPersistence
labelSetToFingerprints *storage.LevelDBPersistence
metricMembershipIndex *index.LevelDBMembershipIndex
}
type leveldbOpener func()
func (l *LevelDBMetricPersistence) Close() error {
log.Printf("Closing LevelDBPersistence storage containers...")
var persistences = []struct {
name string
closer io.Closer
}{
{
"Fingerprint to Label Name and Value Pairs",
l.fingerprintToMetrics,
},
{
"Fingerprint Samples",
l.metricSamples,
},
{
"Label Name to Fingerprints",
l.labelNameToFingerprints,
},
{
"Label Name and Value Pairs to Fingerprints",
l.labelSetToFingerprints,
},
{
"Metric Membership Index",
l.metricMembershipIndex,
},
}
errorChannel := make(chan error, len(persistences))
for _, persistence := range persistences {
name := persistence.name
closer := persistence.closer
go func(name string, closer io.Closer) {
if closer != nil {
log.Printf("Closing LevelDBPersistence storage container: %s\n", name)
closingError := closer.Close()
if closingError != nil {
log.Printf("Could not close a LevelDBPersistence storage container; inconsistencies are possible: %q\n", closingError)
}
errorChannel <- closingError
} else {
errorChannel <- nil
}
}(name, closer)
}
for i := 0; i < cap(errorChannel); i++ {
closingError := <-errorChannel
if closingError != nil {
return closingError
}
}
log.Printf("Successfully closed all LevelDBPersistence storage containers.")
return nil
}
func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistence, error) {
log.Printf("Opening LevelDBPersistence storage containers...")
errorChannel := make(chan error, 5)
emission := &LevelDBMetricPersistence{}
var subsystemOpeners = []struct {
name string
opener leveldbOpener
}{
{
"Label Names and Value Pairs by Fingerprint",
func() {
var err error
emission.fingerprintToMetrics, err = storage.NewLevelDBPersistence(baseDirectory+"/label_name_and_value_pairs_by_fingerprint", 1000000, 10)
errorChannel <- err
},
},
{
"Samples by Fingerprint",
func() {
var err error
emission.metricSamples, err = storage.NewLevelDBPersistence(baseDirectory+"/samples_by_fingerprint", 1000000, 10)
errorChannel <- err
},
},
{
"Fingerprints by Label Name",
func() {
var err error
emission.labelNameToFingerprints, err = storage.NewLevelDBPersistence(baseDirectory+"/fingerprints_by_label_name", 1000000, 10)
errorChannel <- err
},
},
{
"Fingerprints by Label Name and Value Pair",
func() {
var err error
emission.labelSetToFingerprints, err = storage.NewLevelDBPersistence(baseDirectory+"/fingerprints_by_label_name_and_value_pair", 1000000, 10)
errorChannel <- err
},
},
{
"Metric Membership Index",
func() {
var err error
emission.metricMembershipIndex, err = index.NewLevelDBMembershipIndex(baseDirectory+"/metric_membership_index", 1000000, 10)
errorChannel <- err
},
},
}
for _, subsystem := range subsystemOpeners {
name := subsystem.name
opener := subsystem.opener
log.Printf("Opening LevelDBPersistence storage container: %s\n", name)
go opener()
}
for i := 0; i < cap(errorChannel); i++ {
openingError := <-errorChannel
if openingError != nil {
log.Printf("Could not open a LevelDBPersistence storage container: %q\n", openingError)
return nil, openingError
}
}
log.Printf("Successfully opened all LevelDBPersistence storage containers.\n")
return emission, nil
}
func (l *LevelDBMetricPersistence) hasIndexMetric(dto *data.Metric) (bool, error) {
dtoKey := coding.NewProtocolBufferEncoder(dto)
return l.metricMembershipIndex.Has(dtoKey)
}
func (l *LevelDBMetricPersistence) indexMetric(dto *data.Metric) error {
dtoKey := coding.NewProtocolBufferEncoder(dto)
return l.metricMembershipIndex.Put(dtoKey)
}
// TODO(mtp): Candidate for refactoring.
func fingerprintDTOForMessage(message proto.Message) (*data.Fingerprint, error) {
if messageByteArray, marshalError := proto.Marshal(message); marshalError == nil {
fingerprint := model.BytesToFingerprint(messageByteArray)
return &data.Fingerprint{
Signature: proto.String(string(fingerprint)),
}, nil
} else {
return nil, marshalError
}
return nil, errors.New("Unknown error in generating FingerprintDTO from message.")
}
func (l *LevelDBMetricPersistence) HasLabelPair(dto *data.LabelPair) (bool, error) {
dtoKey := coding.NewProtocolBufferEncoder(dto)
return l.labelSetToFingerprints.Has(dtoKey)
}
func (l *LevelDBMetricPersistence) HasLabelName(dto *data.LabelName) (bool, error) {
dtoKey := coding.NewProtocolBufferEncoder(dto)
return l.labelNameToFingerprints.Has(dtoKey)
}
func (l *LevelDBMetricPersistence) getFingerprintsForLabelSet(dto *data.LabelPair) (*data.FingerprintCollection, error) {
dtoKey := coding.NewProtocolBufferEncoder(dto)
if get, getError := l.labelSetToFingerprints.Get(dtoKey); getError == nil {
value := &data.FingerprintCollection{}
if unmarshalError := proto.Unmarshal(get, value); unmarshalError == nil {
return value, nil
} else {
return nil, unmarshalError
}
} else {
return nil, getError
}
panic("unreachable")
}
func (l *LevelDBMetricPersistence) GetLabelNameFingerprints(dto *data.LabelName) (*data.FingerprintCollection, error) {
dtoKey := coding.NewProtocolBufferEncoder(dto)
if get, getError := l.labelNameToFingerprints.Get(dtoKey); getError == nil {
value := &data.FingerprintCollection{}
if unmarshalError := proto.Unmarshal(get, value); unmarshalError == nil {
return value, nil
} else {
return nil, unmarshalError
}
} else {
return nil, getError
}
return nil, errors.New("Unknown error while getting label name fingerprints.")
}
func (l *LevelDBMetricPersistence) setLabelPairFingerprints(labelPair *data.LabelPair, fingerprints *data.FingerprintCollection) error {
labelPairEncoded := coding.NewProtocolBufferEncoder(labelPair)
fingerprintsEncoded := coding.NewProtocolBufferEncoder(fingerprints)
return l.labelSetToFingerprints.Put(labelPairEncoded, fingerprintsEncoded)
}
func (l *LevelDBMetricPersistence) setLabelNameFingerprints(labelName *data.LabelName, fingerprints *data.FingerprintCollection) error {
labelNameEncoded := coding.NewProtocolBufferEncoder(labelName)
fingerprintsEncoded := coding.NewProtocolBufferEncoder(fingerprints)
return l.labelNameToFingerprints.Put(labelNameEncoded, fingerprintsEncoded)
}
func (l *LevelDBMetricPersistence) appendLabelPairFingerprint(labelPair *data.LabelPair, fingerprint *data.Fingerprint) error {
if has, hasError := l.HasLabelPair(labelPair); hasError == nil {
var fingerprints *data.FingerprintCollection
if has {
if existing, existingError := l.getFingerprintsForLabelSet(labelPair); existingError == nil {
fingerprints = existing
} else {
return existingError
}
} else {
fingerprints = &data.FingerprintCollection{}
}
fingerprints.Member = append(fingerprints.Member, fingerprint)
return l.setLabelPairFingerprints(labelPair, fingerprints)
} else {
return hasError
}
return errors.New("Unknown error when appending fingerprint to label name and value pair.")
}
func (l *LevelDBMetricPersistence) appendLabelNameFingerprint(labelPair *data.LabelPair, fingerprint *data.Fingerprint) error {
labelName := &data.LabelName{
Name: labelPair.Name,
}
if has, hasError := l.HasLabelName(labelName); hasError == nil {
var fingerprints *data.FingerprintCollection
if has {
if existing, existingError := l.GetLabelNameFingerprints(labelName); existingError == nil {
fingerprints = existing
} else {
return existingError
}
} else {
fingerprints = &data.FingerprintCollection{}
}
fingerprints.Member = append(fingerprints.Member, fingerprint)
return l.setLabelNameFingerprints(labelName, fingerprints)
} else {
return hasError
}
return errors.New("Unknown error when appending fingerprint to label name and value pair.")
}
func (l *LevelDBMetricPersistence) appendFingerprints(dto *data.Metric) error {
if fingerprintDTO, fingerprintDTOError := fingerprintDTOForMessage(dto); fingerprintDTOError == nil {
fingerprintKey := coding.NewProtocolBufferEncoder(fingerprintDTO)
metricDTOEncoder := coding.NewProtocolBufferEncoder(dto)
if putError := l.fingerprintToMetrics.Put(fingerprintKey, metricDTOEncoder); putError == nil {
labelCount := len(dto.LabelPair)
labelPairErrors := make(chan error, labelCount)
labelNameErrors := make(chan error, labelCount)
for _, labelPair := range dto.LabelPair {
go func(labelPair *data.LabelPair) {
labelNameErrors <- l.appendLabelNameFingerprint(labelPair, fingerprintDTO)
}(labelPair)
go func(labelPair *data.LabelPair) {
labelPairErrors <- l.appendLabelPairFingerprint(labelPair, fingerprintDTO)
}(labelPair)
}
for i := 0; i < cap(labelPairErrors); i++ {
appendError := <-labelPairErrors
if appendError != nil {
return appendError
}
}
for i := 0; i < cap(labelNameErrors); i++ {
appendError := <-labelNameErrors
if appendError != nil {
return appendError
}
}
return nil
} else {
return putError
}
} else {
return fingerprintDTOError
}
return errors.New("Unknown error in appending label pairs to fingerprint.")
}
func (l *LevelDBMetricPersistence) AppendSample(sample *model.Sample) error {
metricDTO := model.SampleToMetricDTO(sample)
if indexHas, indexHasError := l.hasIndexMetric(metricDTO); indexHasError == nil {
if !indexHas {
if indexPutError := l.indexMetric(metricDTO); indexPutError == nil {
if appendError := l.appendFingerprints(metricDTO); appendError != nil {
log.Printf("Could not set metric fingerprint to label pairs mapping: %q\n", appendError)
return appendError
}
} else {
log.Printf("Could not add metric to membership index: %q\n", indexPutError)
return indexPutError
}
}
} else {
log.Printf("Could not query membership index for metric: %q\n", indexHasError)
return indexHasError
}
if fingerprintDTO, fingerprintDTOErr := fingerprintDTOForMessage(metricDTO); fingerprintDTOErr == nil {
sampleKeyDTO := &data.SampleKey{
Fingerprint: fingerprintDTO,
Timestamp: indexable.EncodeTime(sample.Timestamp),
}
sampleValueDTO := &data.SampleValue{
Value: proto.Float32(float32(sample.Value)),
}
sampleKeyEncoded := coding.NewProtocolBufferEncoder(sampleKeyDTO)
sampleValueEncoded := coding.NewProtocolBufferEncoder(sampleValueDTO)
if putError := l.metricSamples.Put(sampleKeyEncoded, sampleValueEncoded); putError != nil {
log.Printf("Could not append metric sample: %q\n", putError)
return putError
}
} else {
log.Printf("Could not encode metric fingerprint: %q\n", fingerprintDTOErr)
return fingerprintDTOErr
}
return nil
}
func (l *LevelDBMetricPersistence) GetAllLabelNames() ([]string, error) {
if getAll, getAllError := l.labelNameToFingerprints.GetAll(); getAllError == nil {
result := make([]string, 0, len(getAll))
labelNameDTO := &data.LabelName{}
for _, pair := range getAll {
if unmarshalError := proto.Unmarshal(pair.Left, labelNameDTO); unmarshalError == nil {
result = append(result, *labelNameDTO.Name)
} else {
return nil, unmarshalError
}
}
return result, nil
} else {
return nil, getAllError
}
return nil, errors.New("Unknown error encountered when querying label names.")
}
func (l *LevelDBMetricPersistence) GetAllLabelPairs() ([]model.LabelSet, error) {
if getAll, getAllError := l.labelSetToFingerprints.GetAll(); getAllError == nil {
result := make([]model.LabelSet, 0, len(getAll))
labelPairDTO := &data.LabelPair{}
for _, pair := range getAll {
if unmarshalError := proto.Unmarshal(pair.Left, labelPairDTO); unmarshalError == nil {
n := model.LabelName(*labelPairDTO.Name)
v := model.LabelValue(*labelPairDTO.Value)
item := model.LabelSet{n: v}
result = append(result, item)
} else {
return nil, unmarshalError
}
}
return result, nil
} else {
return nil, getAllError
}
return nil, errors.New("Unknown error encountered when querying label pairs.")
}
func (l *LevelDBMetricPersistence) GetAllMetrics() ([]model.LabelSet, error) {
if getAll, getAllError := l.labelSetToFingerprints.GetAll(); getAllError == nil {
result := make([]model.LabelSet, 0)
fingerprintCollection := &data.FingerprintCollection{}
fingerprints := make(utility.Set)
for _, pair := range getAll {
if unmarshalError := proto.Unmarshal(pair.Right, fingerprintCollection); unmarshalError == nil {
for _, member := range fingerprintCollection.Member {
if !fingerprints.Has(*member.Signature) {
fingerprints.Add(*member.Signature)
fingerprintEncoded := coding.NewProtocolBufferEncoder(member)
if labelPairCollectionRaw, labelPairCollectionRawError := l.fingerprintToMetrics.Get(fingerprintEncoded); labelPairCollectionRawError == nil {
labelPairCollectionDTO := &data.LabelSet{}
if labelPairCollectionDTOMarshalError := proto.Unmarshal(labelPairCollectionRaw, labelPairCollectionDTO); labelPairCollectionDTOMarshalError == nil {
intermediate := make(model.LabelSet, 0)
for _, member := range labelPairCollectionDTO.Member {
n := model.LabelName(*member.Name)
v := model.LabelValue(*member.Value)
intermediate[n] = v
}
result = append(result, intermediate)
} else {
return nil, labelPairCollectionDTOMarshalError
}
} else {
return nil, labelPairCollectionRawError
}
}
}
} else {
return nil, unmarshalError
}
}
return result, nil
} else {
return nil, getAllError
}
return nil, errors.New("Unknown error encountered when querying metrics.")
}
func (l *LevelDBMetricPersistence) GetSamplesForMetric(metric model.Metric, interval model.Interval) ([]model.Samples, error) {
metricDTO := model.MetricToDTO(&metric)
if fingerprintDTO, fingerprintDTOErr := fingerprintDTOForMessage(metricDTO); fingerprintDTOErr == nil {
if iterator, closer, iteratorErr := l.metricSamples.GetIterator(); iteratorErr == nil {
defer closer.Close()
start := &data.SampleKey{
Fingerprint: fingerprintDTO,
Timestamp: indexable.EncodeTime(interval.OldestInclusive),
}
emission := make([]model.Samples, 0)
if encode, encodeErr := coding.NewProtocolBufferEncoder(start).Encode(); encodeErr == nil {
iterator.Seek(encode)
for iterator = iterator; iterator.Valid(); iterator.Next() {
key := &data.SampleKey{}
value := &data.SampleValue{}
if keyUnmarshalErr := proto.Unmarshal(iterator.Key(), key); keyUnmarshalErr == nil {
if valueUnmarshalErr := proto.Unmarshal(iterator.Value(), value); valueUnmarshalErr == nil {
if *fingerprintDTO.Signature == *key.Fingerprint.Signature {
// Wart
if indexable.DecodeTime(key.Timestamp).Unix() <= interval.NewestInclusive.Unix() {
emission = append(emission, model.Samples{
Value: model.SampleValue(*value.Value),
Timestamp: indexable.DecodeTime(key.Timestamp),
})
} else {
break
}
} else {
break
}
} else {
return nil, valueUnmarshalErr
}
} else {
return nil, keyUnmarshalErr
}
}
return emission, nil
} else {
log.Printf("Could not encode the start key: %q\n", encodeErr)
return nil, encodeErr
}
} else {
log.Printf("Could not acquire iterator: %q\n", iteratorErr)
return nil, iteratorErr
}
} else {
log.Printf("Could not create fingerprint for the metric: %q\n", fingerprintDTOErr)
return nil, fingerprintDTOErr
}
panic("unreachable")
}
func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet *model.LabelSet) ([]*model.Fingerprint, error) {
emission := make([]*model.Fingerprint, 0, 0)
for _, labelSetDTO := range model.LabelSetToDTOs(labelSet) {
if f, err := l.labelSetToFingerprints.Get(coding.NewProtocolBufferEncoder(labelSetDTO)); err == nil {
unmarshaled := &data.FingerprintCollection{}
if unmarshalErr := proto.Unmarshal(f, unmarshaled); unmarshalErr == nil {
for _, m := range unmarshaled.Member {
fp := model.Fingerprint(*m.Signature)
emission = append(emission, &fp)
}
} else {
return nil, err
}
} else {
return nil, err
}
}
return emission, nil
}
func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName *model.LabelName) ([]*model.Fingerprint, error) {
emission := make([]*model.Fingerprint, 0, 0)
if raw, err := l.labelNameToFingerprints.Get(coding.NewProtocolBufferEncoder(model.LabelNameToDTO(labelName))); err == nil {
unmarshaled := &data.FingerprintCollection{}
if err = proto.Unmarshal(raw, unmarshaled); err == nil {
for _, m := range unmarshaled.Member {
fp := model.Fingerprint(*m.Signature)
emission = append(emission, &fp)
}
} else {
return nil, err
}
} else {
return nil, err
}
return emission, nil
}

View File

@ -17,7 +17,7 @@ import (
"code.google.com/p/goprotobuf/proto" "code.google.com/p/goprotobuf/proto"
"fmt" "fmt"
"github.com/matttproud/prometheus/model" "github.com/matttproud/prometheus/model"
data "github.com/matttproud/prometheus/model/generated" dto "github.com/matttproud/prometheus/model/generated"
"io/ioutil" "io/ioutil"
"math" "math"
"math/rand" "math/rand"
@ -31,7 +31,12 @@ const (
stochasticMaximumVariance = 64 stochasticMaximumVariance = 64
) )
func TestBasicLifecycle(t *testing.T) { type tester interface {
Errorf(format string, args ...interface{})
Error(args ...interface{})
}
var testBasicLifecycle func(t tester) = func(t tester) {
temporaryDirectory, temporaryDirectoryErr := ioutil.TempDir("", "leveldb_metric_persistence_test") temporaryDirectory, temporaryDirectoryErr := ioutil.TempDir("", "leveldb_metric_persistence_test")
if temporaryDirectoryErr != nil { if temporaryDirectoryErr != nil {
@ -63,7 +68,17 @@ func TestBasicLifecycle(t *testing.T) {
} }
} }
func TestReadEmpty(t *testing.T) { func TestBasicLifecycle(t *testing.T) {
testBasicLifecycle(t)
}
func BenchmarkBasicLifecycle(b *testing.B) {
for i := 0; i < b.N; i++ {
testBasicLifecycle(b)
}
}
var testReadEmpty func(t tester) = func(t tester) {
temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test") temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test")
defer func() { defer func() {
@ -82,7 +97,7 @@ func TestReadEmpty(t *testing.T) {
name := string(x) name := string(x)
value := string(x) value := string(x)
dto := &data.LabelPair{ dto := &dto.LabelPair{
Name: proto.String(name), Name: proto.String(name),
Value: proto.String(value), Value: proto.String(value),
} }
@ -102,7 +117,7 @@ func TestReadEmpty(t *testing.T) {
hasLabelName := func(x int) bool { hasLabelName := func(x int) bool {
name := string(x) name := string(x)
dto := &data.LabelName{ dto := &dto.LabelName{
Name: proto.String(name), Name: proto.String(name),
} }
@ -123,7 +138,7 @@ func TestReadEmpty(t *testing.T) {
name := string(x) name := string(x)
value := string(x) value := string(x)
dto := &data.LabelPair{ dto := &dto.LabelPair{
Name: proto.String(name), Name: proto.String(name),
Value: proto.String(value), Value: proto.String(value),
} }
@ -148,7 +163,7 @@ func TestReadEmpty(t *testing.T) {
getLabelNameFingerprints := func(x int) bool { getLabelNameFingerprints := func(x int) bool {
name := string(x) name := string(x)
dto := &data.LabelName{ dto := &dto.LabelName{
Name: proto.String(name), Name: proto.String(name),
} }
@ -170,7 +185,17 @@ func TestReadEmpty(t *testing.T) {
} }
} }
func TestAppendSampleAsPureSparseAppend(t *testing.T) { func TestReadEmpty(t *testing.T) {
testReadEmpty(t)
}
func BenchmarkReadEmpty(b *testing.B) {
for i := 0; i < b.N; i++ {
testReadEmpty(b)
}
}
var testAppendSampleAsPureSparseAppend = func(t tester) {
temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test") temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test")
defer func() { defer func() {
@ -206,7 +231,17 @@ func TestAppendSampleAsPureSparseAppend(t *testing.T) {
} }
} }
func TestAppendSampleAsSparseAppendWithReads(t *testing.T) { func TestAppendSampleAsPureSparseAppend(t *testing.T) {
testAppendSampleAsPureSparseAppend(t)
}
func BenchmarkAppendSampleAsPureSparseAppend(b *testing.B) {
for i := 0; i < b.N; i++ {
testAppendSampleAsPureSparseAppend(b)
}
}
var testAppendSampleAsSparseAppendWithReads func(t tester) = func(t tester) {
temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test") temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test")
defer func() { defer func() {
@ -238,7 +273,7 @@ func TestAppendSampleAsSparseAppendWithReads(t *testing.T) {
return false return false
} }
labelNameDTO := &data.LabelName{ labelNameDTO := &dto.LabelName{
Name: proto.String(string(x)), Name: proto.String(string(x)),
} }
@ -252,7 +287,7 @@ func TestAppendSampleAsSparseAppendWithReads(t *testing.T) {
return false return false
} }
labelPairDTO := &data.LabelPair{ labelPairDTO := &dto.LabelPair{
Name: proto.String(string(x)), Name: proto.String(string(x)),
Value: proto.String(string(x)), Value: proto.String(string(x)),
} }
@ -303,6 +338,16 @@ func TestAppendSampleAsSparseAppendWithReads(t *testing.T) {
} }
} }
func TestAppendSampleAsSparseAppendWithReads(t *testing.T) {
testAppendSampleAsSparseAppendWithReads(t)
}
func BenchmarkAppendSampleAsSparseAppendWithReads(b *testing.B) {
for i := 0; i < b.N; i++ {
testAppendSampleAsSparseAppendWithReads(b)
}
}
func TestAppendSampleAsPureSingleEntityAppend(t *testing.T) { func TestAppendSampleAsPureSingleEntityAppend(t *testing.T) {
temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test") temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test")
@ -429,7 +474,7 @@ func TestStochastic(t *testing.T) {
metricNewestSample[metricIndex] = newestSample metricNewestSample[metricIndex] = newestSample
for sharedLabelIndex := 0; sharedLabelIndex < numberOfSharedLabels; sharedLabelIndex++ { for sharedLabelIndex := 0; sharedLabelIndex < numberOfSharedLabels; sharedLabelIndex++ {
labelPair := &data.LabelPair{ labelPair := &dto.LabelPair{
Name: proto.String(fmt.Sprintf("shared_label_%d", sharedLabelIndex)), Name: proto.String(fmt.Sprintf("shared_label_%d", sharedLabelIndex)),
Value: proto.String(fmt.Sprintf("label_%d", sharedLabelIndex)), Value: proto.String(fmt.Sprintf("label_%d", sharedLabelIndex)),
} }
@ -444,7 +489,7 @@ func TestStochastic(t *testing.T) {
return false return false
} }
labelName := &data.LabelName{ labelName := &dto.LabelName{
Name: proto.String(fmt.Sprintf("shared_label_%d", sharedLabelIndex)), Name: proto.String(fmt.Sprintf("shared_label_%d", sharedLabelIndex)),
} }
@ -461,7 +506,7 @@ func TestStochastic(t *testing.T) {
} }
for sharedIndex := 0; sharedIndex < numberOfSharedLabels; sharedIndex++ { for sharedIndex := 0; sharedIndex < numberOfSharedLabels; sharedIndex++ {
labelName := &data.LabelName{ labelName := &dto.LabelName{
Name: proto.String(fmt.Sprintf("shared_label_%d", sharedIndex)), Name: proto.String(fmt.Sprintf("shared_label_%d", sharedIndex)),
} }
fingerprints, fingerprintsErr := persistence.GetLabelNameFingerprints(labelName) fingerprints, fingerprintsErr := persistence.GetLabelNameFingerprints(labelName)
@ -481,7 +526,7 @@ func TestStochastic(t *testing.T) {
for metricIndex := 0; metricIndex < numberOfMetrics; metricIndex++ { for metricIndex := 0; metricIndex < numberOfMetrics; metricIndex++ {
for unsharedLabelIndex := 0; unsharedLabelIndex < numberOfUnsharedLabels; unsharedLabelIndex++ { for unsharedLabelIndex := 0; unsharedLabelIndex < numberOfUnsharedLabels; unsharedLabelIndex++ {
labelPair := &data.LabelPair{ labelPair := &dto.LabelPair{
Name: proto.String(fmt.Sprintf("metric_index_%d_private_label_%d", metricIndex, unsharedLabelIndex)), Name: proto.String(fmt.Sprintf("metric_index_%d_private_label_%d", metricIndex, unsharedLabelIndex)),
Value: proto.String(fmt.Sprintf("private_label_%d", unsharedLabelIndex)), Value: proto.String(fmt.Sprintf("private_label_%d", unsharedLabelIndex)),
} }
@ -510,7 +555,7 @@ func TestStochastic(t *testing.T) {
return false return false
} }
labelName := &data.LabelName{ labelName := &dto.LabelName{
Name: proto.String(fmt.Sprintf("metric_index_%d_private_label_%d", metricIndex, unsharedLabelIndex)), Name: proto.String(fmt.Sprintf("metric_index_%d_private_label_%d", metricIndex, unsharedLabelIndex)),
} }
@ -793,3 +838,99 @@ func TestGetFingerprintsForLabelName(t *testing.T) {
t.Errorf("Expected one element.") t.Errorf("Expected one element.")
} }
} }
func TestGetMetricForFingerprint(t *testing.T) {
temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test")
defer func() {
if removeAllErr := os.RemoveAll(temporaryDirectory); removeAllErr != nil {
t.Errorf("Could not remove temporary directory: %q\n", removeAllErr)
}
}()
persistence, _ := NewLevelDBMetricPersistence(temporaryDirectory)
defer func() {
persistence.Close()
}()
appendErr := persistence.AppendSample(&model.Sample{
Value: model.SampleValue(0),
Timestamp: time.Unix(0, 0),
Labels: model.LabelSet{
"request_type": "your_mom",
},
})
if appendErr != nil {
t.Error(appendErr)
}
appendErr = persistence.AppendSample(&model.Sample{
Value: model.SampleValue(0),
Timestamp: time.Unix(int64(0), 0),
Labels: model.LabelSet{
"request_type": "your_dad",
"one-off": "value",
},
})
if appendErr != nil {
t.Error(appendErr)
}
result, getErr := persistence.GetFingerprintsForLabelSet(&(model.LabelSet{
model.LabelName("request_type"): model.LabelValue("your_mom"),
}))
if getErr != nil {
t.Error(getErr)
}
if len(result) != 1 {
t.Errorf("Expected one element.")
}
v, e := persistence.GetMetricForFingerprint(result[0])
if e != nil {
t.Error(e)
}
if len(*v) != 1 {
t.Errorf("Expected one-dimensional metric.")
}
if (*v)["request_type"] != "your_mom" {
t.Errorf("Expected metric to match.")
}
result, getErr = persistence.GetFingerprintsForLabelSet(&(model.LabelSet{
model.LabelName("request_type"): model.LabelValue("your_dad"),
}))
if getErr != nil {
t.Error(getErr)
}
if len(result) != 1 {
t.Errorf("Expected one element.")
}
v, e = persistence.GetMetricForFingerprint(result[0])
if e != nil {
t.Error(e)
}
if len(*v) != 2 {
t.Errorf("Expected one-dimensional metric.")
}
if (*v)["request_type"] != "your_dad" {
t.Errorf("Expected metric to match.")
}
if (*v)["one-off"] != "value" {
t.Errorf("Expected metric to match.")
}
}

View File

@ -0,0 +1,164 @@
// Copyright 2012 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package leveldb
import (
index "github.com/matttproud/prometheus/storage/raw/index/leveldb"
storage "github.com/matttproud/prometheus/storage/raw/leveldb"
"io"
"log"
)
type leveldbOpener func()
func (l *LevelDBMetricPersistence) Close() error {
log.Printf("Closing LevelDBPersistence storage containers...")
var persistences = []struct {
name string
closer io.Closer
}{
{
"Fingerprint to Label Name and Value Pairs",
l.fingerprintToMetrics,
},
{
"Fingerprint Samples",
l.metricSamples,
},
{
"Label Name to Fingerprints",
l.labelNameToFingerprints,
},
{
"Label Name and Value Pairs to Fingerprints",
l.labelSetToFingerprints,
},
{
"Metric Membership Index",
l.metricMembershipIndex,
},
}
errorChannel := make(chan error, len(persistences))
for _, persistence := range persistences {
name := persistence.name
closer := persistence.closer
go func(name string, closer io.Closer) {
if closer != nil {
log.Printf("Closing LevelDBPersistence storage container: %s\n", name)
closingError := closer.Close()
if closingError != nil {
log.Printf("Could not close a LevelDBPersistence storage container; inconsistencies are possible: %q\n", closingError)
}
errorChannel <- closingError
} else {
errorChannel <- nil
}
}(name, closer)
}
for i := 0; i < cap(errorChannel); i++ {
closingError := <-errorChannel
if closingError != nil {
return closingError
}
}
log.Printf("Successfully closed all LevelDBPersistence storage containers.")
return nil
}
func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistence, error) {
log.Printf("Opening LevelDBPersistence storage containers...")
errorChannel := make(chan error, 5)
emission := &LevelDBMetricPersistence{}
var subsystemOpeners = []struct {
name string
opener leveldbOpener
}{
{
"Label Names and Value Pairs by Fingerprint",
func() {
var err error
emission.fingerprintToMetrics, err = storage.NewLevelDBPersistence(baseDirectory+"/label_name_and_value_pairs_by_fingerprint", 1000000, 10)
errorChannel <- err
},
},
{
"Samples by Fingerprint",
func() {
var err error
emission.metricSamples, err = storage.NewLevelDBPersistence(baseDirectory+"/samples_by_fingerprint", 1000000, 10)
errorChannel <- err
},
},
{
"Fingerprints by Label Name",
func() {
var err error
emission.labelNameToFingerprints, err = storage.NewLevelDBPersistence(baseDirectory+"/fingerprints_by_label_name", 1000000, 10)
errorChannel <- err
},
},
{
"Fingerprints by Label Name and Value Pair",
func() {
var err error
emission.labelSetToFingerprints, err = storage.NewLevelDBPersistence(baseDirectory+"/fingerprints_by_label_name_and_value_pair", 1000000, 10)
errorChannel <- err
},
},
{
"Metric Membership Index",
func() {
var err error
emission.metricMembershipIndex, err = index.NewLevelDBMembershipIndex(baseDirectory+"/metric_membership_index", 1000000, 10)
errorChannel <- err
},
},
}
for _, subsystem := range subsystemOpeners {
name := subsystem.name
opener := subsystem.opener
log.Printf("Opening LevelDBPersistence storage container: %s\n", name)
go opener()
}
for i := 0; i < cap(errorChannel); i++ {
openingError := <-errorChannel
if openingError != nil {
log.Printf("Could not open a LevelDBPersistence storage container: %q\n", openingError)
return nil, openingError
}
}
log.Printf("Successfully opened all LevelDBPersistence storage containers.\n")
return emission, nil
}

View File

@ -0,0 +1,180 @@
// Copyright 2012 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package leveldb
import (
"code.google.com/p/goprotobuf/proto"
"errors"
"github.com/matttproud/prometheus/coding"
"github.com/matttproud/prometheus/coding/indexable"
"github.com/matttproud/prometheus/model"
dto "github.com/matttproud/prometheus/model/generated"
"log"
)
func (l *LevelDBMetricPersistence) setLabelPairFingerprints(labelPair *dto.LabelPair, fingerprints *dto.FingerprintCollection) error {
labelPairEncoded := coding.NewProtocolBufferEncoder(labelPair)
fingerprintsEncoded := coding.NewProtocolBufferEncoder(fingerprints)
return l.labelSetToFingerprints.Put(labelPairEncoded, fingerprintsEncoded)
}
func (l *LevelDBMetricPersistence) setLabelNameFingerprints(labelName *dto.LabelName, fingerprints *dto.FingerprintCollection) error {
labelNameEncoded := coding.NewProtocolBufferEncoder(labelName)
fingerprintsEncoded := coding.NewProtocolBufferEncoder(fingerprints)
return l.labelNameToFingerprints.Put(labelNameEncoded, fingerprintsEncoded)
}
func (l *LevelDBMetricPersistence) appendLabelPairFingerprint(labelPair *dto.LabelPair, fingerprint *dto.Fingerprint) error {
if has, hasError := l.HasLabelPair(labelPair); hasError == nil {
var fingerprints *dto.FingerprintCollection
if has {
if existing, existingError := l.getFingerprintsForLabelSet(labelPair); existingError == nil {
fingerprints = existing
} else {
return existingError
}
} else {
fingerprints = &dto.FingerprintCollection{}
}
fingerprints.Member = append(fingerprints.Member, fingerprint)
return l.setLabelPairFingerprints(labelPair, fingerprints)
} else {
return hasError
}
return errors.New("Unknown error when appending fingerprint to label name and value pair.")
}
func (l *LevelDBMetricPersistence) appendLabelNameFingerprint(labelPair *dto.LabelPair, fingerprint *dto.Fingerprint) error {
labelName := &dto.LabelName{
Name: labelPair.Name,
}
if has, hasError := l.HasLabelName(labelName); hasError == nil {
var fingerprints *dto.FingerprintCollection
if has {
if existing, existingError := l.GetLabelNameFingerprints(labelName); existingError == nil {
fingerprints = existing
} else {
return existingError
}
} else {
fingerprints = &dto.FingerprintCollection{}
}
fingerprints.Member = append(fingerprints.Member, fingerprint)
return l.setLabelNameFingerprints(labelName, fingerprints)
} else {
return hasError
}
return errors.New("Unknown error when appending fingerprint to label name and value pair.")
}
func (l *LevelDBMetricPersistence) appendFingerprints(m *dto.Metric) error {
if fingerprintDTO, fingerprintDTOError := model.MessageToFingerprintDTO(m); fingerprintDTOError == nil {
fingerprintKey := coding.NewProtocolBufferEncoder(fingerprintDTO)
metricDTOEncoder := coding.NewProtocolBufferEncoder(m)
if putError := l.fingerprintToMetrics.Put(fingerprintKey, metricDTOEncoder); putError == nil {
labelCount := len(m.LabelPair)
labelPairErrors := make(chan error, labelCount)
labelNameErrors := make(chan error, labelCount)
for _, labelPair := range m.LabelPair {
go func(labelPair *dto.LabelPair) {
labelNameErrors <- l.appendLabelNameFingerprint(labelPair, fingerprintDTO)
}(labelPair)
go func(labelPair *dto.LabelPair) {
labelPairErrors <- l.appendLabelPairFingerprint(labelPair, fingerprintDTO)
}(labelPair)
}
for i := 0; i < cap(labelPairErrors); i++ {
appendError := <-labelPairErrors
if appendError != nil {
return appendError
}
}
for i := 0; i < cap(labelNameErrors); i++ {
appendError := <-labelNameErrors
if appendError != nil {
return appendError
}
}
return nil
} else {
return putError
}
} else {
return fingerprintDTOError
}
return errors.New("Unknown error in appending label pairs to fingerprint.")
}
func (l *LevelDBMetricPersistence) AppendSample(sample *model.Sample) error {
metricDTO := model.SampleToMetricDTO(sample)
if indexHas, indexHasError := l.hasIndexMetric(metricDTO); indexHasError == nil {
if !indexHas {
if indexPutError := l.indexMetric(metricDTO); indexPutError == nil {
if appendError := l.appendFingerprints(metricDTO); appendError != nil {
log.Printf("Could not set metric fingerprint to label pairs mapping: %q\n", appendError)
return appendError
}
} else {
log.Printf("Could not add metric to membership index: %q\n", indexPutError)
return indexPutError
}
}
} else {
log.Printf("Could not query membership index for metric: %q\n", indexHasError)
return indexHasError
}
if fingerprintDTO, fingerprintDTOErr := model.MessageToFingerprintDTO(metricDTO); fingerprintDTOErr == nil {
sampleKeyDTO := &dto.SampleKey{
Fingerprint: fingerprintDTO,
Timestamp: indexable.EncodeTime(sample.Timestamp),
}
sampleValueDTO := &dto.SampleValue{
Value: proto.Float32(float32(sample.Value)),
}
sampleKeyEncoded := coding.NewProtocolBufferEncoder(sampleKeyDTO)
sampleValueEncoded := coding.NewProtocolBufferEncoder(sampleValueDTO)
if putError := l.metricSamples.Put(sampleKeyEncoded, sampleValueEncoded); putError != nil {
log.Printf("Could not append metric sample: %q\n", putError)
return putError
}
} else {
log.Printf("Could not encode metric fingerprint: %q\n", fingerprintDTOErr)
return fingerprintDTOErr
}
return nil
}

View File

@ -0,0 +1,200 @@
// Copyright 2012 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package leveldb
import (
"code.google.com/p/goprotobuf/proto"
"errors"
"github.com/matttproud/prometheus/coding"
"github.com/matttproud/prometheus/coding/indexable"
"github.com/matttproud/prometheus/model"
dto "github.com/matttproud/prometheus/model/generated"
"log"
)
func (l *LevelDBMetricPersistence) hasIndexMetric(dto *dto.Metric) (bool, error) {
dtoKey := coding.NewProtocolBufferEncoder(dto)
return l.metricMembershipIndex.Has(dtoKey)
}
func (l *LevelDBMetricPersistence) indexMetric(dto *dto.Metric) error {
dtoKey := coding.NewProtocolBufferEncoder(dto)
return l.metricMembershipIndex.Put(dtoKey)
}
func (l *LevelDBMetricPersistence) HasLabelPair(dto *dto.LabelPair) (bool, error) {
dtoKey := coding.NewProtocolBufferEncoder(dto)
return l.labelSetToFingerprints.Has(dtoKey)
}
func (l *LevelDBMetricPersistence) HasLabelName(dto *dto.LabelName) (bool, error) {
dtoKey := coding.NewProtocolBufferEncoder(dto)
return l.labelNameToFingerprints.Has(dtoKey)
}
func (l *LevelDBMetricPersistence) getFingerprintsForLabelSet(p *dto.LabelPair) (*dto.FingerprintCollection, error) {
dtoKey := coding.NewProtocolBufferEncoder(p)
if get, getError := l.labelSetToFingerprints.Get(dtoKey); getError == nil {
value := &dto.FingerprintCollection{}
if unmarshalError := proto.Unmarshal(get, value); unmarshalError == nil {
return value, nil
} else {
return nil, unmarshalError
}
} else {
return nil, getError
}
panic("unreachable")
}
func (l *LevelDBMetricPersistence) GetLabelNameFingerprints(n *dto.LabelName) (*dto.FingerprintCollection, error) {
dtoKey := coding.NewProtocolBufferEncoder(n)
if get, getError := l.labelNameToFingerprints.Get(dtoKey); getError == nil {
value := &dto.FingerprintCollection{}
if unmarshalError := proto.Unmarshal(get, value); unmarshalError == nil {
return value, nil
} else {
return nil, unmarshalError
}
} else {
return nil, getError
}
return nil, errors.New("Unknown error while getting label name fingerprints.")
}
func (l *LevelDBMetricPersistence) GetSamplesForMetric(metric model.Metric, interval model.Interval) ([]model.Samples, error) {
metricDTO := model.MetricToDTO(&metric)
if fingerprintDTO, fingerprintDTOErr := model.MessageToFingerprintDTO(metricDTO); fingerprintDTOErr == nil {
if iterator, closer, iteratorErr := l.metricSamples.GetIterator(); iteratorErr == nil {
defer closer.Close()
start := &dto.SampleKey{
Fingerprint: fingerprintDTO,
Timestamp: indexable.EncodeTime(interval.OldestInclusive),
}
emission := make([]model.Samples, 0)
if encode, encodeErr := coding.NewProtocolBufferEncoder(start).Encode(); encodeErr == nil {
iterator.Seek(encode)
for iterator = iterator; iterator.Valid(); iterator.Next() {
key := &dto.SampleKey{}
value := &dto.SampleValue{}
if keyUnmarshalErr := proto.Unmarshal(iterator.Key(), key); keyUnmarshalErr == nil {
if valueUnmarshalErr := proto.Unmarshal(iterator.Value(), value); valueUnmarshalErr == nil {
if *fingerprintDTO.Signature == *key.Fingerprint.Signature {
// Wart
if indexable.DecodeTime(key.Timestamp).Unix() <= interval.NewestInclusive.Unix() {
emission = append(emission, model.Samples{
Value: model.SampleValue(*value.Value),
Timestamp: indexable.DecodeTime(key.Timestamp),
})
} else {
break
}
} else {
break
}
} else {
return nil, valueUnmarshalErr
}
} else {
return nil, keyUnmarshalErr
}
}
return emission, nil
} else {
log.Printf("Could not encode the start key: %q\n", encodeErr)
return nil, encodeErr
}
} else {
log.Printf("Could not acquire iterator: %q\n", iteratorErr)
return nil, iteratorErr
}
} else {
log.Printf("Could not create fingerprint for the metric: %q\n", fingerprintDTOErr)
return nil, fingerprintDTOErr
}
panic("unreachable")
}
func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet *model.LabelSet) ([]*model.Fingerprint, error) {
emission := make([]*model.Fingerprint, 0, 0)
for _, labelSetDTO := range model.LabelSetToDTOs(labelSet) {
if f, err := l.labelSetToFingerprints.Get(coding.NewProtocolBufferEncoder(labelSetDTO)); err == nil {
unmarshaled := &dto.FingerprintCollection{}
if unmarshalErr := proto.Unmarshal(f, unmarshaled); unmarshalErr == nil {
for _, m := range unmarshaled.Member {
fp := model.Fingerprint(*m.Signature)
emission = append(emission, &fp)
}
} else {
return nil, err
}
} else {
return nil, err
}
}
return emission, nil
}
func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName *model.LabelName) ([]*model.Fingerprint, error) {
emission := make([]*model.Fingerprint, 0, 0)
if raw, err := l.labelNameToFingerprints.Get(coding.NewProtocolBufferEncoder(model.LabelNameToDTO(labelName))); err == nil {
unmarshaled := &dto.FingerprintCollection{}
if err = proto.Unmarshal(raw, unmarshaled); err == nil {
for _, m := range unmarshaled.Member {
fp := model.Fingerprint(*m.Signature)
emission = append(emission, &fp)
}
} else {
return nil, err
}
} else {
return nil, err
}
return emission, nil
}
func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f *model.Fingerprint) (*model.Metric, error) {
if raw, err := l.fingerprintToMetrics.Get(coding.NewProtocolBufferEncoder(model.FingerprintToDTO(f))); err == nil {
unmarshaled := &dto.Metric{}
if unmarshalErr := proto.Unmarshal(raw, unmarshaled); unmarshalErr == nil {
m := model.Metric{}
for _, v := range unmarshaled.LabelPair {
m[model.LabelName(*v.Name)] = model.LabelValue(*v.Value)
}
return &m, nil
} else {
return nil, unmarshalErr
}
} else {
return nil, err
}
panic("unreachable")
}

View File

@ -0,0 +1,27 @@
// Copyright 2012 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package leveldb
import (
index "github.com/matttproud/prometheus/storage/raw/index/leveldb"
storage "github.com/matttproud/prometheus/storage/raw/leveldb"
)
type LevelDBMetricPersistence struct {
fingerprintToMetrics *storage.LevelDBPersistence
metricSamples *storage.LevelDBPersistence
labelNameToFingerprints *storage.LevelDBPersistence
labelSetToFingerprints *storage.LevelDBPersistence
metricMembershipIndex *index.LevelDBMembershipIndex
}

View File

@ -15,12 +15,12 @@ package leveldb
import ( import (
"github.com/matttproud/prometheus/coding" "github.com/matttproud/prometheus/coding"
data "github.com/matttproud/prometheus/model/generated" dto "github.com/matttproud/prometheus/model/generated"
"github.com/matttproud/prometheus/storage/raw/leveldb" "github.com/matttproud/prometheus/storage/raw/leveldb"
) )
var ( var (
existenceValue = coding.NewProtocolBufferEncoder(&data.MembershipIndexValue{}) existenceValue = coding.NewProtocolBufferEncoder(&dto.MembershipIndexValue{})
) )
type LevelDBMembershipIndex struct { type LevelDBMembershipIndex struct {