Start writing high watermarks.

pull/84/head
Matt T. Proud 2013-03-14 19:24:28 -07:00
parent a224dda9f0
commit 1f7ed52b46
3 changed files with 85 additions and 3 deletions

View File

@ -55,3 +55,7 @@ message SampleValueSeries {
message MembershipIndexValue {
}
message MetricHighWatermark {
optional int64 timestamp = 1;
}

View File

@ -49,6 +49,7 @@ const (
indexMetric = "index_metric"
indexMetrics = "index_metrics"
rebuildDiskFrontier = "rebuild_disk_frontier"
refreshHighWatermarks = "refresh_high_watermarks"
renderView = "render_view"
setLabelNameFingerprints = "set_label_name_fingerprints"
setLabelPairFingerprints = "set_label_pair_fingerprints"

View File

@ -39,16 +39,18 @@ var (
type LevelDBMetricPersistence struct {
fingerprintToMetrics *leveldb.LevelDBPersistence
metricSamples *leveldb.LevelDBPersistence
labelNameToFingerprints *leveldb.LevelDBPersistence
labelSetToFingerprints *leveldb.LevelDBPersistence
metricHighWatermarks *leveldb.LevelDBPersistence
metricMembershipIndex *index.LevelDBMembershipIndex
metricSamples *leveldb.LevelDBPersistence
}
var (
// These flag values are back of the envelope, though they seem sensible.
// Please re-evaluate based on your own needs.
fingerprintsToLabelPairCacheSize = flag.Int("fingerprintsToLabelPairCacheSizeBytes", 100*1024*1024, "The size for the fingerprint to label pair index (bytes).")
highWatermarkCacheSize = flag.Int("highWatermarksByFingerprintSizeBytes", 50*1024*1024, "The size for the metric high watermarks (bytes).")
samplesByFingerprintCacheSize = flag.Int("samplesByFingerprintCacheSizeBytes", 500*1024*1024, "The size for the samples database (bytes).")
labelNameToFingerprintsCacheSize = flag.Int("labelNameToFingerprintsCacheSizeBytes", 100*1024*1024, "The size for the label name to metric fingerprint index (bytes).")
labelPairToFingerprintsCacheSize = flag.Int("labelPairToFingerprintsCacheSizeBytes", 100*1024*1024, "The size for the label pair to metric fingerprint index (bytes).")
@ -66,6 +68,10 @@ func (l *LevelDBMetricPersistence) Close() error {
"Fingerprint to Label Name and Value Pairs",
l.fingerprintToMetrics,
},
{
"Fingerprint High Watermarks",
l.metricHighWatermarks,
},
{
"Fingerprint Samples",
l.metricSamples,
@ -117,7 +123,7 @@ func (l *LevelDBMetricPersistence) Close() error {
}
func NewLevelDBMetricPersistence(baseDirectory string) (persistence *LevelDBMetricPersistence, err error) {
errorChannel := make(chan error, 5)
errorChannel := make(chan error, 6)
emission := &LevelDBMetricPersistence{}
@ -141,6 +147,14 @@ func NewLevelDBMetricPersistence(baseDirectory string) (persistence *LevelDBMetr
errorChannel <- err
},
},
{
"High Watermarks by Fingerprint",
func() {
var err error
emission.metricHighWatermarks, err = leveldb.NewLevelDBPersistence(baseDirectory+"/high_watermarks_by_fingerprint", *highWatermarkCacheSize, 10)
errorChannel <- err
},
},
{
"Fingerprints by Label Name",
func() {
@ -529,6 +543,60 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerpri
return
}
func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Fingerprint]model.Samples) (err error) {
begin := time.Now()
defer func() {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: refreshHighWatermarks, result: success}, map[string]string{operation: refreshHighWatermarks, result: failure})
}()
batch := leveldb.NewBatch()
defer batch.Close()
var (
mutationCount = 0
)
for fingerprint, samples := range groups {
var (
key = &dto.Fingerprint{}
value = &dto.MetricHighWatermark{}
raw []byte
oldestSampleTimestamp = samples[len(samples)-1].Timestamp
keyEncoded = coding.NewProtocolBufferEncoder(key)
)
key.Signature = proto.String(fingerprint.ToRowKey())
raw, err = l.metricHighWatermarks.Get(keyEncoded)
if err != nil {
panic(err)
return
}
if raw != nil {
err = proto.Unmarshal(raw, value)
if err != nil {
panic(err)
continue
}
if oldestSampleTimestamp.Before(time.Unix(*value.Timestamp, 0)) {
continue
}
}
value.Timestamp = proto.Int64(oldestSampleTimestamp.Unix())
batch.Put(keyEncoded, coding.NewProtocolBufferEncoder(value))
mutationCount++
}
err = l.metricHighWatermarks.Commit(batch)
if err != nil {
panic(err)
}
return
}
func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err error) {
begin := time.Now()
defer func() {
@ -540,6 +608,7 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err
var (
fingerprintToSamples = groupByFingerprint(samples)
indexErrChan = make(chan error)
watermarkErrChan = make(chan error)
)
go func(groups map[model.Fingerprint]model.Samples) {
@ -554,6 +623,10 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err
indexErrChan <- l.indexMetrics(metrics)
}(fingerprintToSamples)
go func(groups map[model.Fingerprint]model.Samples) {
watermarkErrChan <- l.refreshHighWatermarks(groups)
}(fingerprintToSamples)
samplesBatch := leveldb.NewBatch()
defer samplesBatch.Close()
@ -593,7 +666,6 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err
}
err = l.metricSamples.Commit(samplesBatch)
if err != nil {
panic(err)
}
@ -603,6 +675,11 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err
panic(err)
}
err = <-watermarkErrChan
if err != nil {
panic(err)
}
return
}